|
45 | 45 | import io.servicetalk.transport.api.IoExecutor;
|
46 | 46 |
|
47 | 47 | import java.net.InetSocketAddress;
|
| 48 | +import java.net.MalformedURLException; |
48 | 49 | import java.util.concurrent.ConcurrentHashMap;
|
49 | 50 | import java.util.concurrent.ConcurrentMap;
|
50 | 51 | import java.util.function.Function;
|
|
63 | 64 | * A builder of {@link StreamingHttpClient} instances which have a capacity to call any server based on the parsed
|
64 | 65 | * absolute-form URL address information from each {@link StreamingHttpRequest}.
|
65 | 66 | * <p>
|
| 67 | + * If {@link HttpRequestMetaData#requestTarget()} is not an absolute-form URL, a {@link MalformedURLException} will be |
| 68 | + * returned or thrown. |
| 69 | + * <p> |
66 | 70 | * It also provides a good set of default settings and configurations, which could be used by most users as-is or
|
67 | 71 | * could be overridden to address specific use cases.
|
68 | 72 | *
|
@@ -118,23 +122,21 @@ public StreamingHttpClient buildStreaming() {
|
118 | 122 | /**
|
119 | 123 | * Returns a cached {@link UrlKey} or creates a new one based on {@link StreamingHttpRequest} information.
|
120 | 124 | */
|
121 |
| - private static final class CachingKeyFactory |
122 |
| - implements Function<HttpRequestMetaData, UrlKey>, AsyncCloseable { |
| 125 | + private static final class CachingKeyFactory implements AsyncCloseable { |
123 | 126 |
|
124 | 127 | private final ConcurrentMap<String, UrlKey> urlKeyCache = new ConcurrentHashMap<>();
|
125 | 128 |
|
126 |
| - @Override |
127 |
| - public UrlKey apply(final HttpRequestMetaData metaData) { |
| 129 | + public UrlKey get(final HttpRequestMetaData metaData) throws MalformedURLException { |
128 | 130 | final String host = metaData.host();
|
129 | 131 | if (host == null) {
|
130 |
| - throw new IllegalArgumentException( |
| 132 | + throw new MalformedURLException( |
131 | 133 | "Request-target does not contain target host address: " + metaData.requestTarget() +
|
132 | 134 | ", expected absolute-form URL");
|
133 | 135 | }
|
134 | 136 |
|
135 | 137 | final String scheme = metaData.scheme();
|
136 | 138 | if (scheme == null) {
|
137 |
| - throw new IllegalArgumentException("Request-target does not contains scheme: " + |
| 139 | + throw new MalformedURLException("Request-target does not contains scheme: " + |
138 | 140 | metaData.requestTarget() + ", expected absolute-form URL");
|
139 | 141 | }
|
140 | 142 |
|
@@ -262,21 +264,32 @@ private static final class StreamingUrlHttpClient implements FilterableStreaming
|
262 | 264 | this.executionContext = requireNonNull(executionContext);
|
263 | 265 | }
|
264 | 266 |
|
265 |
| - private FilterableStreamingHttpClient selectClient( |
266 |
| - HttpRequestMetaData metaData) { |
267 |
| - return group.get(keyFactory.apply(metaData)); |
| 267 | + private FilterableStreamingHttpClient selectClient(HttpRequestMetaData metaData) throws MalformedURLException { |
| 268 | + return group.get(keyFactory.get(metaData)); |
268 | 269 | }
|
269 | 270 |
|
270 | 271 | @Override
|
271 | 272 | public Single<? extends FilterableReservedStreamingHttpConnection> reserveConnection(
|
272 | 273 | final HttpExecutionStrategy strategy, final HttpRequestMetaData metaData) {
|
273 |
| - return defer(() -> selectClient(metaData).reserveConnection(strategy, metaData).subscribeShareContext()); |
| 274 | + return defer(() -> { |
| 275 | + try { |
| 276 | + return selectClient(metaData).reserveConnection(strategy, metaData).subscribeShareContext(); |
| 277 | + } catch (Throwable t) { |
| 278 | + return Single.<FilterableReservedStreamingHttpConnection>failed(t).subscribeShareContext(); |
| 279 | + } |
| 280 | + }); |
274 | 281 | }
|
275 | 282 |
|
276 | 283 | @Override
|
277 | 284 | public Single<StreamingHttpResponse> request(final HttpExecutionStrategy strategy,
|
278 | 285 | final StreamingHttpRequest request) {
|
279 |
| - return defer(() -> selectClient(request).request(strategy, request).subscribeShareContext()); |
| 286 | + return defer(() -> { |
| 287 | + try { |
| 288 | + return selectClient(request).request(strategy, request).subscribeShareContext(); |
| 289 | + } catch (Throwable t) { |
| 290 | + return Single.<StreamingHttpResponse>failed(t).subscribeShareContext(); |
| 291 | + } |
| 292 | + }); |
280 | 293 | }
|
281 | 294 |
|
282 | 295 | @Override
|
|
0 commit comments