diff --git a/riptide-backup/src/test/java/org/zalando/riptide/backup/BackupRequestPluginTest.java b/riptide-backup/src/test/java/org/zalando/riptide/backup/BackupRequestPluginTest.java index c7e2ef865..6a8272913 100644 --- a/riptide-backup/src/test/java/org/zalando/riptide/backup/BackupRequestPluginTest.java +++ b/riptide-backup/src/test/java/org/zalando/riptide/backup/BackupRequestPluginTest.java @@ -8,11 +8,11 @@ import org.junit.Rule; import org.junit.Test; import org.springframework.core.task.AsyncListenableTaskExecutor; -import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.zalando.riptide.Http; import org.zalando.riptide.capture.Completion; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import java.io.IOException; import java.util.concurrent.Executors; @@ -37,9 +37,10 @@ public final class BackupRequestPluginTest { private final CloseableHttpClient client = HttpClientBuilder.create().build(); private final AsyncListenableTaskExecutor executor = new ConcurrentTaskExecutor(Executors.newFixedThreadPool(2)); - private final AsyncClientHttpRequestFactory factory = new RestAsyncClientHttpRequestFactory(client, executor); + private final ClientHttpRequestFactory factory = new ApacheClientHttpRequestFactory(client); private final Http unit = Http.builder() + .executor(executor) .requestFactory(factory) .baseUrl(driver.getBaseUrl()) .plugin(new BackupRequestPlugin(newSingleThreadScheduledExecutor(), 1, SECONDS, executor)) @@ -125,6 +126,7 @@ public void shouldSendBackupRequestsForGetWithBody() throws Throwable { @Test public void shouldSendBackupRequestForCustomSafeDetectedRequest() throws Throwable { final Http unit = Http.builder() + .executor(executor) .requestFactory(factory) .baseUrl(driver.getBaseUrl()) .plugin(new BackupRequestPlugin(newSingleThreadScheduledExecutor(), 1, SECONDS, executor) diff --git a/riptide-capture/src/test/java/org/zalando/riptide/capture/CaptureTest.java b/riptide-capture/src/test/java/org/zalando/riptide/capture/CaptureTest.java index 3e20a47b3..09e919370 100644 --- a/riptide-capture/src/test/java/org/zalando/riptide/capture/CaptureTest.java +++ b/riptide-capture/src/test/java/org/zalando/riptide/capture/CaptureTest.java @@ -11,13 +11,14 @@ import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.test.web.client.MockRestServiceServer; -import org.springframework.web.client.AsyncRestTemplate; +import org.springframework.web.client.RestTemplate; import org.zalando.riptide.Http; import java.io.IOException; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executors; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -42,10 +43,11 @@ public final class CaptureTest { private final MockRestServiceServer server; public CaptureTest() { - final AsyncRestTemplate template = new AsyncRestTemplate(); + final RestTemplate template = new RestTemplate(); this.server = MockRestServiceServer.createServer(template); this.unit = Http.builder() - .requestFactory(template.getAsyncRequestFactory()) + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(template.getRequestFactory()) .converter(createJsonConverter()) .converter(new StringHttpMessageConverter()) .baseUrl("https://api.example.com") diff --git a/riptide-core/src/main/java/org/zalando/riptide/AbstractCancelableCompletableFuture.java b/riptide-core/src/main/java/org/zalando/riptide/AbstractCancelableCompletableFuture.java index 76e1db2e4..545d929cb 100644 --- a/riptide-core/src/main/java/org/zalando/riptide/AbstractCancelableCompletableFuture.java +++ b/riptide-core/src/main/java/org/zalando/riptide/AbstractCancelableCompletableFuture.java @@ -21,7 +21,7 @@ public abstract class AbstractCancelableCompletableFuture extends Completable * @param generic future return type * @return a new incomplete future to be used when constructing a dependent future */ - @Override + // TODO (Java 9): @Override public CompletableFuture newIncompleteFuture() { return preserveCancelability(this); } diff --git a/riptide-core/src/main/java/org/zalando/riptide/DefaultHttp.java b/riptide-core/src/main/java/org/zalando/riptide/DefaultHttp.java index 02f7cea67..579b7447b 100644 --- a/riptide-core/src/main/java/org/zalando/riptide/DefaultHttp.java +++ b/riptide-core/src/main/java/org/zalando/riptide/DefaultHttp.java @@ -4,31 +4,35 @@ import com.google.common.collect.ImmutableMultimap; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.converter.HttpMessageConverter; import java.net.URI; import java.util.List; +import java.util.concurrent.Executor; import java.util.function.Supplier; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.springframework.http.HttpHeaders.readOnlyHttpHeaders; final class DefaultHttp implements Http { private static final HttpHeaders EMPTY = readOnlyHttpHeaders(new HttpHeaders()); - private final AsyncClientHttpRequestFactory requestFactory; + private final Executor executor; + private final ClientHttpRequestFactory requestFactory; private final MessageWorker worker; private final Supplier baseUrlProvider; private final RequestArguments arguments; private final Plugin plugin; - DefaultHttp(final AsyncClientHttpRequestFactory requestFactory, final List> converters, + DefaultHttp(final Executor executor, final ClientHttpRequestFactory requestFactory, + final List> converters, final Supplier baseUrlProvider, final UrlResolution resolution, final Plugin plugin) { - this.requestFactory = checkNotNull(requestFactory, "request factory"); + this.executor = requireNonNull(executor, "executor"); + this.requestFactory = requireNonNull(requestFactory, "request factory"); this.worker = new MessageWorker(converters); - this.baseUrlProvider = checkNotNull(baseUrlProvider, "base url provider"); + this.baseUrlProvider = requireNonNull(baseUrlProvider, "base url provider"); this.arguments = RequestArguments.create().withUrlResolution(resolution); this.plugin = plugin; } @@ -178,7 +182,7 @@ public Requester execute(final HttpMethod method) { } private Requester execute(final RequestArguments arguments) { - return new Requester(requestFactory, worker, arguments, plugin, ImmutableMultimap.of(), EMPTY); + return new Requester(executor, requestFactory, worker, arguments, plugin, ImmutableMultimap.of(), EMPTY); } } diff --git a/riptide-core/src/main/java/org/zalando/riptide/DefaultHttpBuilder.java b/riptide-core/src/main/java/org/zalando/riptide/DefaultHttpBuilder.java index 54f11eabb..7199d0473 100644 --- a/riptide-core/src/main/java/org/zalando/riptide/DefaultHttpBuilder.java +++ b/riptide-core/src/main/java/org/zalando/riptide/DefaultHttpBuilder.java @@ -1,24 +1,27 @@ package org.zalando.riptide; import com.google.common.collect.ImmutableList; -import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.web.client.RestTemplate; import org.zalando.riptide.Http.ConfigurationStage; +import org.zalando.riptide.Http.ExecutorStage; import org.zalando.riptide.Http.FinalStage; import org.zalando.riptide.Http.RequestFactoryStage; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import java.util.function.Supplier; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static org.zalando.riptide.Plugin.compound; -final class DefaultHttpBuilder implements RequestFactoryStage, ConfigurationStage, FinalStage { +final class DefaultHttpBuilder implements ExecutorStage, RequestFactoryStage, ConfigurationStage, FinalStage { // package private so we can trick code coverage static class Converters { @@ -41,7 +44,8 @@ private Plugins() { private static final UrlResolution DEFAULT_RESOLUTION = UrlResolution.RFC; - private AsyncClientHttpRequestFactory requestFactory; + private Executor executor; + private ClientHttpRequestFactory requestFactory; private final List> converters = new ArrayList<>(); private Supplier baseUrlProvider = () -> null; private UrlResolution resolution = DEFAULT_RESOLUTION; @@ -52,7 +56,13 @@ private Plugins() { } @Override - public ConfigurationStage requestFactory(final AsyncClientHttpRequestFactory requestFactory) { + public RequestFactoryStage executor(final Executor executor) { + this.executor = executor; + return this; + } + + @Override + public ConfigurationStage requestFactory(final ClientHttpRequestFactory requestFactory) { this.requestFactory = requestFactory; return this; } @@ -63,7 +73,7 @@ public ConfigurationStage defaultConverters() { } @Override - public ConfigurationStage converters(final Iterable> converters) { + public ConfigurationStage converters(@Nonnull final Iterable> converters) { converters.forEach(this::converter); return this; } @@ -108,7 +118,7 @@ public ConfigurationStage defaultPlugins() { } @Override - public ConfigurationStage plugins(final Iterable plugins) { + public ConfigurationStage plugins(@Nonnull final Iterable plugins) { plugins.forEach(this::plugin); return this; } @@ -121,7 +131,7 @@ public ConfigurationStage plugin(final Plugin plugin) { @Override public Http build() { - return new DefaultHttp(requestFactory, converters(), baseUrlProvider, resolution, compound(plugins())); + return new DefaultHttp(executor, requestFactory, converters(), baseUrlProvider, resolution, compound(plugins())); } private List> converters() { diff --git a/riptide-core/src/main/java/org/zalando/riptide/Http.java b/riptide-core/src/main/java/org/zalando/riptide/Http.java index 563586df5..4fd2f2b86 100644 --- a/riptide-core/src/main/java/org/zalando/riptide/Http.java +++ b/riptide-core/src/main/java/org/zalando/riptide/Http.java @@ -2,16 +2,13 @@ import org.apiguardian.api.API; import org.springframework.http.HttpMethod; -import org.springframework.http.client.AsyncClientHttpRequestFactory; -import org.springframework.http.client.SimpleClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.converter.HttpMessageConverter; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; -import org.springframework.web.client.AsyncRestTemplate; import org.springframework.web.client.RestTemplate; import javax.annotation.Nullable; import java.net.URI; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.function.Supplier; import static org.apiguardian.api.API.Status.STABLE; @@ -26,7 +23,6 @@ * .dispatch(..)} * * @see RestTemplate - * @see AsyncRestTemplate */ @API(status = STABLE) public interface Http { @@ -67,17 +63,16 @@ public interface Http { Requester execute(HttpMethod method, URI uri); Requester execute(HttpMethod method); - static RequestFactoryStage builder() { + static ExecutorStage builder() { return new DefaultHttpBuilder(); } + interface ExecutorStage { + RequestFactoryStage executor(Executor executor); + } + interface RequestFactoryStage { - ConfigurationStage requestFactory(AsyncClientHttpRequestFactory requestFactory); - default ConfigurationStage simpleRequestFactory(final ExecutorService executor) { - final SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); - factory.setTaskExecutor(new ConcurrentTaskExecutor(executor)); - return requestFactory(factory); - } + ConfigurationStage requestFactory(ClientHttpRequestFactory requestFactory); } interface ConfigurationStage extends FinalStage { diff --git a/riptide-core/src/main/java/org/zalando/riptide/MessageWorker.java b/riptide-core/src/main/java/org/zalando/riptide/MessageWorker.java index 88e893c93..af1bca7d7 100644 --- a/riptide-core/src/main/java/org/zalando/riptide/MessageWorker.java +++ b/riptide-core/src/main/java/org/zalando/riptide/MessageWorker.java @@ -6,13 +6,14 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.http.client.AsyncClientHttpRequest; +import org.springframework.http.client.ClientHttpRequest; import org.springframework.http.client.ClientHttpResponse; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.web.client.HttpMessageConverterExtractor; import org.springframework.web.client.ResponseExtractor; import org.springframework.web.client.RestClientException; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.ParameterizedType; @@ -66,7 +67,8 @@ private I cast(final Object result) { } @Override - public void write(final AsyncClientHttpRequest request, final HttpEntity entity) throws IOException { + public void write(@Nonnull final ClientHttpRequest request, @Nonnull final HttpEntity entity) + throws IOException { final HttpHeaders headers = entity.getHeaders(); request.getHeaders().putAll(headers); diff --git a/riptide-core/src/main/java/org/zalando/riptide/MessageWriter.java b/riptide-core/src/main/java/org/zalando/riptide/MessageWriter.java index 37d232253..cbf48ae0b 100644 --- a/riptide-core/src/main/java/org/zalando/riptide/MessageWriter.java +++ b/riptide-core/src/main/java/org/zalando/riptide/MessageWriter.java @@ -2,7 +2,7 @@ import org.apiguardian.api.API; import org.springframework.http.HttpEntity; -import org.springframework.http.client.AsyncClientHttpRequest; +import org.springframework.http.client.ClientHttpRequest; import java.io.IOException; @@ -11,6 +11,6 @@ @API(status = STABLE) interface MessageWriter { - void write(AsyncClientHttpRequest request, HttpEntity entity) throws IOException; + void write(ClientHttpRequest request, HttpEntity entity) throws IOException; } diff --git a/riptide-core/src/main/java/org/zalando/riptide/Requester.java b/riptide-core/src/main/java/org/zalando/riptide/Requester.java index 378518bd1..cb6fd47a3 100644 --- a/riptide-core/src/main/java/org/zalando/riptide/Requester.java +++ b/riptide-core/src/main/java/org/zalando/riptide/Requester.java @@ -8,10 +8,9 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; -import org.springframework.http.client.AsyncClientHttpRequest; -import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequest; +import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.client.ClientHttpResponse; -import org.springframework.util.concurrent.ListenableFuture; import org.zalando.fauxpas.ThrowingUnaryOperator; import javax.annotation.Nullable; @@ -21,14 +20,17 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import static org.apiguardian.api.API.Status.STABLE; -import static org.zalando.riptide.CancelableCompletableFuture.preserveCancelability; +import static org.zalando.fauxpas.FauxPas.throwingFunction; +import static org.zalando.fauxpas.FauxPas.throwingRunnable; @API(status = STABLE) public final class Requester extends Dispatcher { - private final AsyncClientHttpRequestFactory requestFactory; + private final Executor executor; + private final ClientHttpRequestFactory requestFactory; private final MessageWorker worker; private final RequestArguments arguments; private final Plugin plugin; @@ -36,9 +38,11 @@ public final class Requester extends Dispatcher { private final ImmutableMultimap query; private final HttpHeaders headers; - Requester(final AsyncClientHttpRequestFactory requestFactory, final MessageWorker worker, - final RequestArguments arguments, final Plugin plugin, final ImmutableMultimap query, + Requester(final Executor executor, final ClientHttpRequestFactory requestFactory, + final MessageWorker worker, final RequestArguments arguments, final Plugin plugin, + final ImmutableMultimap query, final HttpHeaders headers) { + this.executor = executor; this.requestFactory = requestFactory; this.worker = worker; this.arguments = arguments; @@ -47,8 +51,10 @@ public final class Requester extends Dispatcher { this.headers = headers; } - private Requester(final Requester requester, final ImmutableMultimap query, final HttpHeaders headers) { - this(requester.requestFactory, requester.worker, requester.arguments, requester.plugin, query, headers); + private Requester(final Requester requester, final ImmutableMultimap query, + final HttpHeaders headers) { + this(requester.executor, requester.requestFactory, requester.worker, requester.arguments, + requester.plugin, query, headers); } public Requester queryParam(final String name, final String value) { @@ -154,34 +160,35 @@ private final class ResponseDispatcher extends Dispatcher { @Override public CompletableFuture call(final Route route) { - try { - final RequestExecution before = plugin.beforeSend(this::send); - final RequestExecution after = plugin.beforeDispatch(dispatch(before, route)); - - return after.execute(arguments); - } catch (final Exception e) { - // for issues that occur before the future was successfully created, i.e. request being sent - final CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(e); - return future; - } + final RequestExecution before = plugin.beforeSend(this::send); + final RequestExecution after = plugin.beforeDispatch(dispatch(before, route)); + + // TODO get rid of this + return throwingFunction(after::execute).apply(arguments); } - private CompletableFuture send(final RequestArguments arguments) throws IOException { - final AsyncClientHttpRequest request = createRequest(arguments); - // TODO do this in the IO thread! - worker.write(request, entity); - final ListenableFuture original = request.executeAsync(); + private CompletableFuture send(final RequestArguments arguments) { + final CompletableFuture future = new CompletableFuture<>(); + + executor.execute(throwingRunnable(() -> { + try { + final ClientHttpRequest request = createRequest(arguments); + + worker.write(request, entity); + + future.complete(request.execute()); + } catch (final Exception e) { + future.completeExceptionally(e); + } + })); - final CompletableFuture future = preserveCancelability(original); - original.addCallback(future::complete, future::completeExceptionally); return future; } - private AsyncClientHttpRequest createRequest(final RequestArguments arguments) throws IOException { + private ClientHttpRequest createRequest(final RequestArguments arguments) throws IOException { final URI requestUri = arguments.getRequestUri(); final HttpMethod method = arguments.getMethod(); - return requestFactory.createAsyncRequest(requestUri, method); + return requestFactory.createRequest(requestUri, method); } private RequestExecution dispatch(final RequestExecution execution, final Route route) { diff --git a/riptide-core/src/test/java/org/zalando/riptide/AsyncTest.java b/riptide-core/src/test/java/org/zalando/riptide/AsyncTest.java index 7cf7ac3e8..9ec78a791 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/AsyncTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/AsyncTest.java @@ -16,7 +16,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import static org.hamcrest.Matchers.instanceOf; @@ -58,11 +58,13 @@ public AsyncTest() { public void shouldCall() throws Exception { server.expect(requestTo(url)).andRespond(withSuccess()); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); - unit.get(url).dispatch(series(), - on(SUCCESSFUL).call(verifier)).join(); + unit.get(url) + .dispatch(series(), + on(SUCCESSFUL).call(verifier)) + .join(); verify(verifier).tryAccept(any()); } @@ -71,12 +73,13 @@ public void shouldCall() throws Exception { public void shouldExpand() throws Exception { server.expect(requestTo(URI.create("http://localhost/123"))).andRespond(withSuccess()); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); unit.get("http://localhost/{id}", 123) - .dispatch(series(), - on(SUCCESSFUL).call(verifier)).join(); + .dispatch(series(), + on(SUCCESSFUL).call(verifier)) + .join(); verify(verifier).tryAccept(any()); } @@ -85,11 +88,12 @@ public void shouldExpand() throws Exception { public void shouldCallWithoutParameters() throws Exception { server.expect(requestTo(url)).andRespond(withSuccess()); - @SuppressWarnings("unchecked") - final ThrowingRunnable verifier = mock(ThrowingRunnable.class); + @SuppressWarnings("unchecked") final ThrowingRunnable verifier = mock(ThrowingRunnable.class); - unit.get(url).dispatch(series(), - on(SUCCESSFUL).call(verifier)).join(); + unit.get(url) + .dispatch(series(), + on(SUCCESSFUL).call(verifier)) + .join(); verify(verifier).tryRun(); } @@ -99,8 +103,8 @@ public void shouldCallWithoutParameters() throws Exception { public void shouldCallWithHeaders() throws Exception { server.expect(requestTo(url)).andRespond(withSuccess()); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); unit.get(url).headers(new HttpHeaders()).dispatch(series(), on(SUCCESSFUL).call(verifier)).join(); @@ -112,11 +116,13 @@ public void shouldCallWithHeaders() throws Exception { public void shouldCallWithBody() throws Exception { server.expect(requestTo(url)).andRespond(withSuccess()); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); - unit.get(url).body("test").dispatch(series(), - on(SUCCESSFUL).call(verifier)).join(); + unit.get(url).body("test") + .dispatch(series(), + on(SUCCESSFUL).call(verifier)) + .join(); verify(verifier).tryAccept(any()); } @@ -125,11 +131,15 @@ public void shouldCallWithBody() throws Exception { public void shouldCallWithHeadersAndBody() throws Exception { server.expect(requestTo(url)).andRespond(withSuccess()); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); - unit.get(url).headers(new HttpHeaders()).body("test").dispatch(series(), - on(SUCCESSFUL).call(verifier)).join(); + unit.get(url) + .headers(new HttpHeaders()) + .body("test") + .dispatch(series(), + on(SUCCESSFUL).call(verifier)) + .join(); verify(verifier).tryAccept(any()); } @@ -150,14 +160,6 @@ public void shouldCapture() throws InterruptedException, ExecutionException, Tim .get(100, TimeUnit.MILLISECONDS); } - @Test - public void shouldIgnoreException() { - server.expect(requestTo(url)).andRespond(withSuccess()); - - unit.get(url).dispatch(series(), - on(CLIENT_ERROR).call(pass())); - } - @Test public void shouldHandleExceptionWithGet() { server.expect(requestTo(url)).andRespond(withSuccess()); @@ -174,14 +176,17 @@ public void shouldHandleExceptionWithGet() { public void shouldHandleNoRouteExceptionWithCallback() { server.expect(requestTo(url)).andRespond(withSuccess()); - @SuppressWarnings("unchecked") final BiConsumer callback = mock(BiConsumer.class); + @SuppressWarnings("unchecked") final BiFunction callback = + mock(BiFunction.class); - unit.get(url).dispatch(series(), - on(CLIENT_ERROR).call(pass())) - .whenComplete(callback); + unit.get(url) + .dispatch(series(), + on(CLIENT_ERROR).call(pass())) + .handle(callback) + .join(); final ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); - verify(callback).accept(eq(null), captor.capture()); + verify(callback).apply(eq(null), captor.capture()); final Exception exception = captor.getValue(); assertThat(exception, is(instanceOf(CompletionException.class))); @@ -194,8 +199,9 @@ public void shouldIgnoreSuccessWhenHandlingExceptionWithCallback() { @SuppressWarnings("unchecked") final Consumer callback = mock(Consumer.class); - unit.get(url).dispatch(series(), - on(SUCCESSFUL).call(pass())) + unit.get(url) + .dispatch(series(), + on(SUCCESSFUL).call(pass())) .whenComplete((result, throwable) -> { if (throwable != null) { callback.accept(throwable); diff --git a/riptide-core/src/test/java/org/zalando/riptide/CallTest.java b/riptide-core/src/test/java/org/zalando/riptide/CallTest.java index 042e8dd28..46c55478c 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/CallTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/CallTest.java @@ -56,13 +56,14 @@ public void shouldCallEntity() throws Exception { .body(new ClassPathResource("account.json")) .contentType(APPLICATION_JSON)); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); unit.get("accounts/123") .dispatch(status(), on(OK).call(AccountBody.class, verifier), - anyStatus().call(this::fail)); + anyStatus().call(this::fail)) + .join(); verify(verifier).tryAccept(any(AccountBody.class)); } @@ -74,16 +75,17 @@ public void shouldCallResponseEntity() throws Exception { .body(new ClassPathResource("account.json")) .contentType(APPLICATION_JSON)); - @SuppressWarnings("unchecked") - final ThrowingConsumer, Exception> verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer, Exception> verifier = mock( + ThrowingConsumer.class); unit.get("accounts/123") .dispatch(status(), on(OK).call(responseEntityOf(AccountBody.class), verifier), - anyStatus().call(this::fail)); + anyStatus().call(this::fail)) + .join(); - @SuppressWarnings("unchecked") - final ArgumentCaptor> captor = ArgumentCaptor.forClass(ResponseEntity.class); + @SuppressWarnings("unchecked") final ArgumentCaptor> captor = ArgumentCaptor.forClass( + ResponseEntity.class); verify(verifier).tryAccept(captor.capture()); final ResponseEntity entity = captor.getValue(); @@ -99,13 +101,13 @@ public void shouldCallWithoutParameters() throws Exception { .body(new ClassPathResource("account.json")) .contentType(APPLICATION_JSON)); - @SuppressWarnings("unchecked") - final ThrowingRunnable verifier = mock(ThrowingRunnable.class); + @SuppressWarnings("unchecked") final ThrowingRunnable verifier = mock(ThrowingRunnable.class); unit.get("accounts/123") .dispatch(status(), on(OK).call(verifier), - anyStatus().call(this::fail)); + anyStatus().call(this::fail)) + .join(); verify(verifier).tryRun(); } diff --git a/riptide-core/src/test/java/org/zalando/riptide/DynamicBaseUrlTest.java b/riptide-core/src/test/java/org/zalando/riptide/DynamicBaseUrlTest.java index ed4b19679..88b5eecb1 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/DynamicBaseUrlTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/DynamicBaseUrlTest.java @@ -44,11 +44,13 @@ public void shouldUseDynamicBaseUrl() { unit.get("/123") .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); unit.get("/123") .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); server.verify(); verify(baseUrlProviderMock, times(2)).get(); diff --git a/riptide-core/src/test/java/org/zalando/riptide/ExceptionHandlingTest.java b/riptide-core/src/test/java/org/zalando/riptide/ExceptionHandlingTest.java index 001c389cb..eeff57a0a 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/ExceptionHandlingTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/ExceptionHandlingTest.java @@ -32,24 +32,24 @@ public ExceptionHandlingTest() { } @Before - public void setUp() throws Exception { + public void setUp() { server.expect(requestTo("https://api.example.com/")) .andRespond(withSuccess()); } @After - public void tearDown() throws Exception { + public void tearDown() { server.verify(); } @Test public void shouldNotThrowIOExceptionWhenSettingBody() { - unit.get("/").body("body").call(tree); + unit.get("/").body("body").call(tree).join(); } @Test public void shouldNotThrowIOExceptionWhenDispatchingWithoutBody() { - unit.get("/").call(tree); + unit.get("/").call(tree).join(); } @Test diff --git a/riptide-core/src/test/java/org/zalando/riptide/FailedDispatchTest.java b/riptide-core/src/test/java/org/zalando/riptide/FailedDispatchTest.java index 06204d224..26273538d 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/FailedDispatchTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/FailedDispatchTest.java @@ -176,7 +176,8 @@ public void shouldPropagateIfNoMatch() throws Exception { on(TEXT_PLAIN).call(pass())), on(ACCEPTED).call(pass()), anyStatus().call(consumer)), - on(CLIENT_ERROR).call(pass())); + on(CLIENT_ERROR).call(pass())) + .join(); verify(consumer).tryAccept(any()); } @@ -198,7 +199,8 @@ public void shouldPropagateMultipleLevelsIfNoMatch() throws Exception { on(TEXT_PLAIN).call(pass())), on(ACCEPTED).call(pass())), on(CLIENT_ERROR).call(pass()), - anySeries().call(consumer)); + anySeries().call(consumer)) + .join(); verify(consumer).tryAccept(any()); } diff --git a/riptide-core/src/test/java/org/zalando/riptide/HandlesIOExceptionTest.java b/riptide-core/src/test/java/org/zalando/riptide/HandlesIOExceptionTest.java index f6ada273c..9c24ba5cd 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/HandlesIOExceptionTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/HandlesIOExceptionTest.java @@ -3,10 +3,11 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequestFactory; import java.io.IOException; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executors; import static org.hamcrest.Matchers.instanceOf; import static org.springframework.http.HttpStatus.Series.SUCCESSFUL; @@ -25,12 +26,17 @@ public void shouldHandleExceptionDuringRequestCreation() { exception.expectCause(instanceOf(IOException.class)); exception.expectMessage("Could not create request"); - final AsyncClientHttpRequestFactory factory = (uri, httpMethod) -> { + final ClientHttpRequestFactory factory = (uri, httpMethod) -> { throw new IOException("Could not create request"); }; - Http.builder().requestFactory(factory).defaultConverters().build() - .get("http://localhost/") + final Http unit = Http.builder() + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(factory) + .defaultConverters() + .build(); + + unit.get("http://localhost/") .dispatch(series(), on(SUCCESSFUL).call(pass())) .join(); diff --git a/riptide-core/src/test/java/org/zalando/riptide/IOTest.java b/riptide-core/src/test/java/org/zalando/riptide/IOTest.java index c9d0779db..43ab60389 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/IOTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/IOTest.java @@ -9,12 +9,9 @@ import org.junit.Test; import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import java.io.IOException; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -50,17 +47,12 @@ public String getLogin() { private final ExecutorService executor = newSingleThreadExecutor(); private final Http http = Http.builder() - .requestFactory(createRequestFactory(executor)) + .executor(executor) + .requestFactory(new SimpleClientHttpRequestFactory()) .baseUrl(driver.getBaseUrl()) .converter(createJsonConverter()) .build(); - private static SimpleClientHttpRequestFactory createRequestFactory(final Executor executor) { - final SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); - factory.setTaskExecutor(new ConcurrentTaskExecutor(executor)); - return factory; - } - private static MappingJackson2HttpMessageConverter createJsonConverter() { final MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter(); converter.setObjectMapper(new ObjectMapper().findAndRegisterModules() @@ -92,7 +84,7 @@ public void shouldReadContributors() throws IOException { } @Test - public void shouldCancelRequest() throws ExecutionException, InterruptedException { + public void shouldCancelRequest() throws InterruptedException { // TODO: support proper cancellations and remove this expectation driver.addExpectation(onRequestTo("/foo"), giveEmptyResponse()); diff --git a/riptide-core/src/test/java/org/zalando/riptide/InputStreamTest.java b/riptide-core/src/test/java/org/zalando/riptide/InputStreamTest.java index 42b25c126..9a62fec03 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/InputStreamTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/InputStreamTest.java @@ -9,13 +9,14 @@ import org.springframework.http.converter.HttpMessageNotReadableException; import org.springframework.http.converter.HttpMessageNotWritableException; import org.springframework.test.web.client.MockRestServiceServer; -import org.springframework.web.client.AsyncRestTemplate; +import org.springframework.web.client.RestTemplate; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.util.List; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonList; @@ -132,10 +133,11 @@ public synchronized int available() throws IOException { private final MockRestServiceServer server; public InputStreamTest() { - final AsyncRestTemplate template = new AsyncRestTemplate(); + final RestTemplate template = new RestTemplate(); this.server = MockRestServiceServer.createServer(template); this.unit = Http.builder() - .requestFactory(template.getAsyncRequestFactory()) + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(template.getRequestFactory()) .converter(new InputStreamHttpMessageConverter()) .baseUrl("https://api.example.com") .build(); diff --git a/riptide-core/src/test/java/org/zalando/riptide/MethodDelegateTest.java b/riptide-core/src/test/java/org/zalando/riptide/MethodDelegateTest.java index bb042516a..ed4ceb2b0 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/MethodDelegateTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/MethodDelegateTest.java @@ -146,7 +146,8 @@ public void shouldDelegate() { .andRespond(withSuccess()); tester.test(unit) - .call(pass()); + .call(pass()) + .join(); server.verify(); } diff --git a/riptide-core/src/test/java/org/zalando/riptide/MockSetup.java b/riptide-core/src/test/java/org/zalando/riptide/MockSetup.java index 6c471939f..fc1ecdf5b 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/MockSetup.java +++ b/riptide-core/src/test/java/org/zalando/riptide/MockSetup.java @@ -5,11 +5,12 @@ import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.test.web.client.MockRestServiceServer; -import org.springframework.web.client.AsyncRestTemplate; +import org.springframework.web.client.RestTemplate; import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; import static com.google.common.base.MoreObjects.firstNonNull; @@ -20,7 +21,7 @@ public final class MockSetup { private final String baseUrl; private final Iterable> converters; - private final AsyncRestTemplate template; + private final RestTemplate template; private final MockRestServiceServer server; private static MappingJackson2HttpMessageConverter defaultJsonConverter() { @@ -40,7 +41,7 @@ public MockSetup(@Nullable final String baseUrl) { public MockSetup(@Nullable final String baseUrl, @Nullable final Iterable> converters) { this.baseUrl = baseUrl; this.converters = converters; - this.template = new AsyncRestTemplate(); + this.template = new RestTemplate(); this.server = MockRestServiceServer.createServer(template); } @@ -50,7 +51,8 @@ public MockRestServiceServer getServer() { public Http.ConfigurationStage getHttpBuilder() { return Http.builder() - .requestFactory(template.getAsyncRequestFactory()) + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(template.getRequestFactory()) .converters(firstNonNull(converters, DEFAULT_CONVERTERS)) .baseUrl(baseUrl) .defaultPlugins(); diff --git a/riptide-core/src/test/java/org/zalando/riptide/NoBaseUrlTest.java b/riptide-core/src/test/java/org/zalando/riptide/NoBaseUrlTest.java index 252d98eeb..84c6d13b2 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/NoBaseUrlTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/NoBaseUrlTest.java @@ -3,26 +3,25 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.mockito.Mock; import org.mockito.Mockito; +import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.client.SimpleClientHttpRequestFactory; import java.net.URI; -import java.util.concurrent.ExecutorService; public class NoBaseUrlTest { @Rule public final ExpectedException exception = ExpectedException.none(); - private final ExecutorService executor = Mockito.mock(ExecutorService.class); + private final ClientHttpRequestFactory requestFactory = Mockito.mock(ClientHttpRequestFactory.class); @Test public void shouldFailOnNonAbsoluteBaseUrl() { exception.expect(IllegalArgumentException.class); exception.expectMessage("Base URL is not absolute"); - Http.builder().simpleRequestFactory(executor).baseUrl(""); + Http.builder().executor(Runnable::run).requestFactory(requestFactory).baseUrl(""); } @Test @@ -30,7 +29,7 @@ public void shouldFailOnNonAbsoluteBaseUri() { exception.expect(IllegalArgumentException.class); exception.expectMessage("Base URL is not absolute"); - Http.builder().simpleRequestFactory(executor).baseUrl(URI.create("")); + Http.builder().executor(Runnable::run).requestFactory(requestFactory).baseUrl(URI.create("")); } @Test @@ -39,6 +38,7 @@ public void shouldFailOnProvisioningOfNonAbsoluteBaseUri() { exception.expectMessage("Base URL is not absolute"); final Http unit = Http.builder() + .executor(Runnable::run) .requestFactory(new SimpleClientHttpRequestFactory()) .baseUrl(() -> URI.create("")) .build(); diff --git a/riptide-core/src/test/java/org/zalando/riptide/RequestUriTest.java b/riptide-core/src/test/java/org/zalando/riptide/RequestUriTest.java index 135d0fc81..a8c780162 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/RequestUriTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/RequestUriTest.java @@ -12,9 +12,12 @@ import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.test.web.client.MockRestServiceServer; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.net.URI; import java.util.Collection; import java.util.Objects; +import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.stream.Stream; @@ -119,7 +122,8 @@ public String toString() { } private interface Result { - void execute(final String baseUrl, final UrlResolution resolution, final String uri, final HttpMethod method, final Consumer tester); + void execute(final String baseUrl, final UrlResolution resolution, @Nullable final String uri, + final HttpMethod method, final Consumer tester); } @Value @@ -128,8 +132,8 @@ private static final class RequestUri implements Result { String requestUri; @Override - public void execute(final String baseUrl, final UrlResolution resolution, final String uri, - final HttpMethod method, final Consumer tester) { + public void execute(final String baseUrl, final UrlResolution resolution, @Nullable final String uri, + final HttpMethod method, @Nonnull final Consumer tester) { final MockSetup setup = new MockSetup(baseUrl); final Http unit = setup.getHttpBuilder().urlResolution(resolution).build(); @@ -161,11 +165,12 @@ private static final class Failure implements Result { String message; @Override - public void execute(final String baseUrl, final UrlResolution resolution, final String uri, + public void execute(final String baseUrl, final UrlResolution resolution, @Nullable final String uri, final HttpMethod method, final Consumer tester) { try { final Http unit = Http.builder() + .executor(Executors.newSingleThreadExecutor()) .requestFactory(new SimpleClientHttpRequestFactory()) .baseUrl(baseUrl) .urlResolution(resolution) @@ -194,8 +199,8 @@ private static Result error(final String message) { public void shouldIgnoreEmptyUri() { assumeThat(uri, is(nullValue())); - result.execute(baseUrl, resolution, uri, method, http -> - http.execute(method).call(pass())); + result.execute(baseUrl, resolution, null, method, http -> + http.execute(method).call(pass()).join()); } @Test @@ -203,7 +208,7 @@ public void shouldResolveUri() { assumeThat(uri, is(notNullValue())); result.execute(baseUrl, resolution, uri, method, http -> - http.execute(method, URI.create(uri)).call(pass())); + http.execute(method, URI.create(uri)).call(pass()).join()); } @Test @@ -211,7 +216,7 @@ public void shouldResolveUriTemplate() { assumeThat(uri, is(notNullValue())); result.execute(baseUrl, resolution, uri, method, http -> - http.execute(method, uri).call(pass())); + http.execute(method, uri).call(pass()).join()); } /** diff --git a/riptide-core/src/test/java/org/zalando/riptide/RequesterTest.java b/riptide-core/src/test/java/org/zalando/riptide/RequesterTest.java index a3be523f1..84228e833 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/RequesterTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/RequesterTest.java @@ -10,7 +10,6 @@ import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo; import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess; import static org.zalando.riptide.Bindings.on; -import static org.zalando.riptide.Navigators.contentType; import static org.zalando.riptide.Navigators.series; import static org.zalando.riptide.PassRoute.pass; @@ -36,7 +35,8 @@ public void shouldExpandWithoutVariables() { unit.get("/123") .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } @Test @@ -45,7 +45,8 @@ public void shouldExpandOne() { unit.get("/{id}", 123) .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } @Test @@ -54,7 +55,8 @@ public void shouldExpandTwo() { unit.get("/{parent}/{child}", 123, "456") .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } @Test @@ -66,7 +68,8 @@ public void shouldExpandInlinedQueryParams() { unit.get("https://example.com/posts/{id}?filter={filter}", postId, filter) .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } @Test @@ -75,7 +78,8 @@ public void shouldEncodePath() { unit.get("https://ru.wikipedia.org/wiki/{article-name}", "Отбойное_течение") .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } @Test @@ -84,7 +88,8 @@ public void shouldEncodeInlinedQueryParams() { unit.get("https://ru.wiktionary.org/w/index.php?title={title}&bookcmd=book_creator&referer={referer}", "Служебная:Коллекция_книг", "Заглавная страница") .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } @Test @@ -99,7 +104,8 @@ public void shouldAppendQueryParams() { "bar", "null" )) .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } @Test @@ -111,7 +117,8 @@ public void shouldEncodeAppendedQueryParams() { .queryParam("bookcmd", "book_creator") .queryParam("referer", "Заглавная страница") .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } @Test @@ -121,7 +128,8 @@ public void shouldExpandOnGetWithHeaders() { unit.get("/123") .headers(new HttpHeaders()) .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } @Test @@ -130,7 +138,8 @@ public void shouldExpandOnGetWithBody() { unit.get("/123") .body("deadbody") - .dispatch(contentType()); + .call(pass()) + .join(); } @Test @@ -140,7 +149,8 @@ public void shouldExpandOnGetWithHeadersAndBody() { unit.get("/123") .headers(new HttpHeaders()) .body("deadbody") - .dispatch(contentType()); + .call(pass()) + .join(); } private void expectRequestTo(final String url) { diff --git a/riptide-core/src/test/java/org/zalando/riptide/RouteTest.java b/riptide-core/src/test/java/org/zalando/riptide/RouteTest.java index fdd1a107c..755307954 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/RouteTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/RouteTest.java @@ -102,7 +102,8 @@ public void shouldAllowToCaptureBody() throws Exception { final AtomicReference body = new AtomicReference<>(); unit.get(url).call(((response, reader) -> - body.set(response.getBody()))); + body.set(response.getBody()))) + .join(); // read response outside of consumer/callback // to make sure the stream is still available diff --git a/riptide-core/src/test/java/org/zalando/riptide/StackTraceTest.java b/riptide-core/src/test/java/org/zalando/riptide/StackTraceTest.java index f79902c7a..463b5cca2 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/StackTraceTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/StackTraceTest.java @@ -6,6 +6,7 @@ import org.junit.Rule; import org.junit.Test; import org.springframework.http.client.ClientHttpResponse; +import org.springframework.http.client.SimpleClientHttpRequestFactory; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -71,7 +72,8 @@ public void shouldNotKeepOriginalStackTrace() { private Http.ConfigurationStage configureRest() { return Http.builder() - .simpleRequestFactory(executor) + .executor(executor) + .requestFactory(new SimpleClientHttpRequestFactory()) .baseUrl(driver.getBaseUrl()); } diff --git a/riptide-core/src/test/java/org/zalando/riptide/UriTemplateArgTest.java b/riptide-core/src/test/java/org/zalando/riptide/UriTemplateArgTest.java index e8e0a5358..5b9c22c74 100644 --- a/riptide-core/src/test/java/org/zalando/riptide/UriTemplateArgTest.java +++ b/riptide-core/src/test/java/org/zalando/riptide/UriTemplateArgTest.java @@ -87,7 +87,8 @@ public void shouldExpand() { this.unit.get(uriTemplate, uriVariables) .dispatch(series(), - on(SUCCESSFUL).call(pass())); + on(SUCCESSFUL).call(pass())) + .join(); } } diff --git a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginCircuitBreakerTest.java b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginCircuitBreakerTest.java index 92694752f..308fcf475 100644 --- a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginCircuitBreakerTest.java +++ b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginCircuitBreakerTest.java @@ -13,10 +13,9 @@ import org.junit.Test; import org.springframework.http.client.ClientHttpResponse; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.zalando.riptide.Http; import org.zalando.riptide.OriginalStackTracePlugin; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import java.io.IOException; import java.net.SocketTimeoutException; @@ -46,8 +45,8 @@ public class FailsafePluginCircuitBreakerTest { private final RetryListener listeners = mock(RetryListener.class); private final Http unit = Http.builder() - .requestFactory(new RestAsyncClientHttpRequestFactory(client, - new ConcurrentTaskExecutor(newSingleThreadExecutor()))) + .executor(newSingleThreadExecutor()) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .baseUrl(driver.getBaseUrl()) .converter(createJsonConverter()) .plugin(new FailsafePlugin(newSingleThreadScheduledExecutor()) diff --git a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginNoCircuitBreakerTest.java b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginNoCircuitBreakerTest.java index 370c8fb29..c7bcd4f54 100644 --- a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginNoCircuitBreakerTest.java +++ b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginNoCircuitBreakerTest.java @@ -12,10 +12,9 @@ import org.junit.Test; import org.springframework.http.client.ClientHttpResponse; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.zalando.riptide.Http; import org.zalando.riptide.OriginalStackTracePlugin; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import java.io.IOException; import java.net.SocketTimeoutException; @@ -44,8 +43,8 @@ public class FailsafePluginNoCircuitBreakerTest { private final RetryListener listeners = mock(RetryListener.class); private final Http unit = Http.builder() - .requestFactory(new RestAsyncClientHttpRequestFactory(client, - new ConcurrentTaskExecutor(newSingleThreadExecutor()))) + .executor(newSingleThreadExecutor()) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .baseUrl(driver.getBaseUrl()) .converter(createJsonConverter()) .plugin(new FailsafePlugin(newSingleThreadScheduledExecutor()) diff --git a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginRetriesTest.java b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginRetriesTest.java index fc79338bb..efd555a05 100644 --- a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginRetriesTest.java +++ b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/FailsafePluginRetriesTest.java @@ -15,9 +15,8 @@ import org.springframework.http.HttpStatus; import org.springframework.http.client.ClientHttpResponse; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.zalando.riptide.Http; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import javax.annotation.Nullable; import java.io.IOException; @@ -62,8 +61,8 @@ public class FailsafePluginRetriesTest { private final RetryListener listeners = mock(RetryListener.class); private final Http unit = Http.builder() - .requestFactory(new RestAsyncClientHttpRequestFactory(client, - new ConcurrentTaskExecutor(newCachedThreadPool()))) + .executor(newCachedThreadPool()) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .baseUrl(driver.getBaseUrl()) .converter(createJsonConverter()) .plugin(new FailsafePlugin(new ScheduledThreadPoolExecutor(2)) @@ -127,8 +126,8 @@ public void shouldNotRetryNonIdempotentMethod() throws Throwable { @Test public void shouldRetryCustomDetectedIdempotentRequest() { final Http unit = Http.builder() - .requestFactory(new RestAsyncClientHttpRequestFactory(client, - new ConcurrentTaskExecutor(newCachedThreadPool()))) + .executor(newCachedThreadPool()) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .baseUrl(driver.getBaseUrl()) .converter(createJsonConverter()) .plugin(new FailsafePlugin(newSingleThreadScheduledExecutor()) diff --git a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/MockSetup.java b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/MockSetup.java index ba237c4bc..08ded0c57 100644 --- a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/MockSetup.java +++ b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/MockSetup.java @@ -5,12 +5,13 @@ import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.test.web.client.MockRestServiceServer; -import org.springframework.web.client.AsyncRestTemplate; +import org.springframework.web.client.RestTemplate; import org.zalando.riptide.Http; import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; import static com.google.common.base.MoreObjects.firstNonNull; @@ -29,7 +30,7 @@ private static MappingJackson2HttpMessageConverter createJsonConverter() { private final String baseUrl; private final Iterable> converters; - private final AsyncRestTemplate template; + private final RestTemplate template; private final MockRestServiceServer server; public MockSetup() { @@ -43,7 +44,7 @@ public MockSetup(final String baseUrl) { public MockSetup(@Nullable final String baseUrl, @Nullable final Iterable> converters) { this.baseUrl = baseUrl; this.converters = converters; - this.template = new AsyncRestTemplate(); + this.template = new RestTemplate(); this.server = MockRestServiceServer.createServer(template); } @@ -53,7 +54,8 @@ public MockRestServiceServer getServer() { public Http.ConfigurationStage getRestBuilder() { return Http.builder() - .requestFactory(template.getAsyncRequestFactory()) + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(template.getRequestFactory()) .converters(firstNonNull(converters, DEFAULT_CONVERTERS)) .baseUrl(baseUrl); } diff --git a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/RetryAfterDelayFunctionTest.java b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/RetryAfterDelayFunctionTest.java index b695c1f56..119553a2e 100644 --- a/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/RetryAfterDelayFunctionTest.java +++ b/riptide-failsafe/src/test/java/org/zalando/riptide/failsafe/RetryAfterDelayFunctionTest.java @@ -14,9 +14,8 @@ import org.junit.Test; import org.springframework.http.HttpStatus; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.zalando.riptide.Http; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import java.io.IOException; import java.time.Clock; @@ -53,8 +52,8 @@ public class RetryAfterDelayFunctionTest { private final Clock clock = Clock.fixed(parse("2018-04-11T22:34:27Z"), UTC); private final Http unit = Http.builder() - .requestFactory(new RestAsyncClientHttpRequestFactory(client, - new ConcurrentTaskExecutor(newSingleThreadExecutor()))) + .executor(newSingleThreadExecutor()) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .baseUrl(driver.getBaseUrl()) .converter(createJsonConverter()) .plugin(new FailsafePlugin(newSingleThreadScheduledExecutor()) diff --git a/riptide-faults/src/test/java/org/zalando/riptide/faults/MockSetup.java b/riptide-faults/src/test/java/org/zalando/riptide/faults/MockSetup.java index 8dfbd8019..1d73f28d1 100644 --- a/riptide-faults/src/test/java/org/zalando/riptide/faults/MockSetup.java +++ b/riptide-faults/src/test/java/org/zalando/riptide/faults/MockSetup.java @@ -5,12 +5,13 @@ import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.test.web.client.MockRestServiceServer; -import org.springframework.web.client.AsyncRestTemplate; +import org.springframework.web.client.RestTemplate; import org.zalando.riptide.Http; import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; import static com.google.common.base.MoreObjects.firstNonNull; @@ -29,7 +30,7 @@ private static MappingJackson2HttpMessageConverter createJsonConverter() { private final String baseUrl; private final Iterable> converters; - private final AsyncRestTemplate template; + private final RestTemplate template; private final MockRestServiceServer server; public MockSetup() { @@ -43,7 +44,7 @@ public MockSetup(final String baseUrl) { public MockSetup(@Nullable final String baseUrl, @Nullable final Iterable> converters) { this.baseUrl = baseUrl; this.converters = converters; - this.template = new AsyncRestTemplate(); + this.template = new RestTemplate(); this.server = MockRestServiceServer.createServer(template); } @@ -53,7 +54,8 @@ public MockRestServiceServer getServer() { public Http.ConfigurationStage getRestBuilder() { return Http.builder() - .requestFactory(template.getAsyncRequestFactory()) + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(template.getRequestFactory()) .converters(firstNonNull(converters, DEFAULT_CONVERTERS)) .baseUrl(baseUrl); } diff --git a/riptide-faults/src/test/java/org/zalando/riptide/faults/TransientFaultPluginTest.java b/riptide-faults/src/test/java/org/zalando/riptide/faults/TransientFaultPluginTest.java index d94c540ab..b5a2b7fbc 100644 --- a/riptide-faults/src/test/java/org/zalando/riptide/faults/TransientFaultPluginTest.java +++ b/riptide-faults/src/test/java/org/zalando/riptide/faults/TransientFaultPluginTest.java @@ -13,7 +13,7 @@ import org.zalando.riptide.Http; import org.zalando.riptide.Plugin; import org.zalando.riptide.RequestExecution; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import java.io.IOException; import java.net.MalformedURLException; @@ -52,7 +52,7 @@ public final class TransientFaultPluginTest { .build(); private final ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(); - private final RestAsyncClientHttpRequestFactory factory = new RestAsyncClientHttpRequestFactory(client, executor); + private final ApacheClientHttpRequestFactory factory = new ApacheClientHttpRequestFactory(client); @After public void tearDown() throws IOException { @@ -136,6 +136,7 @@ public void shouldClassifyAsPermanent() { private Http newUnit(final Plugin... plugins) { return Http.builder() + .executor(executor) .requestFactory(factory) .baseUrl(driver.getBaseUrl()) .plugins(Arrays.asList(plugins)) diff --git a/riptide-httpclient/README.md b/riptide-httpclient/README.md index b42401f1a..f2dc0dc8f 100644 --- a/riptide-httpclient/README.md +++ b/riptide-httpclient/README.md @@ -16,14 +16,13 @@ ```java final Http http = Http.builder() - .requestFactory(new RestAsyncClientHttpRequestFactory(client, executor)) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .build(); ``` ## Features - independent from *Riptide: Core*, i.e. it can be used with a plain [`RestTemplate`](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/client/RestTemplate.html) -- allows to use a plain [`HttpClient`](https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/client/HttpClient.html) [asynchronously](http://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/client/AsyncClientHttpRequestFactory.html) - fixes several issues with Spring's [`HttpComponentsClientHttpRequestFactory`](http://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/client/HttpComponentsClientHttpRequestFactory.html): - preserve the underlying client's request config - releasing connections back to the pool after closing streams @@ -51,10 +50,8 @@ CloseableHttpClient client = HttpClientBuilder.create() // TODO configure client here .build(); -AsyncListenableTaskExecutor executor = new ConcurrentTaskExecutor(); - final Http http = Http.builder() - .requestFactory(new RestAsyncClientHttpRequestFactory(client, executor)) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .build(); ``` diff --git a/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/ApacheClientHttpRequest.java b/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/ApacheClientHttpRequest.java new file mode 100644 index 000000000..79d01a828 --- /dev/null +++ b/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/ApacheClientHttpRequest.java @@ -0,0 +1,50 @@ +package org.zalando.riptide.httpclient; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.client.ClientHttpRequest; +import org.springframework.http.client.ClientHttpResponse; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; + +final class ApacheClientHttpRequest implements ClientHttpRequest { + + private final ClientHttpRequest request; + + ApacheClientHttpRequest(final ClientHttpRequest request) { + this.request = request; + } + + @Override + public ClientHttpResponse execute() throws IOException { + return new ApacheClientHttpResponse(request.execute()); + } + + @Override + public OutputStream getBody() throws IOException { + return request.getBody(); + } + + @Override + public HttpMethod getMethod() { + return request.getMethod(); + } + + @Override + public String getMethodValue() { + return request.getMethodValue(); + } + + @Override + public URI getURI() { + return request.getURI(); + } + + @Override + public HttpHeaders getHeaders() { + return request.getHeaders(); + } + +} diff --git a/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequestFactory.java b/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/ApacheClientHttpRequestFactory.java similarity index 71% rename from riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequestFactory.java rename to riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/ApacheClientHttpRequestFactory.java index 90bab1907..9e18a4955 100644 --- a/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequestFactory.java +++ b/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/ApacheClientHttpRequestFactory.java @@ -20,12 +20,11 @@ import static org.apiguardian.api.API.Status.STABLE; @API(status = STABLE) -public final class RestAsyncClientHttpRequestFactory implements ClientHttpRequestFactory, AsyncClientHttpRequestFactory { +public final class ApacheClientHttpRequestFactory implements ClientHttpRequestFactory { private final ClientHttpRequestFactory factory; - private final AsyncListenableTaskExecutor executor; - public RestAsyncClientHttpRequestFactory(final HttpClient client, final AsyncListenableTaskExecutor executor) { + public ApacheClientHttpRequestFactory(final HttpClient client) { final RequestConfig config = Configurable.class.cast(client).getConfig(); this.factory = new HttpComponentsClientHttpRequestFactory(client) { @@ -35,17 +34,11 @@ protected void postProcessHttpRequest(final HttpUriRequest request) { HttpRequestBase.class.cast(request).setConfig(config); } }; - this.executor = executor; } @Override public ClientHttpRequest createRequest(final URI uri, final HttpMethod method) throws IOException { - return factory.createRequest(uri, method); - } - - @Override - public AsyncClientHttpRequest createAsyncRequest(final URI uri, final HttpMethod method) throws IOException { - return new RestAsyncClientHttpRequest(factory.createRequest(uri, method), executor); + return new ApacheClientHttpRequest(factory.createRequest(uri, method)); } } diff --git a/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/RestAsyncClientHttpResponse.java b/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/ApacheClientHttpResponse.java similarity index 91% rename from riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/RestAsyncClientHttpResponse.java rename to riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/ApacheClientHttpResponse.java index 3adcb772e..5648e1b0b 100644 --- a/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/RestAsyncClientHttpResponse.java +++ b/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/ApacheClientHttpResponse.java @@ -9,11 +9,11 @@ import java.io.IOException; import java.io.InputStream; -final class RestAsyncClientHttpResponse implements ClientHttpResponse { +final class ApacheClientHttpResponse implements ClientHttpResponse { private final ClientHttpResponse response; - RestAsyncClientHttpResponse(final ClientHttpResponse response) { + ApacheClientHttpResponse(final ClientHttpResponse response) { this.response = response; } diff --git a/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequest.java b/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequest.java deleted file mode 100644 index e54f9daf1..000000000 --- a/riptide-httpclient/src/main/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequest.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.zalando.riptide.httpclient; - -import org.springframework.core.task.AsyncListenableTaskExecutor; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.client.AsyncClientHttpRequest; -import org.springframework.http.client.ClientHttpRequest; -import org.springframework.http.client.ClientHttpResponse; -import org.springframework.util.concurrent.ListenableFuture; - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.util.Objects; - -final class RestAsyncClientHttpRequest implements AsyncClientHttpRequest { - - private final ClientHttpRequest request; - private final AsyncListenableTaskExecutor executor; - - RestAsyncClientHttpRequest(final ClientHttpRequest request, final AsyncListenableTaskExecutor executor) { - this.request = request; - this.executor = executor; - } - - @Override - public ListenableFuture executeAsync() throws IOException { - return executor.submitListenable(this::execute); - } - - private RestAsyncClientHttpResponse execute() throws IOException { - return new RestAsyncClientHttpResponse(request.execute()); - } - - @Override - public OutputStream getBody() throws IOException { - return request.getBody(); - } - - @Override - public HttpMethod getMethod() { - return request.getMethod(); - } - - // TODO @Override as soon as we no longer support Spring 4 - @Nonnull - public String getMethodValue() { - return Objects.requireNonNull(request.getMethod(), "method").toString(); - } - - @Override - public String getMethodValue() { - return request.getMethodValue(); - } - - @Override - public URI getURI() { - return request.getURI(); - } - - @Override - public HttpHeaders getHeaders() { - return request.getHeaders(); - } - -} diff --git a/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequestFactoryTest.java b/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/ApacheClientHttpRequestFactoryTest.java similarity index 93% rename from riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequestFactoryTest.java rename to riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/ApacheClientHttpRequestFactoryTest.java index 2bd417c58..aa4c1ba95 100644 --- a/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequestFactoryTest.java +++ b/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/ApacheClientHttpRequestFactoryTest.java @@ -15,7 +15,7 @@ import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.http.HttpEntity; import org.springframework.http.HttpStatus; -import org.springframework.http.client.AsyncClientHttpRequest; +import org.springframework.http.client.ClientHttpRequest; import org.springframework.http.client.ClientHttpResponse; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; @@ -27,7 +27,7 @@ import java.io.InputStream; import java.net.URI; import java.util.List; -import java.util.concurrent.ExecutionException; +import java.util.Objects; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NON_PRIVATE; import static com.github.restdriver.clientdriver.RestClientDriver.giveResponseAsBytes; @@ -52,7 +52,7 @@ import static org.zalando.riptide.Navigators.series; import static org.zalando.riptide.Types.listOf; -public final class RestAsyncClientHttpRequestFactoryTest { +public final class ApacheClientHttpRequestFactoryTest { @Rule public final ClientDriverRule driver = new ClientDriverRule(); @@ -68,9 +68,10 @@ public String getLogin() { private final CloseableHttpClient client = HttpClientBuilder.create().build(); private final AsyncListenableTaskExecutor executor = new ConcurrentTaskExecutor(); - private final RestAsyncClientHttpRequestFactory factory = new RestAsyncClientHttpRequestFactory(client, executor); + private final ApacheClientHttpRequestFactory factory = new ApacheClientHttpRequestFactory(client); private final Http http = Http.builder() + .executor(executor) .requestFactory(factory) .baseUrl(driver.getBaseUrl()) .converter(createJsonConverter()) @@ -104,6 +105,8 @@ public void shouldReadContributors() throws IOException { HttpEntity.EMPTY, new ParameterizedTypeReference>() { }).getBody(); + Objects.requireNonNull(users); + final List names = users.stream() .map(User::getLogin) .collect(toList()); @@ -131,12 +134,12 @@ public void shouldReadContributorsAsync() throws IOException { } @Test - public void shouldReadContributorsManually() throws IOException, ExecutionException, InterruptedException { + public void shouldReadContributorsManually() throws IOException { driver.addExpectation(onRequestTo("/repos/zalando/riptide/contributors").withMethod(Method.POST), giveResponseAsBytes(getResource("contributors.json").openStream(), "application/json")); final URI uri = URI.create(driver.getBaseUrl()).resolve("/repos/zalando/riptide/contributors"); - final AsyncClientHttpRequest request = factory.createAsyncRequest(uri, POST); + final ClientHttpRequest request = factory.createRequest(uri, POST); request.getHeaders().setAccept(singletonList(APPLICATION_JSON)); request.getBody().write("{}".getBytes(UTF_8)); @@ -146,7 +149,7 @@ public void shouldReadContributorsManually() throws IOException, ExecutionExcept assertThat(request.getURI(), hasToString(endsWith("/repos/zalando/riptide/contributors"))); assertThat(request.getHeaders().getAccept(), hasItem(APPLICATION_JSON)); - final ClientHttpResponse response = request.executeAsync().get(); + final ClientHttpResponse response = request.execute(); assertThat(response.getStatusCode(), is(HttpStatus.OK)); assertThat(response.getRawStatusCode(), is(200)); diff --git a/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequestTest.java b/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/ApacheClientHttpRequestTest.java similarity index 67% rename from riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequestTest.java rename to riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/ApacheClientHttpRequestTest.java index d6d267c23..2c0d4577c 100644 --- a/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/RestAsyncClientHttpRequestTest.java +++ b/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/ApacheClientHttpRequestTest.java @@ -1,7 +1,6 @@ package org.zalando.riptide.httpclient; import org.junit.Test; -import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.http.HttpMethod; import org.springframework.http.client.ClientHttpRequest; @@ -10,15 +9,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class RestAsyncClientHttpRequestTest { +public class ApacheClientHttpRequestTest { @Test public void getMethodValue() { final ClientHttpRequest request = mock(ClientHttpRequest.class); - final AsyncListenableTaskExecutor executor = mock(AsyncListenableTaskExecutor.class); - final RestAsyncClientHttpRequest unit = new RestAsyncClientHttpRequest(request, executor); - + final ApacheClientHttpRequest unit = new ApacheClientHttpRequest(request); when(request.getMethod()).thenReturn(HttpMethod.POST); + when(request.getMethodValue()).thenReturn("POST"); assertThat(unit.getMethod(), is(HttpMethod.POST)); assertThat(unit.getMethodValue(), is("POST")); diff --git a/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/RestAsyncClientHttpResponseBodyTest.java b/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/ApacheClientHttpResponseBodyTest.java similarity index 82% rename from riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/RestAsyncClientHttpResponseBodyTest.java rename to riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/ApacheClientHttpResponseBodyTest.java index 2ef6af3c9..5c2ca8bcc 100644 --- a/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/RestAsyncClientHttpResponseBodyTest.java +++ b/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/ApacheClientHttpResponseBodyTest.java @@ -11,7 +11,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class RestAsyncClientHttpResponseBodyTest { +public class ApacheClientHttpResponseBodyTest { @Test public void shouldCallCloseOnNormalStreams() throws IOException { @@ -19,7 +19,7 @@ public void shouldCallCloseOnNormalStreams() throws IOException { ClientHttpResponse response = mock(ClientHttpResponse.class); when(response.getBody()).thenReturn(stream); - try (RestAsyncClientHttpResponse unit = new RestAsyncClientHttpResponse(response)) { + try (ApacheClientHttpResponse unit = new ApacheClientHttpResponse(response)) { unit.getBody().close(); } @@ -32,7 +32,7 @@ public void shouldCallAbortAndCloseOnConnectionReleaseTrigger() throws IOExcepti ClientHttpResponse response = mock(ClientHttpResponse.class); when(response.getBody()).thenReturn(stream); - try (RestAsyncClientHttpResponse unit = new RestAsyncClientHttpResponse(response)) { + try (ApacheClientHttpResponse unit = new ApacheClientHttpResponse(response)) { unit.getBody().close(); } diff --git a/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/GzipHttpRequestInterceptorTest.java b/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/GzipHttpRequestInterceptorTest.java index 86e2ecf07..84937aeeb 100644 --- a/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/GzipHttpRequestInterceptorTest.java +++ b/riptide-httpclient/src/test/java/org/zalando/riptide/httpclient/GzipHttpRequestInterceptorTest.java @@ -12,12 +12,11 @@ import org.junit.After; import org.junit.Rule; import org.junit.Test; -import org.springframework.core.task.AsyncListenableTaskExecutor; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.zalando.riptide.Http; import javax.annotation.Nullable; import java.io.IOException; +import java.util.concurrent.Executors; import static com.github.restdriver.clientdriver.ClientDriverRequest.Method.POST; import static com.github.restdriver.clientdriver.RestClientDriver.giveEmptyResponse; @@ -52,11 +51,9 @@ public void process(final HttpRequest request, final HttpContext context) throws .addInterceptorLast(new NonTextHttpRequestInterceptor(new GzipHttpRequestInterceptor())) .build(); - private final AsyncListenableTaskExecutor executor = new ConcurrentTaskExecutor(); - private final RestAsyncClientHttpRequestFactory factory = new RestAsyncClientHttpRequestFactory(client, executor); - private final Http http = Http.builder() - .requestFactory(factory) + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .baseUrl(driver.getBaseUrl()) .build(); diff --git a/riptide-metrics/pom.xml b/riptide-metrics/pom.xml index 035929b18..32199e056 100644 --- a/riptide-metrics/pom.xml +++ b/riptide-metrics/pom.xml @@ -29,11 +29,6 @@ micrometer-core ${micrometer.version} - - io.micrometer - micrometer-core - 1.0.0-rc.5 - org.springframework spring-test diff --git a/riptide-metrics/src/test/java/org/zalando/riptide/metrics/MetricsPluginTest.java b/riptide-metrics/src/test/java/org/zalando/riptide/metrics/MetricsPluginTest.java index 2e2b6cc70..ad3056031 100644 --- a/riptide-metrics/src/test/java/org/zalando/riptide/metrics/MetricsPluginTest.java +++ b/riptide-metrics/src/test/java/org/zalando/riptide/metrics/MetricsPluginTest.java @@ -13,16 +13,15 @@ import org.junit.After; import org.junit.Rule; import org.junit.Test; -import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.zalando.riptide.Http; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import javax.annotation.Nullable; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executors; import static com.github.restdriver.clientdriver.ClientDriverRequest.Method.POST; import static com.github.restdriver.clientdriver.RestClientDriver.giveEmptyResponse; @@ -49,13 +48,12 @@ public class MetricsPluginTest { .setSocketTimeout(500) .build()) .build(); - private final AsyncListenableTaskExecutor executor = new ConcurrentTaskExecutor(); - private final RestAsyncClientHttpRequestFactory factory = new RestAsyncClientHttpRequestFactory(client, executor); private final MeterRegistry registry = new SimpleMeterRegistry(); private final Http unit = Http.builder() - .requestFactory(factory) + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .baseUrl(driver.getBaseUrl()) .converter(createJsonConverter()) .plugin(new MetricsPlugin(registry) diff --git a/riptide-problem/src/test/java/org/zalando/riptide/problem/MockSetup.java b/riptide-problem/src/test/java/org/zalando/riptide/problem/MockSetup.java index d90d7ef1e..534c99fc0 100644 --- a/riptide-problem/src/test/java/org/zalando/riptide/problem/MockSetup.java +++ b/riptide-problem/src/test/java/org/zalando/riptide/problem/MockSetup.java @@ -5,12 +5,13 @@ import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.test.web.client.MockRestServiceServer; -import org.springframework.web.client.AsyncRestTemplate; +import org.springframework.web.client.RestTemplate; import org.zalando.riptide.Http; import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; import static com.google.common.base.MoreObjects.firstNonNull; @@ -29,7 +30,7 @@ private static MappingJackson2HttpMessageConverter createJsonConverter() { private final String baseUrl; private final Iterable> converters; - private final AsyncRestTemplate template; + private final RestTemplate template; private final MockRestServiceServer server; public MockSetup() { @@ -43,7 +44,7 @@ public MockSetup(final String baseUrl) { public MockSetup(@Nullable final String baseUrl, @Nullable final Iterable> converters) { this.baseUrl = baseUrl; this.converters = converters; - this.template = new AsyncRestTemplate(); + this.template = new RestTemplate(); this.server = MockRestServiceServer.createServer(template); } @@ -53,7 +54,8 @@ public MockRestServiceServer getServer() { public Http.ConfigurationStage getRestBuilder() { return Http.builder() - .requestFactory(template.getAsyncRequestFactory()) + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(template.getRequestFactory()) .converters(firstNonNull(converters, DEFAULT_CONVERTERS)) .baseUrl(baseUrl); } diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/AccessTokensFactoryBean.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/AccessTokensFactoryBean.java deleted file mode 100644 index 049c37e62..000000000 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/AccessTokensFactoryBean.java +++ /dev/null @@ -1,89 +0,0 @@ -package org.zalando.riptide.autoconfigure; - -import org.springframework.beans.factory.config.AbstractFactoryBean; -import org.zalando.riptide.autoconfigure.RiptideProperties.Client.OAuth; -import org.zalando.riptide.autoconfigure.RiptideProperties.GlobalOAuth; -import org.zalando.stups.tokens.AccessTokens; -import org.zalando.stups.tokens.AccessTokensBuilder; -import org.zalando.stups.tokens.JsonFileBackedClientCredentialsProvider; -import org.zalando.stups.tokens.JsonFileBackedUserCredentialsProvider; -import org.zalando.stups.tokens.Tokens; - -import javax.annotation.Nullable; -import java.net.URI; -import java.nio.file.Path; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkArgument; - -final class AccessTokensFactoryBean extends AbstractFactoryBean { - - private AccessTokensBuilder builder; - - AccessTokensFactoryBean(final RiptideProperties properties) { - final GlobalOAuth oAuth = properties.getOauth(); - - final URI accessTokenUrl = getAccessTokenUrl(oAuth); - @Nullable final Path directory = oAuth.getCredentialsDirectory(); - final TimeSpan connectTimeout = oAuth.getConnectTimeout(); - final TimeSpan socketTimeout = oAuth.getSocketTimeout(); - - this.builder = Tokens.createAccessTokensWithUri(accessTokenUrl) - .usingClientCredentialsProvider(getClientCredentialsProvider(directory)) - .usingUserCredentialsProvider(getUserCredentialsProvider(directory)) - .schedulingPeriod((int) oAuth.getSchedulingPeriod().getAmount()) - .schedulingTimeUnit(oAuth.getSchedulingPeriod().getUnit()) - .connectTimeout((int) connectTimeout.to(TimeUnit.MILLISECONDS)) - .socketTimeout((int) socketTimeout.to(TimeUnit.MILLISECONDS)); - - properties.getClients().forEach((id, client) -> { - @Nullable final OAuth clientOAuth = client.getOauth(); - - if (clientOAuth == null) { - return; - } - - builder.manageToken(id) - .addScopesTypeSafe(clientOAuth.getScopes()) - .done(); - }); - } - - private JsonFileBackedClientCredentialsProvider getClientCredentialsProvider(@Nullable final Path directory) { - return directory == null ? - new JsonFileBackedClientCredentialsProvider() : - new JsonFileBackedClientCredentialsProvider(directory.resolve("client.json").toFile()); - } - - private JsonFileBackedUserCredentialsProvider getUserCredentialsProvider(@Nullable final Path directory) { - return directory == null ? - new JsonFileBackedUserCredentialsProvider() : - new JsonFileBackedUserCredentialsProvider(directory.resolve("user.json").toFile()); - } - - private URI getAccessTokenUrl(final GlobalOAuth oauth) { - @Nullable final URI accessTokenUrl = oauth.getAccessTokenUrl(); - - checkArgument(accessTokenUrl != null, "" + - "Neither 'riptide.oauth.access-token-url' nor 'ACCESS_TOKEN_URL' was set, " + - "but at least one client requires OAuth"); - - return accessTokenUrl; - } - - @Override - protected AccessTokens createInstance() { - return builder.start(); - } - - @Override - public Class getObjectType() { - return AccessTokens.class; - } - - @Override - protected void destroyInstance(final AccessTokens tokens) { - tokens.stop(); - } - -} diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/CircuitBreakerFactoryBean.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/CircuitBreakerFactoryBean.java deleted file mode 100644 index 45eb86ab9..000000000 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/CircuitBreakerFactoryBean.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.zalando.riptide.autoconfigure; - -import net.jodah.failsafe.CircuitBreaker; -import org.springframework.beans.factory.FactoryBean; -import org.zalando.riptide.failsafe.CircuitBreakerListener; - -import java.util.Optional; - -final class CircuitBreakerFactoryBean implements FactoryBean { - - private final CircuitBreaker circuitBreaker = new CircuitBreaker(); - - public void setTimeout(final TimeSpan timeout) { - timeout.applyTo(circuitBreaker::withTimeout); - } - - public void setConfiguration(final RiptideProperties.CircuitBreaker config) { - Optional.ofNullable(config.getFailureThreshold()) - .ifPresent(threshold -> threshold.applyTo(circuitBreaker::withFailureThreshold)); - - Optional.ofNullable(config.getDelay()) - .ifPresent(delay -> delay.applyTo(circuitBreaker::withDelay)); - - Optional.ofNullable(config.getSuccessThreshold()) - .ifPresent(threshold -> threshold.applyTo(circuitBreaker::withSuccessThreshold)); - } - - public void setListener(final CircuitBreakerListener listener) { - circuitBreaker - .onOpen(listener::onOpen) - .onHalfOpen(listener::onHalfOpen) - .onClose(listener::onClose); - } - - @Override - public CircuitBreaker getObject() { - return circuitBreaker; - } - - @Override - public Class getObjectType() { - return CircuitBreaker.class; - } - - @Override - public boolean isSingleton() { - return true; - } - -} diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/ConcurrentClientHttpRequestFactory.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/ConcurrentClientHttpRequestFactory.java new file mode 100644 index 000000000..c51712db0 --- /dev/null +++ b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/ConcurrentClientHttpRequestFactory.java @@ -0,0 +1,70 @@ +package org.zalando.riptide.autoconfigure; + +import lombok.AllArgsConstructor; +import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.client.AsyncClientHttpRequest; +import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequest; +import org.springframework.http.client.ClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpResponse; +import org.springframework.lang.Nullable; +import org.springframework.util.concurrent.ListenableFuture; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; + +// TODO better name! +// TODO move somewhere else? +@AllArgsConstructor +final class ConcurrentClientHttpRequestFactory implements AsyncClientHttpRequestFactory { + + private final ClientHttpRequestFactory requestFactory; + private final AsyncListenableTaskExecutor executor; + + @Override + public AsyncClientHttpRequest createAsyncRequest(final URI uri, final HttpMethod httpMethod) throws IOException { + return new ConcurrentClientHttpRequest(requestFactory.createRequest(uri, httpMethod)); + } + + @AllArgsConstructor + private final class ConcurrentClientHttpRequest implements AsyncClientHttpRequest { + + private final ClientHttpRequest request; + + @Override + @Nullable + public HttpMethod getMethod() { + return request.getMethod(); + } + + @Override + public String getMethodValue() { + return request.getMethodValue(); + } + + @Override + public URI getURI() { + return request.getURI(); + } + + @Override + public HttpHeaders getHeaders() { + return request.getHeaders(); + } + + @Override + public OutputStream getBody() throws IOException { + return request.getBody(); + } + + @Override + public ListenableFuture executeAsync() { + return executor.submitListenable(request::execute); + } + + } + +} diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java index 6fdb9edbc..f0b8cbe66 100644 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java +++ b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java @@ -14,6 +14,7 @@ import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; @@ -32,14 +33,9 @@ import org.zalando.riptide.failsafe.RetryListener; import org.zalando.riptide.faults.FaultClassifier; import org.zalando.riptide.faults.TransientFaultPlugin; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import org.zalando.riptide.httpclient.GzipHttpRequestInterceptor; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; import org.zalando.riptide.metrics.MetricsPlugin; -<<<<<<< HEAD:riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java -======= -import org.zalando.riptide.autoconfigure.RiptideProperties.Client; -import org.zalando.riptide.autoconfigure.RiptideProperties.Client.Keystore; ->>>>>>> 4236f76... Reorganized starter:riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java import org.zalando.riptide.stream.Streams; import org.zalando.riptide.timeout.TimeoutPlugin; import org.zalando.stups.oauth2.httpcomponents.AccessTokensRequestInterceptor; @@ -62,11 +58,8 @@ import static org.springframework.beans.factory.support.BeanDefinitionBuilder.genericBeanDefinition; import static org.zalando.riptide.autoconfigure.Dependencies.ifPresent; import static org.zalando.riptide.autoconfigure.Registry.generateBeanName; -<<<<<<< HEAD:riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java import static org.zalando.riptide.autoconfigure.Registry.list; import static org.zalando.riptide.autoconfigure.Registry.ref; -======= ->>>>>>> 4236f76... Reorganized starter:riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java @Slf4j @AllArgsConstructor @@ -80,29 +73,19 @@ public void register() { properties.getClients().forEach((id, client) -> { final String factoryId = registerAsyncClientHttpRequestFactory(id, client); final BeanDefinition converters = registerHttpMessageConverters(id); - final String baseUrl = client.getBaseUrl(); final List plugins = registerPlugins(id, client); registerHttp(id, client, factoryId, converters, plugins); - registerTemplate(id, RestTemplate.class, factoryId, baseUrl, converters, plugins); - registerTemplate(id, AsyncRestTemplate.class, factoryId, baseUrl, converters, plugins); + registerRestTemplate(id, factoryId, client, converters, plugins); + registerAsyncRestTemplate(id, factoryId, client, converters, plugins); }); } private String registerAsyncClientHttpRequestFactory(final String id, final Client client) { - return registry.registerIfAbsent(id, AsyncClientHttpRequestFactory.class, () -> { + return registry.registerIfAbsent(id, ClientHttpRequestFactory.class, () -> { log.debug("Client [{}]: Registering RestAsyncClientHttpRequestFactory", id); - - final BeanDefinitionBuilder factory = - genericBeanDefinition(RestAsyncClientHttpRequestFactory.class); - - factory.addConstructorArgReference(registerHttpClient(id, client)); - factory.addConstructorArgValue(genericBeanDefinition(ConcurrentTaskExecutor.class) - // we allow users to use their own ExecutorService, but they don't have to configure tracing - .addConstructorArgValue(registerExecutor(id, client)) - .getBeanDefinition()); - - return factory; + return genericBeanDefinition(ApacheClientHttpRequestFactory.class) + .addConstructorArgReference(registerHttpClient(id, client)); }); } @@ -131,7 +114,7 @@ private BeanDefinition registerHttpMessageConverters(final String id) { // we want exampleHttpMessageConverters, rather than exampleClientHttpMessageConverters final String convertersId = registry.registerIfAbsent(id, HttpMessageConverters.class, () -> { - final List list = Registry.list(); + final List list = list(); log.debug("Client [{}]: Registering StringHttpMessageConverter", id); list.add(genericBeanDefinition(StringHttpMessageConverter.class) @@ -170,9 +153,10 @@ private void registerHttp(final String id, final Client client, final String fac return genericBeanDefinition(HttpFactory.class) .setFactoryMethod("create") + .addConstructorArgValue(registerExecutor(id, client)) + .addConstructorArgReference(factoryId) .addConstructorArgValue(client.getBaseUrl()) .addConstructorArgValue(client.getUrlResolution()) - .addConstructorArgReference(factoryId) .addConstructorArgValue(converters) .addConstructorArgValue(plugins.stream() .map(Registry::ref) @@ -180,29 +164,53 @@ private void registerHttp(final String id, final Client client, final String fac }); } - private void registerTemplate(final String id, final Class type, final String factoryId, - @Nullable final String baseUrl, final BeanDefinition converters, final List plugins) { - registry.registerIfAbsent(id, type, () -> { - log.debug("Client [{}]: Registering AsyncRestTemplate", id); + private void registerRestTemplate(final String id, final String factoryId, final Client client, + final BeanDefinition converters, final List plugins) { + registry.registerIfAbsent(id, RestTemplate.class, () -> { + log.debug("Client [{}]: Registering RestTemplate", id); + + final BeanDefinitionBuilder template = genericBeanDefinition(RestTemplate.class); + template.addConstructorArgReference(factoryId); + configureTemplate(template, client.getBaseUrl(), converters, plugins); + + return template; + }); + } - final DefaultUriBuilderFactory factory = baseUrl == null ? - new DefaultUriBuilderFactory() : - new DefaultUriBuilderFactory(baseUrl); + private void registerAsyncRestTemplate(final String id, final String factoryId, final Client client, + final BeanDefinition converters, final List plugins) { + registry.registerIfAbsent(id, AsyncRestTemplate.class, () -> { + log.debug("Client [{}]: Registering AsyncRestTemplate", id); - final BeanDefinitionBuilder template = genericBeanDefinition(type); + final BeanDefinitionBuilder template = genericBeanDefinition(AsyncRestTemplate.class); + template.addConstructorArgReference(registry.registerIfAbsent(id, AsyncClientHttpRequestFactory.class, () -> + genericBeanDefinition(ConcurrentClientHttpRequestFactory.class) + .addConstructorArgReference(factoryId) + .addConstructorArgValue(genericBeanDefinition(ConcurrentTaskExecutor.class) + .addConstructorArgValue(registerExecutor(id, client)) + .getBeanDefinition()))); template.addConstructorArgReference(factoryId); - template.addPropertyValue("uriTemplateHandler", factory); - template.addPropertyValue("messageConverters", converters); - template.addPropertyValue("interceptors", plugins.stream() - .map(plugin -> genericBeanDefinition(PluginInterceptor.class) - .addConstructorArgReference(plugin) - .getBeanDefinition()) - .collect(toCollection(Registry::list))); + configureTemplate(template, client.getBaseUrl(), converters, plugins); return template; }); } + private void configureTemplate(final BeanDefinitionBuilder template, @Nullable final String baseUrl, + final BeanDefinition converters, + final List plugins) { + final DefaultUriBuilderFactory factory = baseUrl == null ? + new DefaultUriBuilderFactory() : + new DefaultUriBuilderFactory(baseUrl); + template.addPropertyValue("uriTemplateHandler", factory); + template.addPropertyValue("messageConverters", converters); + template.addPropertyValue("interceptors", plugins.stream() + .map(plugin -> genericBeanDefinition(PluginInterceptor.class) + .addConstructorArgReference(plugin) + .getBeanDefinition()) + .collect(toCollection(Registry::list))); + } + private String findObjectMapper(final String id) { final String beanName = generateBeanName(id, ObjectMapper.class); return registry.isRegistered(beanName) ? beanName : "jacksonObjectMapper"; @@ -219,7 +227,7 @@ private String registerAccessTokens(final String id, final RiptideProperties set } private List registerPlugins(final String id, final Client client) { - final List plugins = Registry.list(); + final List plugins = list(); if (client.getRecordMetrics()) { log.debug("Client [{}]: Registering [{}]", id, MetricsPlugin.class.getSimpleName()); @@ -289,7 +297,7 @@ private String findFaultClassifier(final String id) { return generateBeanName(FaultClassifier.class); } else { return registry.registerIfAbsent(FaultClassifier.class, () -> { - final List> predicates = Registry.list(); + final List> predicates = list(); predicates.addAll(FaultClassifier.defaults()); predicates.add(ConnectionClosedException.class::isInstance); @@ -390,7 +398,7 @@ private BeanMetadataElement trace(final String executor) { .addConstructorArgReference("tracer") .getBeanDefinition(); } else { - return Registry.ref(executor); + return ref(executor); } } @@ -410,20 +418,8 @@ private String registerHttpClient(final String id, final Client client) { }); } -<<<<<<< HEAD:riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java private List configureFirstRequestInterceptors(final String id, final Client client) { final List interceptors = list(); -======= - private void configure(final BeanDefinitionBuilder bean, final String id, final String name, final Object value) { - log.debug("Client [{}]: Configuring {}: [{}]", id, name, value); - bean.addPropertyValue(name, value); - } - - private void configureInterceptors(final BeanDefinitionBuilder builder, final String id, - final Client client) { - final List requestInterceptors = Registry.list(); - final List responseInterceptors = Registry.list(); ->>>>>>> 4236f76... Reorganized starter:riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java if (client.getOauth() != null) { log.debug("Client [{}]: Registering AccessTokensRequestInterceptor", id); @@ -435,7 +431,6 @@ private void configureInterceptors(final BeanDefinitionBuilder builder, final St if (registry.isRegistered("tracerHttpRequestInterceptor")) { log.debug("Client [{}]: Registering TracerHttpRequestInterceptor", id); -<<<<<<< HEAD:riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java interceptors.add(ref("tracerHttpRequestInterceptor")); } @@ -448,21 +443,6 @@ private List configureLastRequestInterceptors(final String if (registry.isRegistered("logbookHttpRequestInterceptor")) { log.debug("Client [{}]: Registering LogbookHttpRequestInterceptor", id); interceptors.add(ref("logbookHttpRequestInterceptor")); -======= - requestInterceptors.add(Registry.ref("tracerHttpRequestInterceptor")); - } - - if (registry.isRegistered("logbookHttpResponseInterceptor")) { - log.debug("Client [{}]: Registering LogbookHttpResponseInterceptor", id); - responseInterceptors.add(Registry.ref("logbookHttpResponseInterceptor")); - } - - final List lastRequestInterceptors = Registry.list(); - - if (registry.isRegistered("logbookHttpRequestInterceptor")) { - log.debug("Client [{}]: Registering LogbookHttpRequestInterceptor", id); - lastRequestInterceptors.add(Registry.ref("logbookHttpRequestInterceptor")); ->>>>>>> 4236f76... Reorganized starter:riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/DefaultRiptideRegistrar.java } if (client.isCompressRequest()) { diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/HttpClientFactoryBean.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/HttpClientFactoryBean.java deleted file mode 100644 index 2b817c45c..000000000 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/HttpClientFactoryBean.java +++ /dev/null @@ -1,107 +0,0 @@ -package org.zalando.riptide.autoconfigure; - -import org.apache.http.HttpRequestInterceptor; -import org.apache.http.HttpResponseInterceptor; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.ssl.SSLContextBuilder; -import org.apache.http.ssl.SSLContexts; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.config.AbstractFactoryBean; -import org.zalando.riptide.autoconfigure.RiptideProperties.Client.Keystore; - -import java.io.FileNotFoundException; -import java.net.URL; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static java.lang.String.format; -import static org.apache.http.conn.ssl.SSLConnectionSocketFactory.getDefaultHostnameVerifier; - -final class HttpClientFactoryBean extends AbstractFactoryBean { - - private static final Logger LOG = LoggerFactory.getLogger(HttpClientFactoryBean.class); - - private final HttpClientBuilder builder = HttpClientBuilder.create(); - private final RequestConfig.Builder config = RequestConfig.custom(); - private HttpClientCustomizer customizer = $ -> { - }; - - public void setFirstRequestInterceptors(final List interceptors) { - interceptors.forEach(builder::addInterceptorFirst); - } - - public void setLastRequestInterceptors(final List interceptors) { - interceptors.forEach(builder::addInterceptorLast); - } - - public void setLastResponseInterceptors(final List interceptors) { - interceptors.forEach(builder::addInterceptorLast); - } - - public void setConnectTimeout(final TimeSpan connectTimeout) { - config.setConnectTimeout((int) connectTimeout.to(TimeUnit.MILLISECONDS)); - } - - public void setSocketTimeout(final TimeSpan socketTimeout) { - config.setSocketTimeout((int) socketTimeout.to(TimeUnit.MILLISECONDS)); - } - - public void setConnectionTimeToLive(final TimeSpan timeToLive) { - builder.setConnectionTimeToLive(timeToLive.getAmount(), timeToLive.getUnit()); - } - - public void setMaxConnectionsPerRoute(final int maxConnectionsPerRoute) { - builder.setMaxConnPerRoute(maxConnectionsPerRoute); - } - - public void setMaxConnectionsTotal(final int maxConnectionsTotal) { - builder.setMaxConnTotal(maxConnectionsTotal); - } - - public void setTrustedKeystore(final Keystore keystore) throws Exception { - final SSLContextBuilder ssl = SSLContexts.custom(); - - final String path = keystore.getPath(); - final String password = keystore.getPassword(); - - final URL resource = HttpClientFactoryBean.class.getClassLoader().getResource(path); - - if (resource == null) { - throw new FileNotFoundException(format("Keystore [%s] not found.", path)); - } - - try { - ssl.loadTrustMaterial(resource, password == null ? null : password.toCharArray()); - builder.setSSLSocketFactory(new SSLConnectionSocketFactory(ssl.build(), getDefaultHostnameVerifier())); - } catch (final Exception e) { - LOG.error("Error loading keystore [{}]:", path, e); // log full exception, bean initialization code swallows it - throw e; - } - } - - public void setCustomizer(final HttpClientCustomizer customizer) { - this.customizer = customizer; - } - - @Override - public Class getObjectType() { - return CloseableHttpClient.class; - } - - @Override - protected CloseableHttpClient createInstance() throws Exception { - builder.setDefaultRequestConfig(config.build()); - customizer.customize(builder); - return builder.build(); - } - - @Override - protected void destroyInstance(final CloseableHttpClient client) throws Exception { - client.close(); - } - -} diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/HttpFactory.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/HttpFactory.java index db7dcf030..d325ed29b 100644 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/HttpFactory.java +++ b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/HttpFactory.java @@ -1,12 +1,13 @@ package org.zalando.riptide.autoconfigure; -import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.converter.HttpMessageConverter; import org.zalando.riptide.Http; import org.zalando.riptide.Plugin; import org.zalando.riptide.UrlResolution; import java.util.List; +import java.util.concurrent.Executor; @SuppressWarnings("unused") final class HttpFactory { @@ -16,13 +17,15 @@ private HttpFactory() { } public static Http create( + final Executor executor, + final ClientHttpRequestFactory requestFactory, final String baseUrl, final UrlResolution urlResolution, - final AsyncClientHttpRequestFactory requestFactory, final List> converters, final List plugins) { return Http.builder() + .executor(executor) .requestFactory(requestFactory) .baseUrl(baseUrl) .urlResolution(urlResolution) diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/Metrics.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/Metrics.java deleted file mode 100644 index ab62cf18e..000000000 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/Metrics.java +++ /dev/null @@ -1,46 +0,0 @@ -package org.zalando.riptide.autoconfigure; - -import com.google.common.collect.ImmutableList; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; -import org.zalando.riptide.Plugin; -import org.zalando.riptide.failsafe.CircuitBreakerListener; -import org.zalando.riptide.failsafe.CompoundRetryListener; -import org.zalando.riptide.failsafe.LoggingRetryListener; -import org.zalando.riptide.failsafe.RetryListener; -import org.zalando.riptide.failsafe.metrics.MetricsCircuitBreakerListener; -import org.zalando.riptide.failsafe.metrics.MetricsRetryListener; -import org.zalando.riptide.metrics.MetricsPlugin; - -final class Metrics { - - private Metrics() { - - } - - public static Plugin createMetricsPlugin(final MeterRegistry registry, - final ImmutableList tags) { - return new MetricsPlugin(registry).withDefaultTags(tags); - } - - public static CircuitBreakerListener createCircuitBreakerListener(final MeterRegistry registry, - final ImmutableList defaultTags) { - return new MetricsCircuitBreakerListener(registry).withDefaultTags(defaultTags); - } - - public static CircuitBreakerListener getDefaultCircuitBreakerListener() { - return CircuitBreakerListener.DEFAULT; - } - - public static RetryListener createRetryListener(final MeterRegistry registry, - final ImmutableList defaultTags) { - return new CompoundRetryListener( - new MetricsRetryListener(registry).withDefaultTags(defaultTags), - new LoggingRetryListener() - ); - } - - public static RetryListener getDefaultRetryListener() { - return new LoggingRetryListener(); - } -} diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/Resilience.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/Resilience.java deleted file mode 100644 index 7528d9486..000000000 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/Resilience.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.zalando.riptide.autoconfigure; - -import net.jodah.failsafe.CircuitBreaker; -import net.jodah.failsafe.RetryPolicy; -import org.zalando.riptide.Plugin; -import org.zalando.riptide.failsafe.FailsafePlugin; -import org.zalando.riptide.failsafe.RetryListener; - -import java.util.concurrent.ScheduledExecutorService; - -final class Resilience { - - private Resilience() { - - } - - public static Plugin createFailsafePlugin(final ScheduledExecutorService scheduler, - final RetryPolicy retryPolicy, final CircuitBreaker circuitBreaker, final RetryListener listener) { - return new FailsafePlugin(scheduler) - .withRetryPolicy(retryPolicy) - .withCircuitBreaker(circuitBreaker) - .withListener(listener); - } - -} diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/RetryPolicyFactoryBean.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/RetryPolicyFactoryBean.java deleted file mode 100644 index 6d1a544c0..000000000 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/RetryPolicyFactoryBean.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.zalando.riptide.autoconfigure; - -import net.jodah.failsafe.RetryPolicy; -import org.springframework.beans.factory.FactoryBean; -import org.zalando.riptide.failsafe.RetryAfterDelayFunction; -import org.zalando.riptide.failsafe.RetryException; -import org.zalando.riptide.faults.TransientFaultException; -import org.zalando.riptide.autoconfigure.RiptideProperties.Retry; - -import javax.annotation.Nullable; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import static java.time.Clock.systemUTC; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -final class RetryPolicyFactoryBean implements FactoryBean { - - private final RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(0); - - public void setConfiguration(final Retry config) { - Optional.ofNullable(config.getFixedDelay()) - .ifPresent(delay -> delay.applyTo(retryPolicy::withDelay)); - - Optional.ofNullable(config.getBackoff()).ifPresent(backoff -> { - final TimeSpan delay = backoff.getDelay(); - final TimeSpan maxDelay = backoff.getMaxDelay(); - final TimeUnit unit = MILLISECONDS; - - @Nullable final Double delayFactor = backoff.getDelayFactor(); - - if (delayFactor == null) { - retryPolicy.withBackoff(delay.to(unit), maxDelay.to(unit), unit); - } else { - retryPolicy.withBackoff(delay.to(unit), maxDelay.to(unit), unit, delayFactor); - } - }); - - retryPolicy.withMaxRetries(Optional.ofNullable(config.getMaxRetries()).orElse(-1)); - - Optional.ofNullable(config.getMaxDuration()) - .ifPresent(duration -> duration.applyTo(retryPolicy::withMaxDuration)); - - Optional.ofNullable(config.getJitterFactor()) - .ifPresent(retryPolicy::withJitter); - - Optional.ofNullable(config.getJitter()) - .ifPresent(jitter -> jitter.applyTo(retryPolicy::withJitter)); - - retryPolicy.retryOn(TransientFaultException.class); - retryPolicy.retryOn(RetryException.class); - retryPolicy.withDelay(new RetryAfterDelayFunction(systemUTC())); - } - - @Override - public RetryPolicy getObject() { - return retryPolicy; - } - - @Override - public Class getObjectType() { - return RetryPolicy.class; - } - - @Override - public boolean isSingleton() { - return true; - } - -} diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/RiptideTestAutoConfiguration.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/RiptideTestAutoConfiguration.java index 838e8d879..980a5083f 100644 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/RiptideTestAutoConfiguration.java +++ b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/RiptideTestAutoConfiguration.java @@ -6,9 +6,14 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.test.web.client.MockRestServiceServer; +import org.springframework.test.web.client.MockRestServiceServer.MockRestServiceServerBuilder; import org.springframework.web.client.AsyncRestTemplate; +import org.springframework.web.client.RestTemplate; + +import java.lang.reflect.Field; import static org.apiguardian.api.API.Status.EXPERIMENTAL; +import static org.springframework.test.web.client.MockRestServiceServer.bindTo; @API(status = EXPERIMENTAL) @Configuration @@ -16,7 +21,8 @@ public class RiptideTestAutoConfiguration { static final String SERVER_BEAN_NAME = "mockRestServiceServer"; - static final String TEMPLATE_BEAN_NAME = "_mockAsyncRestTemplate"; + static final String REST_TEMPLATE_BEAN_NAME = "_mockRestTemplate"; + static final String ASYNC_REST_TEMPLATE_BEAN_NAME = "_mockAsyncRestTemplate"; @Bean public static RiptidePostProcessor restClientTestPostProcessor() { @@ -26,14 +32,32 @@ public static RiptidePostProcessor restClientTestPostProcessor() { @Configuration static class MockConfiguration { - @Bean(name = TEMPLATE_BEAN_NAME) + @Bean(name = REST_TEMPLATE_BEAN_NAME) + RestTemplate mockRestTemplate() { + return new RestTemplate(); + } + + @Bean(name = ASYNC_REST_TEMPLATE_BEAN_NAME) AsyncRestTemplate mockAsyncRestTemplate() { return new AsyncRestTemplate(); } @Bean(name = SERVER_BEAN_NAME) - MockRestServiceServer mockRestServiceServer(@Qualifier(TEMPLATE_BEAN_NAME) final AsyncRestTemplate template) { - return MockRestServiceServer.createServer(template); + MockRestServiceServer mockRestServiceServer( + @Qualifier(REST_TEMPLATE_BEAN_NAME) final RestTemplate restTemplate, + @Qualifier(ASYNC_REST_TEMPLATE_BEAN_NAME) final AsyncRestTemplate asyncRestTemplate) + throws NoSuchFieldException, IllegalAccessException { + + final MockRestServiceServerBuilder builder = bindTo(restTemplate); + bind(builder, asyncRestTemplate); + return builder.build(); + } + + private void bind(final MockRestServiceServerBuilder builder, final AsyncRestTemplate asyncRestTemplate) + throws NoSuchFieldException, IllegalAccessException { + final Field field = builder.getClass().getDeclaredField("asyncRestTemplate"); + field.setAccessible(true); + field.set(builder, asyncRestTemplate); } } diff --git a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/TestRiptideRegistrar.java b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/TestRiptideRegistrar.java index 5fca10414..8384ce0fd 100644 --- a/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/TestRiptideRegistrar.java +++ b/riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/TestRiptideRegistrar.java @@ -4,8 +4,12 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequestFactory; import static org.springframework.beans.factory.support.BeanDefinitionBuilder.genericBeanDefinition; +import static org.zalando.riptide.autoconfigure.RiptideTestAutoConfiguration.ASYNC_REST_TEMPLATE_BEAN_NAME; +import static org.zalando.riptide.autoconfigure.RiptideTestAutoConfiguration.REST_TEMPLATE_BEAN_NAME; +import static org.zalando.riptide.autoconfigure.RiptideTestAutoConfiguration.SERVER_BEAN_NAME; @Slf4j @AllArgsConstructor @@ -17,15 +21,23 @@ class TestRiptideRegistrar implements RiptideRegistrar { @Override public void register() { properties.getClients().forEach((id, client) -> - registerAsyncClientHttpRequestFactory(id)); + registerRequestFactories(id)); } - private void registerAsyncClientHttpRequestFactory(final String id) { + private void registerRequestFactories(final String id) { + registry.registerIfAbsent(id, ClientHttpRequestFactory.class, () -> { + log.debug("Client [{}]: Registering mocked ClientHttpRequestFactory", id); + final BeanDefinitionBuilder factory = genericBeanDefinition(ClientHttpRequestFactory.class); + factory.addDependsOn(SERVER_BEAN_NAME); + factory.setFactoryMethodOnBean("getRequestFactory", REST_TEMPLATE_BEAN_NAME); + return factory; + }); + registry.registerIfAbsent(id, AsyncClientHttpRequestFactory.class, () -> { log.debug("Client [{}]: Registering mocked AsyncClientHttpRequestFactory", id); final BeanDefinitionBuilder factory = genericBeanDefinition(AsyncClientHttpRequestFactory.class); - factory.addDependsOn(RiptideTestAutoConfiguration.SERVER_BEAN_NAME); - factory.setFactoryMethodOnBean("getAsyncRequestFactory", RiptideTestAutoConfiguration.TEMPLATE_BEAN_NAME); + factory.addDependsOn(SERVER_BEAN_NAME); + factory.setFactoryMethodOnBean("getAsyncRequestFactory", ASYNC_REST_TEMPLATE_BEAN_NAME); return factory; }); } diff --git a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/AccessTokensFactoryBeanTest.java b/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/AccessTokensFactoryBeanTest.java deleted file mode 100644 index 49a4912a0..000000000 --- a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/AccessTokensFactoryBeanTest.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.zalando.riptide.autoconfigure; - -import org.junit.Test; -import org.zalando.riptide.autoconfigure.RiptideProperties.Client; -import org.zalando.riptide.autoconfigure.RiptideProperties.Client.OAuth; -import org.zalando.riptide.autoconfigure.RiptideProperties.Defaults; -import org.zalando.riptide.autoconfigure.RiptideProperties.GlobalOAuth; - -import java.net.URI; - -import static java.util.Collections.singletonList; -import static java.util.Collections.singletonMap; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.SECONDS; - -public final class AccessTokensFactoryBeanTest { - - private final AccessTokensFactoryBean unit = new AccessTokensFactoryBean(new RiptideProperties( - new Defaults(), - new GlobalOAuth( - URI.create("http://localhost"), - null, - TimeSpan.of(5, MINUTES), - TimeSpan.of(1, SECONDS), - TimeSpan.of(1, SECONDS) - ), - singletonMap( - "example", new Client( - null, null, null, null, null, null, null, null, - new OAuth(singletonList("example")), - null, null, null, null, null, null, null, false, null) - ) - )); - - // just because spring sometimes fails to destroy properly during tests - @Test - public void shouldDestroy() throws Exception { - unit.afterPropertiesSet(); - unit.getObject(); - unit.destroy(); - } - -} diff --git a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/AsyncClientHttpRequestFactoryTest.java b/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/AsyncClientHttpRequestFactoryTest.java index d9831db12..820b945f1 100644 --- a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/AsyncClientHttpRequestFactoryTest.java +++ b/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/AsyncClientHttpRequestFactoryTest.java @@ -12,7 +12,7 @@ import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.stereotype.Component; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import org.zalando.stups.tokens.AccessTokens; import static org.hamcrest.Matchers.is; @@ -46,12 +46,12 @@ public AccessTokens accessTokens() { @Test public void shouldAutowireSync() { - assertThat(sync.getClass(), is(RestAsyncClientHttpRequestFactory.class)); + assertThat(sync.getClass(), is(ApacheClientHttpRequestFactory.class)); } @Test public void shouldAutowireAsync() { - assertThat(async.getClass(), is(RestAsyncClientHttpRequestFactory.class)); + assertThat(async.getClass(), is(ConcurrentClientHttpRequestFactory.class)); } } diff --git a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/ConcurrentClientHttpRequestFactoryTest.java b/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/ConcurrentClientHttpRequestFactoryTest.java new file mode 100644 index 000000000..88fff3fbb --- /dev/null +++ b/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/ConcurrentClientHttpRequestFactoryTest.java @@ -0,0 +1,72 @@ +package org.zalando.riptide.autoconfigure; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.http.client.AsyncClientHttpRequest; +import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpRequest; +import org.springframework.http.client.ClientHttpRequestFactory; +import org.springframework.http.client.ClientHttpResponse; +import org.springframework.mock.http.client.MockClientHttpRequest; +import org.springframework.mock.http.client.MockClientHttpResponse; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Callable; + +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.springframework.http.HttpMethod.GET; +import static org.springframework.http.HttpStatus.OK; + +@RunWith(MockitoJUnitRunner.class) +public final class ConcurrentClientHttpRequestFactoryTest { + + @Captor + private ArgumentCaptor> captor; + + private final ClientHttpRequestFactory delegate = mock(ClientHttpRequestFactory.class); + private final AsyncListenableTaskExecutor executor = mock(AsyncListenableTaskExecutor.class); + private final AsyncClientHttpRequestFactory unit = new ConcurrentClientHttpRequestFactory(delegate, executor); + + @Test + public void shouldDelegate() throws IOException { + final ClientHttpRequest original = new MockClientHttpRequest(); + when(delegate.createRequest(any(), any())).thenReturn(original); + + final AsyncClientHttpRequest request = unit.createAsyncRequest(URI.create("/"), GET); + + assertThat(request.getMethod(), is(sameInstance(original.getMethod()))); + assertThat(request.getMethodValue(), is(sameInstance(original.getMethodValue()))); + assertThat(request.getURI(), is(sameInstance(original.getURI()))); + assertThat(request.getHeaders(), is(sameInstance(original.getHeaders()))); + assertThat(request.getBody(), is(sameInstance(original.getBody()))); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldExecute() throws Exception { + final MockClientHttpRequest original = new MockClientHttpRequest(); + final MockClientHttpResponse response = new MockClientHttpResponse(new byte[0], OK); + + original.setResponse(response); + when(delegate.createRequest(any(), any())).thenReturn(original); + + unit.createAsyncRequest(URI.create("/"), GET).executeAsync(); + + verify(executor).submitListenable(captor.capture()); + + assertThat(captor.getValue().call(), is(sameInstance(response))); + } + +} diff --git a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/HttpClientFactoryBeanTest.java b/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/HttpClientFactoryBeanTest.java deleted file mode 100644 index 146a3f3c5..000000000 --- a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/HttpClientFactoryBeanTest.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.zalando.riptide.autoconfigure; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.zalando.riptide.autoconfigure.RiptideProperties.Client.Keystore; - -import java.io.FileNotFoundException; -import java.io.IOException; - -public class HttpClientFactoryBeanTest { - - private HttpClientFactoryBean unit = new HttpClientFactoryBean(); - - @Rule - public final ExpectedException exception = ExpectedException.none(); - - @Test - public void shouldFailOnKeystoreNotFound() throws Exception { - exception.expect(FileNotFoundException.class); - exception.expectMessage("i-do-not-exist.keystore"); - - final Keystore nonExistingKeystore = new Keystore(); - nonExistingKeystore.setPath("i-do-not-exist.keystore"); - unit.setTrustedKeystore(nonExistingKeystore); - } - - @Test - public void shouldFailOnInvalidKeystore() throws Exception { - exception.expect(IOException.class); - - final Keystore invalidKeystore = new Keystore(); - invalidKeystore.setPath("application-default.yml"); - unit.setTrustedKeystore(invalidKeystore); - } - - // just because spring sometimes fails to destroy properly during tests - @Test - public void shouldDestroy() throws Exception { - unit.afterPropertiesSet(); - unit.getObject(); - unit.destroy(); - } - -} diff --git a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/ManualConfiguration.java b/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/ManualConfiguration.java index 7b97e70c8..12f53bc74 100644 --- a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/ManualConfiguration.java +++ b/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/ManualConfiguration.java @@ -17,7 +17,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import org.springframework.http.client.AsyncClientHttpRequestFactory; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; @@ -39,14 +39,14 @@ import org.zalando.riptide.failsafe.CircuitBreakerListener; import org.zalando.riptide.failsafe.FailsafePlugin; import org.zalando.riptide.failsafe.RetryException; +import org.zalando.riptide.failsafe.metrics.MetricsCircuitBreakerListener; +import org.zalando.riptide.failsafe.metrics.MetricsRetryListener; import org.zalando.riptide.faults.FaultClassifier; import org.zalando.riptide.faults.TransientFaultException; import org.zalando.riptide.faults.TransientFaultPlugin; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import org.zalando.riptide.httpclient.GzipHttpRequestInterceptor; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; import org.zalando.riptide.metrics.MetricsPlugin; -import org.zalando.riptide.failsafe.metrics.MetricsCircuitBreakerListener; -import org.zalando.riptide.failsafe.metrics.MetricsRetryListener; import org.zalando.riptide.stream.Streams; import org.zalando.riptide.timeout.TimeoutPlugin; import org.zalando.stups.oauth2.httpcomponents.AccessTokensRequestInterceptor; @@ -64,6 +64,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -115,10 +116,11 @@ public AccessTokens tokens() { static class ExampleClientConfiguration { @Bean - public Http exampleHttp(final AsyncClientHttpRequestFactory requestFactory, + public Http exampleHttp(final Executor executor, final ClientHttpRequestFactory requestFactory, final ClientHttpMessageConverters converters, final List plugins) { return Http.builder() + .executor(executor) .requestFactory(requestFactory) .baseUrl("https://www.example.com") .urlResolution(UrlResolution.RFC) @@ -182,13 +184,14 @@ public RestTemplate exampleRestTemplate(final ClientHttpRequestFactory requestFa } @Bean - public AsyncRestTemplate exampleAsyncRestTemplate(final AsyncClientHttpRequestFactory requestFactory, - final ClientHttpMessageConverters converters, final List plugins) { + public AsyncRestTemplate exampleAsyncRestTemplate(final ClientHttpRequestFactory requestFactory, + final Executor executor, final ClientHttpMessageConverters converters, final List plugins) { final AsyncRestTemplate template = new AsyncRestTemplate(); + final AsyncListenableTaskExecutor taskExecutor = new ConcurrentTaskExecutor(executor); final DefaultUriBuilderFactory handler = new DefaultUriBuilderFactory("https://www.example.com"); template.setUriTemplateHandler(handler); - template.setAsyncRequestFactory(requestFactory); + template.setAsyncRequestFactory(new ConcurrentClientHttpRequestFactory(requestFactory, taskExecutor)); template.setMessageConverters(converters.getConverters()); template.setInterceptors(plugins.stream().map(PluginInterceptor::new).collect(toList())); @@ -196,10 +199,9 @@ public AsyncRestTemplate exampleAsyncRestTemplate(final AsyncClientHttpRequestFa } @Bean - public RestAsyncClientHttpRequestFactory exampleAsyncClientHttpRequestFactory( - final AccessTokens tokens, final Tracer tracer, final Logbook logbook, - final ExecutorService executor) throws Exception { - return new RestAsyncClientHttpRequestFactory( + public ApacheClientHttpRequestFactory exampleAsyncClientHttpRequestFactory( + final AccessTokens tokens, final Tracer tracer, final Logbook logbook) throws Exception { + return new ApacheClientHttpRequestFactory( HttpClientBuilder.create() .setDefaultRequestConfig(RequestConfig.custom() .setConnectTimeout(5000) @@ -220,8 +222,7 @@ public RestAsyncClientHttpRequestFactory exampleAsyncClientHttpRequestFactory( "password".toCharArray()) .build(), getDefaultHostnameVerifier())) - .build(), - new ConcurrentTaskExecutor(executor)); + .build()); } @Bean(destroyMethod = "shutdown") diff --git a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/RetryPolicyFactoryBeanTest.java b/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/RetryPolicyFactoryBeanTest.java deleted file mode 100644 index 183eabba0..000000000 --- a/riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/RetryPolicyFactoryBeanTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.zalando.riptide.autoconfigure; - -import org.junit.Test; - -import static java.util.concurrent.TimeUnit.MINUTES; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -public class RetryPolicyFactoryBeanTest { - - private final RetryPolicyFactoryBean unit = new RetryPolicyFactoryBean(); - - @Test - public void shouldRetryNeverIfNotConfigured() { - assertThat(unit.getObject().getMaxRetries(), is(0)); - } - - @Test - public void shouldRetryAsConfigured() { - final RiptideProperties.Retry config = new RiptideProperties.Retry(); - config.setMaxRetries(42); - unit.setConfiguration(config); - - assertThat(unit.getObject().getMaxRetries(), is(42)); - } - - @Test - public void shouldRetryForeverIfNotSpecified() { - final RiptideProperties.Retry config = new RiptideProperties.Retry(); - config.setMaxDuration(TimeSpan.of(1, MINUTES)); - unit.setConfiguration(config); - - assertThat(unit.getObject().getMaxRetries(), is(-1)); - } -} diff --git a/riptide-stream/src/test/java/org/zalando/riptide/stream/MockSetup.java b/riptide-stream/src/test/java/org/zalando/riptide/stream/MockSetup.java index d6bf64c9e..66d64f36a 100644 --- a/riptide-stream/src/test/java/org/zalando/riptide/stream/MockSetup.java +++ b/riptide-stream/src/test/java/org/zalando/riptide/stream/MockSetup.java @@ -2,19 +2,22 @@ import org.springframework.http.converter.HttpMessageConverter; import org.springframework.test.web.client.MockRestServiceServer; -import org.springframework.web.client.AsyncRestTemplate; +import org.springframework.web.client.RestTemplate; import org.zalando.riptide.Http; +import java.util.concurrent.Executors; + public final class MockSetup { private final MockRestServiceServer server; private final Http http; public MockSetup(final String baseUrl, final Iterable> converters) { - final AsyncRestTemplate template = new AsyncRestTemplate(); + final RestTemplate template = new RestTemplate(); this.server = MockRestServiceServer.createServer(template); this.http = Http.builder() - .requestFactory(template.getAsyncRequestFactory()) + .executor(Executors.newSingleThreadExecutor()) + .requestFactory(template.getRequestFactory()) .converters(converters) .baseUrl(baseUrl) .build(); diff --git a/riptide-stream/src/test/java/org/zalando/riptide/stream/StreamIOTest.java b/riptide-stream/src/test/java/org/zalando/riptide/stream/StreamIOTest.java index f5b59c62e..bddb118d6 100644 --- a/riptide-stream/src/test/java/org/zalando/riptide/stream/StreamIOTest.java +++ b/riptide-stream/src/test/java/org/zalando/riptide/stream/StreamIOTest.java @@ -10,10 +10,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.springframework.http.client.ClientHttpResponse; -import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.zalando.riptide.Http; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import java.io.IOException; import java.util.List; @@ -21,7 +19,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; @@ -67,7 +64,8 @@ public String getLogin() { private final ExecutorService executor = newSingleThreadExecutor(); private final Http http = Http.builder() - .requestFactory(new RestAsyncClientHttpRequestFactory(client, new ConcurrentTaskExecutor(executor))) + .executor(executor) + .requestFactory(new ApacheClientHttpRequestFactory(client)) .baseUrl(driver.getBaseUrl()) .converter(streamConverter(new ObjectMapper().disable(FAIL_ON_UNKNOWN_PROPERTIES), singletonList(APPLICATION_JSON))) .build(); @@ -98,13 +96,24 @@ public void shouldReadContributors() throws IOException { @Test public void shouldCancelRequest() throws IOException { + driver.addExpectation(onRequestTo("/repos/zalando/riptide/contributors"), + giveResponseAsBytes(getResource("contributors.json").openStream(), "application/json")); + final CompletableFuture future = http.get("/repos/{org}/{repo}/contributors", "zalando", "riptide") .dispatch(series(), on(SUCCESSFUL).call(pass())); future.cancel(true); - Thread.sleep(1000); + try { + future.join(); + } catch (final CancellationException e) { + // expected + } + + // we don't care whether the request was actually made or not, but by default the driver will verify + // all expectations after every tests + driver.reset(); } @Test diff --git a/riptide-stream/src/test/java/org/zalando/riptide/stream/StreamsTest.java b/riptide-stream/src/test/java/org/zalando/riptide/stream/StreamsTest.java index 821c41984..6106c5447 100644 --- a/riptide-stream/src/test/java/org/zalando/riptide/stream/StreamsTest.java +++ b/riptide-stream/src/test/java/org/zalando/riptide/stream/StreamsTest.java @@ -18,7 +18,6 @@ import java.net.URI; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import static java.util.Collections.singletonList; @@ -64,18 +63,20 @@ public StreamsTest() { } @Test - public void shouldCallConsumerWithList() throws Exception { + public void shouldCallConsumerWithList() { server.expect(requestTo(url)).andRespond( withSuccess() .body(new ClassPathResource("account-list.json")) .contentType(APPLICATION_X_JSON_STREAM)); - @SuppressWarnings("unchecked") - final ThrowingConsumer, Exception> verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer, Exception> verifier = mock( + ThrowingConsumer.class); - unit.get("/accounts").dispatch(status(), - on(OK).call(streamOf(listOf(AccountBody.class)), forEach(verifier)), - anyStatus().call(this::fail)).join(); + unit.get("/accounts") + .dispatch(status(), + on(OK).call(streamOf(listOf(AccountBody.class)), forEach(verifier)), + anyStatus().call(this::fail)) + .join(); verify(verifier).accept(Arrays.asList( new AccountBody("1234567890", "Acme Corporation"), @@ -86,40 +87,44 @@ public void shouldCallConsumerWithList() throws Exception { } @Test - public void shouldCallConsumerWithArray() throws Exception { + public void shouldCallConsumerWithArray() { server.expect(requestTo(url)).andRespond( withSuccess() .body(new ClassPathResource("account-list.json")) .contentType(APPLICATION_X_JSON_STREAM)); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); - unit.get("/accounts").dispatch(status(), - on(OK).call(streamOf(AccountBody[].class), forEach(verifier)), - anyStatus().call(this::fail)).join(); + unit.get("/accounts") + .dispatch(status(), + on(OK).call(streamOf(AccountBody[].class), forEach(verifier)), + anyStatus().call(this::fail)) + .join(); - verify(verifier).accept(new AccountBody[] { + verify(verifier).accept(new AccountBody[]{ new AccountBody("1234567890", "Acme Corporation"), new AccountBody("1234567891", "Acme Company"), new AccountBody("1234567892", "Acme GmbH"), - new AccountBody("1234567893", "Acme SE") }); + new AccountBody("1234567893", "Acme SE")}); verifyNoMoreInteractions(verifier); } @Test - public void shouldCallConsumerWithJsonList() throws Exception { + public void shouldCallConsumerWithJsonList() { server.expect(requestTo(url)).andRespond( withSuccess() .body(new ClassPathResource("account-list.json")) .contentType(APPLICATION_X_JSON_STREAM)); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); - unit.get("/accounts").dispatch(status(), - on(OK).call(streamOf(AccountBody.class), forEach(verifier)), - anyStatus().call(this::fail)).join(); + unit.get("/accounts") + .dispatch(status(), + on(OK).call(streamOf(AccountBody.class), forEach(verifier)), + anyStatus().call(this::fail)) + .join(); verify(verifier).accept(new AccountBody("1234567890", "Acme Corporation")); verify(verifier).accept(new AccountBody("1234567891", "Acme Company")); @@ -129,18 +134,20 @@ public void shouldCallConsumerWithJsonList() throws Exception { } @Test - public void shouldCallConsumerWithXJsonStream() throws Exception { + public void shouldCallConsumerWithXJsonStream() { server.expect(requestTo(url)).andRespond( withSuccess() .body(new ClassPathResource("account-stream.json")) .contentType(APPLICATION_X_JSON_STREAM)); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); - unit.get("/accounts").dispatch(status(), - on(OK).call(streamOf(AccountBody.class), forEach(verifier)), - anyStatus().call(this::fail)).join(); + unit.get("/accounts") + .dispatch(status(), + on(OK).call(streamOf(AccountBody.class), forEach(verifier)), + anyStatus().call(this::fail)) + .join(); verify(verifier).accept(new AccountBody("1234567890", "Acme Corporation")); verify(verifier).accept(new AccountBody("1234567891", "Acme Company")); @@ -150,18 +157,20 @@ public void shouldCallConsumerWithXJsonStream() throws Exception { } @Test - public void shouldCallConsumerWithJsonSequence() throws Exception { + public void shouldCallConsumerWithJsonSequence() { server.expect(requestTo(url)).andRespond( withSuccess() .body(new ClassPathResource("account-sequence.json")) .contentType(APPLICATION_JSON_SEQ)); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); - unit.get("/accounts").dispatch(status(), - on(OK).call(streamOf(AccountBody.class), forEach(verifier)), - anyStatus().call(this::fail)).join(); + unit.get("/accounts") + .dispatch(status(), + on(OK).call(streamOf(AccountBody.class), forEach(verifier)), + anyStatus().call(this::fail)) + .join(); verify(verifier).accept(new AccountBody("1234567890", "Acme Corporation")); verify(verifier).accept(new AccountBody("1234567891", "Acme Company")); @@ -181,12 +190,14 @@ public void shouldNotCallConsumerForEmptyStream() { .body(new InputStreamResource(new ByteArrayInputStream(new byte[0]))) .contentType(APPLICATION_X_JSON_STREAM)); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); - unit.get("/accounts").dispatch(status(), - on(OK).call(streamOf(AccountBody.class), forEach(verifier)), - anyStatus().call(this::fail)).join(); + unit.get("/accounts") + .dispatch(status(), + on(OK).call(streamOf(AccountBody.class), forEach(verifier)), + anyStatus().call(this::fail)) + .join(); verifyZeroInteractions(verifier); } @@ -202,26 +213,26 @@ public void shouldFailOnCallWithConsumerException() throws Exception { .body(new ClassPathResource("account-sequence.json")) .contentType(APPLICATION_JSON_SEQ)); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); doCallRealMethod().when(verifier).accept(any()); doThrow(new IOException()).when(verifier).tryAccept(new AccountBody("1234567892", "Acme GmbH")); - final CompletableFuture future = unit.get("/accounts").dispatch(status(), - on(OK).call(streamOf(AccountBody.class), forEach(verifier)), - anyStatus().call(this::fail)); + unit.get("/accounts") + .dispatch(status(), + on(OK).call(streamOf(AccountBody.class), forEach(verifier)), + anyStatus().call(this::fail)) + .join(); verify(verifier).accept(new AccountBody("1234567890", "Acme Corporation")); verify(verifier).accept(new AccountBody("1234567891", "Acme Company")); verify(verifier).accept(new AccountBody("1234567892", "Acme GmbH")); verify(verifier, times(3)).tryAccept(any()); verifyNoMoreInteractions(verifier); - - future.join(); } @Test - public void shouldFailOnCallWithInvalidStream() throws Exception { + public void shouldFailOnCallWithInvalidStream() { exception.expect(CompletionException.class); exception.expectCause(instanceOf(UncheckedIOException.class)); @@ -230,17 +241,17 @@ public void shouldFailOnCallWithInvalidStream() throws Exception { .body(new ClassPathResource("account-fail.json")) .contentType(APPLICATION_X_JSON_STREAM)); - @SuppressWarnings("unchecked") - final ThrowingConsumer verifier = mock(ThrowingConsumer.class); + @SuppressWarnings("unchecked") final ThrowingConsumer verifier = mock( + ThrowingConsumer.class); - final CompletableFuture future = unit.get("/accounts").dispatch(status(), - on(OK).call(streamOf(AccountBody.class), forEach(verifier)), - anyStatus().call(this::fail)); + unit.get("/accounts") + .dispatch(status(), + on(OK).call(streamOf(AccountBody.class), forEach(verifier)), + anyStatus().call(this::fail)) + .join(); verify(verifier).accept(new AccountBody("1234567890", "Acme Corporation")); verifyNoMoreInteractions(verifier); - - future.join(); } private void fail(final ClientHttpResponse response) throws IOException { diff --git a/riptide-timeout/src/main/java/org/zalando/riptide/timeout/TimeoutPlugin.java b/riptide-timeout/src/main/java/org/zalando/riptide/timeout/TimeoutPlugin.java index f3226e122..2b4cf1ab7 100644 --- a/riptide-timeout/src/main/java/org/zalando/riptide/timeout/TimeoutPlugin.java +++ b/riptide-timeout/src/main/java/org/zalando/riptide/timeout/TimeoutPlugin.java @@ -1,5 +1,6 @@ package org.zalando.riptide.timeout; +import com.google.gag.annotation.remark.ThisWouldBeOneLineIn; import lombok.AllArgsConstructor; import org.apiguardian.api.API; import org.springframework.http.client.ClientHttpResponse; @@ -8,6 +9,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; @@ -22,29 +26,55 @@ */ @API(status = STABLE) @AllArgsConstructor +@ThisWouldBeOneLineIn(language = "Java 9", toWit = "return () -> execution.execute().orTimeout(timeout, unit)") public final class TimeoutPlugin implements Plugin { + private final ScheduledExecutorService scheduler; private final long timeout; private final TimeUnit unit; private final Executor executor; - public TimeoutPlugin(final long timeout, final TimeUnit unit) { - this(timeout, unit, Runnable::run); + public TimeoutPlugin(final ScheduledExecutorService scheduler, final long timeout, final TimeUnit unit) { + this(scheduler, timeout, unit, Runnable::run); } @Override public RequestExecution beforeDispatch(final RequestExecution execution) { return arguments -> { final CompletableFuture upstream = execution.execute(arguments); - final CompletableFuture downstream = upstream.orTimeout(timeout, unit); - - return downstream.whenCompleteAsync((response, throwable) -> { - // TODO make sure this works with nested exceptions - if (throwable instanceof TimeoutException) { - upstream.cancel(true); - } - }, executor); + + final CompletableFuture downstream = preserveCancelability(upstream); + upstream.whenCompleteAsync(forwardTo(downstream), executor); + + final ScheduledFuture scheduledTimeout = delay(timeout(downstream), cancel(upstream)); + upstream.whenCompleteAsync(cancel(scheduledTimeout), executor); + + return downstream; }; } + private Runnable cancel(final CompletableFuture future) { + return () -> future.cancel(true); + } + + private Runnable timeout(final CompletableFuture future) { + return () -> future.completeExceptionally(new TimeoutException()); + } + + private ScheduledFuture delay(final Runnable... tasks) { + return scheduler.schedule(run(executor, tasks), timeout, unit); + } + + private Runnable run(final Executor executor, final Runnable... tasks) { + return () -> executor.execute(run(tasks)); + } + + private Runnable run(final Runnable... tasks) { + return () -> stream(tasks).forEach(Runnable::run); + } + + private BiConsumer cancel(final Future future) { + return (result, throwable) -> future.cancel(true); + } + } diff --git a/riptide-timeout/src/test/java/org/zalando/riptide/timeout/EnforceCoverageTest.java b/riptide-timeout/src/test/java/org/zalando/riptide/timeout/EnforceCoverageTest.java index c01e62798..b71592c55 100644 --- a/riptide-timeout/src/test/java/org/zalando/riptide/timeout/EnforceCoverageTest.java +++ b/riptide-timeout/src/test/java/org/zalando/riptide/timeout/EnforceCoverageTest.java @@ -4,15 +4,18 @@ import com.google.gag.annotation.remark.OhNoYouDidnt; import org.junit.Test; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mock; + @Hack @OhNoYouDidnt public final class EnforceCoverageTest { @Test public void shouldUsePrimaryConstructor() { - new TimeoutPlugin(1, TimeUnit.SECONDS); + new TimeoutPlugin(mock(ScheduledExecutorService.class), 1, TimeUnit.SECONDS); } } diff --git a/riptide-timeout/src/test/java/org/zalando/riptide/timeout/TimeoutPluginTest.java b/riptide-timeout/src/test/java/org/zalando/riptide/timeout/TimeoutPluginTest.java index 4bbc7ab67..a8bcaa834 100644 --- a/riptide-timeout/src/test/java/org/zalando/riptide/timeout/TimeoutPluginTest.java +++ b/riptide-timeout/src/test/java/org/zalando/riptide/timeout/TimeoutPluginTest.java @@ -8,19 +8,20 @@ import org.junit.After; import org.junit.Rule; import org.junit.Test; -import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.zalando.riptide.Http; -import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory; +import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory; import java.io.IOException; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static com.github.restdriver.clientdriver.RestClientDriver.giveEmptyResponse; import static com.github.restdriver.clientdriver.RestClientDriver.onRequestTo; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static org.junit.Assert.fail; import static org.zalando.riptide.PassRoute.pass; @@ -30,14 +31,15 @@ public class TimeoutPluginTest { public final ClientDriverRule driver = new ClientDriverRule(); private final CloseableHttpClient client = HttpClientBuilder.create().build(); - private final AsyncListenableTaskExecutor executor = new ConcurrentTaskExecutor(); - private final RestAsyncClientHttpRequestFactory factory = new RestAsyncClientHttpRequestFactory(client, executor); + private final Executor executor = Executors.newCachedThreadPool(); + private final ApacheClientHttpRequestFactory factory = new ApacheClientHttpRequestFactory(client); private final Http unit = Http.builder() + .executor(executor) .requestFactory(factory) .baseUrl(driver.getBaseUrl()) .converter(createJsonConverter()) - .plugin(new TimeoutPlugin(1, TimeUnit.SECONDS, executor)) + .plugin(new TimeoutPlugin(newSingleThreadScheduledExecutor(), 1, TimeUnit.SECONDS, executor)) .build(); private static MappingJackson2HttpMessageConverter createJsonConverter() {