Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/request io thread #459

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import org.junit.Rule;
import org.junit.Test;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.http.client.AsyncClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.zalando.riptide.Http;
import org.zalando.riptide.capture.Completion;
import org.zalando.riptide.httpclient.RestAsyncClientHttpRequestFactory;
import org.zalando.riptide.httpclient.ApacheClientHttpRequestFactory;

import java.io.IOException;
import java.util.concurrent.Executors;
Expand All @@ -37,9 +37,10 @@ public final class BackupRequestPluginTest {

private final CloseableHttpClient client = HttpClientBuilder.create().build();
private final AsyncListenableTaskExecutor executor = new ConcurrentTaskExecutor(Executors.newFixedThreadPool(2));
private final AsyncClientHttpRequestFactory factory = new RestAsyncClientHttpRequestFactory(client, executor);
private final ClientHttpRequestFactory factory = new ApacheClientHttpRequestFactory(client);

private final Http unit = Http.builder()
.executor(executor)
.requestFactory(factory)
.baseUrl(driver.getBaseUrl())
.plugin(new BackupRequestPlugin(newSingleThreadScheduledExecutor(), 1, SECONDS, executor))
Expand Down Expand Up @@ -125,6 +126,7 @@ public void shouldSendBackupRequestsForGetWithBody() throws Throwable {
@Test
public void shouldSendBackupRequestForCustomSafeDetectedRequest() throws Throwable {
final Http unit = Http.builder()
.executor(executor)
.requestFactory(factory)
.baseUrl(driver.getBaseUrl())
.plugin(new BackupRequestPlugin(newSingleThreadScheduledExecutor(), 1, SECONDS, executor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.test.web.client.MockRestServiceServer;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.client.RestTemplate;
import org.zalando.riptide.Http;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand All @@ -42,10 +43,11 @@ public final class CaptureTest {
private final MockRestServiceServer server;

public CaptureTest() {
final AsyncRestTemplate template = new AsyncRestTemplate();
final RestTemplate template = new RestTemplate();
this.server = MockRestServiceServer.createServer(template);
this.unit = Http.builder()
.requestFactory(template.getAsyncRequestFactory())
.executor(Executors.newSingleThreadExecutor())
.requestFactory(template.getRequestFactory())
.converter(createJsonConverter())
.converter(new StringHttpMessageConverter())
.baseUrl("https://api.example.com")
Expand Down
18 changes: 11 additions & 7 deletions riptide-core/src/main/java/org/zalando/riptide/DefaultHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,35 @@
import com.google.common.collect.ImmutableMultimap;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.AsyncClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;

import java.net.URI;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import static org.springframework.http.HttpHeaders.readOnlyHttpHeaders;

final class DefaultHttp implements Http {

private static final HttpHeaders EMPTY = readOnlyHttpHeaders(new HttpHeaders());

private final AsyncClientHttpRequestFactory requestFactory;
private final Executor executor;
private final ClientHttpRequestFactory requestFactory;
private final MessageWorker worker;
private final Supplier<URI> baseUrlProvider;
private final RequestArguments arguments;
private final Plugin plugin;

DefaultHttp(final AsyncClientHttpRequestFactory requestFactory, final List<HttpMessageConverter<?>> converters,
DefaultHttp(final Executor executor, final ClientHttpRequestFactory requestFactory,
final List<HttpMessageConverter<?>> converters,
final Supplier<URI> baseUrlProvider, final UrlResolution resolution, final Plugin plugin) {
this.requestFactory = checkNotNull(requestFactory, "request factory");
this.executor = requireNonNull(executor, "executor");
this.requestFactory = requireNonNull(requestFactory, "request factory");
this.worker = new MessageWorker(converters);
this.baseUrlProvider = checkNotNull(baseUrlProvider, "base url provider");
this.baseUrlProvider = requireNonNull(baseUrlProvider, "base url provider");
this.arguments = RequestArguments.create().withUrlResolution(resolution);
this.plugin = plugin;
}
Expand Down Expand Up @@ -178,7 +182,7 @@ public Requester execute(final HttpMethod method) {
}

private Requester execute(final RequestArguments arguments) {
return new Requester(requestFactory, worker, arguments, plugin, ImmutableMultimap.of(), EMPTY);
return new Requester(executor, requestFactory, worker, arguments, plugin, ImmutableMultimap.of(), EMPTY);
}

}
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
package org.zalando.riptide;

import com.google.common.collect.ImmutableList;
import org.springframework.http.client.AsyncClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.web.client.RestTemplate;
import org.zalando.riptide.Http.ConfigurationStage;
import org.zalando.riptide.Http.ExecutorStage;
import org.zalando.riptide.Http.FinalStage;
import org.zalando.riptide.Http.RequestFactoryStage;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static org.zalando.riptide.Plugin.compound;

final class DefaultHttpBuilder implements RequestFactoryStage, ConfigurationStage, FinalStage {
final class DefaultHttpBuilder implements ExecutorStage, RequestFactoryStage, ConfigurationStage, FinalStage {

// package private so we can trick code coverage
static class Converters {
Expand All @@ -41,7 +44,8 @@ private Plugins() {

private static final UrlResolution DEFAULT_RESOLUTION = UrlResolution.RFC;

private AsyncClientHttpRequestFactory requestFactory;
private Executor executor;
private ClientHttpRequestFactory requestFactory;
private final List<HttpMessageConverter<?>> converters = new ArrayList<>();
private Supplier<URI> baseUrlProvider = () -> null;
private UrlResolution resolution = DEFAULT_RESOLUTION;
Expand All @@ -52,7 +56,13 @@ private Plugins() {
}

@Override
public ConfigurationStage requestFactory(final AsyncClientHttpRequestFactory requestFactory) {
public RequestFactoryStage executor(final Executor executor) {
this.executor = executor;
return this;
}

@Override
public ConfigurationStage requestFactory(final ClientHttpRequestFactory requestFactory) {
this.requestFactory = requestFactory;
return this;
}
Expand All @@ -63,7 +73,7 @@ public ConfigurationStage defaultConverters() {
}

@Override
public ConfigurationStage converters(final Iterable<HttpMessageConverter<?>> converters) {
public ConfigurationStage converters(@Nonnull final Iterable<HttpMessageConverter<?>> converters) {
converters.forEach(this::converter);
return this;
}
Expand Down Expand Up @@ -108,7 +118,7 @@ public ConfigurationStage defaultPlugins() {
}

@Override
public ConfigurationStage plugins(final Iterable<Plugin> plugins) {
public ConfigurationStage plugins(@Nonnull final Iterable<Plugin> plugins) {
plugins.forEach(this::plugin);
return this;
}
Expand All @@ -121,7 +131,7 @@ public ConfigurationStage plugin(final Plugin plugin) {

@Override
public Http build() {
return new DefaultHttp(requestFactory, converters(), baseUrlProvider, resolution, compound(plugins()));
return new DefaultHttp(executor, requestFactory, converters(), baseUrlProvider, resolution, compound(plugins()));
}

private List<HttpMessageConverter<?>> converters() {
Expand Down
21 changes: 8 additions & 13 deletions riptide-core/src/main/java/org/zalando/riptide/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@

import org.apiguardian.api.API;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.AsyncClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.client.RestTemplate;

import javax.annotation.Nullable;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import static org.apiguardian.api.API.Status.STABLE;
Expand All @@ -26,7 +23,6 @@
* .dispatch(..)}</pre>
*
* @see RestTemplate
* @see AsyncRestTemplate
*/
@API(status = STABLE)
public interface Http {
Expand Down Expand Up @@ -67,17 +63,16 @@ public interface Http {
Requester execute(HttpMethod method, URI uri);
Requester execute(HttpMethod method);

static RequestFactoryStage builder() {
static ExecutorStage builder() {
return new DefaultHttpBuilder();
}

interface ExecutorStage {
RequestFactoryStage executor(Executor executor);
}

interface RequestFactoryStage {
ConfigurationStage requestFactory(AsyncClientHttpRequestFactory requestFactory);
default ConfigurationStage simpleRequestFactory(final ExecutorService executor) {
final SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setTaskExecutor(new ConcurrentTaskExecutor(executor));
return requestFactory(factory);
}
ConfigurationStage requestFactory(ClientHttpRequestFactory requestFactory);
}

interface ConfigurationStage extends FinalStage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.AsyncClientHttpRequest;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.web.client.HttpMessageConverterExtractor;
import org.springframework.web.client.ResponseExtractor;
import org.springframework.web.client.RestClientException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
Expand Down Expand Up @@ -66,7 +67,8 @@ private <I> I cast(final Object result) {
}

@Override
public <T> void write(final AsyncClientHttpRequest request, final HttpEntity<T> entity) throws IOException {
public <T> void write(@Nonnull final ClientHttpRequest request, @Nonnull final HttpEntity<T> entity)
throws IOException {
final HttpHeaders headers = entity.getHeaders();
request.getHeaders().putAll(headers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import org.apiguardian.api.API;
import org.springframework.http.HttpEntity;
import org.springframework.http.client.AsyncClientHttpRequest;
import org.springframework.http.client.ClientHttpRequest;

import java.io.IOException;

Expand All @@ -11,6 +11,6 @@
@API(status = STABLE)
interface MessageWriter {

<T> void write(AsyncClientHttpRequest request, HttpEntity<T> entity) throws IOException;
<T> void write(ClientHttpRequest request, HttpEntity<T> entity) throws IOException;

}
Loading