Spring WebFlux includes a reactive, non-blocking WebClient
for HTTP requests. The client
has a functional, fluent API with reactive types for declarative composition, see
web-reactive.adoc. WebFlux client and server rely on the
same non-blocking codecs to encode and decode request
and response content.
Internally WebClient
delegates to an HTTP client library. By default, it uses
Reactor Netty, there is built-in support for
the Jetty reactive HttpClient,
and others can be plugged in through a ClientHttpConnector
.
The simplest way to create a WebClient
is through one of the static factory methods:
-
WebClient.create()
-
WebClient.create(String baseUrl)
The above methods use the Reactor Netty HttpClient
with default settings and expect
io.projectreactor.netty:reactor-netty
to be on the classpath.
You can also use WebClient.builder()
with further options:
-
uriBuilderFactory
: CustomizedUriBuilderFactory
to use as a base URL. -
defaultHeader
: Headers for every request. -
defaultCookie
: Cookies for every request. -
defaultRequest
:Consumer
to customize every request. -
filter
: Client filter for every request. -
exchangeStrategies
: HTTP message reader/writer customizations. -
clientConnector
: HTTP client library settings.
The following example configures HTTP codecs:
WebClient client = WebClient.builder()
.exchangeStrategies(builder -> {
return builder.codecs(codecConfigurer -> {
//...
});
})
.build();
val webClient = WebClient.builder()
.exchangeStrategies { strategies ->
strategies.codecs {
//...
}
}
.build()
Once built, a WebClient
instance is immutable. However, you can clone it and build a
modified copy without affecting the original instance, as the following example shows:
WebClient client1 = WebClient.builder()
.filter(filterA).filter(filterB).build();
WebClient client2 = client1.mutate()
.filter(filterC).filter(filterD).build();
// client1 has filterA, filterB
// client2 has filterA, filterB, filterC, filterD
val client1 = WebClient.builder()
.filter(filterA).filter(filterB).build()
val client2 = client1.mutate()
.filter(filterC).filter(filterD).build()
// client1 has filterA, filterB
// client2 has filterA, filterB, filterC, filterD
Spring WebFlux configures limits for buffering data in-memory in codec to avoid application memory issues. By the default this is configured to 256KB and if that’s not enough for your use case, you’ll see the following:
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer
You can configure this limit on all default codecs with the following code sample:
WebClient webClient = WebClient.builder()
.exchangeStrategies(builder ->
builder.codecs(codecs ->
codecs.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)
)
)
.build();
val webClient = WebClient.builder()
.exchangeStrategies { builder ->
builder.codecs {
it.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)
}
}
.build()
To customize Reactor Netty settings, simple provide a pre-configured HttpClient
:
HttpClient httpClient = HttpClient.create().secure(sslSpec -> ...);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
val httpClient = HttpClient.create().secure { ... }
val webClient = WebClient.builder()
.clientConnector(ReactorClientHttpConnector(httpClient))
.build()
By default, HttpClient
participates in the global Reactor Netty resources held in
reactor.netty.http.HttpResources
, including event loop threads and a connection pool.
This is the recommended mode, since fixed, shared resources are preferred for event loop
concurrency. In this mode global resources remain active until the process exits.
If the server is timed with the process, there is typically no need for an explicit
shutdown. However, if the server can start or stop in-process (for example, a Spring MVC
application deployed as a WAR), you can declare a Spring-managed bean of type
ReactorResourceFactory
with globalResources=true
(the default) to ensure that the Reactor
Netty global resources are shut down when the Spring ApplicationContext
is closed,
as the following example shows:
@Bean
public ReactorResourceFactory reactorResourceFactory() {
return new ReactorResourceFactory();
}
@Bean
fun reactorResourceFactory() = ReactorResourceFactory()
You can also choose not to participate in the global Reactor Netty resources. However, in this mode, the burden is on you to ensure that all Reactor Netty client and server instances use shared resources, as the following example shows:
@Bean
public ReactorResourceFactory resourceFactory() {
ReactorResourceFactory factory = new ReactorResourceFactory();
factory.setUseGlobalResources(false); // (1)
return factory;
}
@Bean
public WebClient webClient() {
Function<HttpClient, HttpClient> mapper = client -> {
// Further customizations...
};
ClientHttpConnector connector =
new ReactorClientHttpConnector(resourceFactory(), mapper); // (2)
return WebClient.builder().clientConnector(connector).build(); // (3)
}
-
Create resources independent of global ones.
-
Use the
ReactorClientHttpConnector
constructor with resource factory. -
Plug the connector into the
WebClient.Builder
.
@Bean
fun resourceFactory() = ReactorResourceFactory().apply {
isUseGlobalResources = false // (1)
}
@Bean
fun webClient(): WebClient {
val mapper: (HttpClient) -> HttpClient = {
// Further customizations...
}
val connector = ReactorClientHttpConnector(resourceFactory(), mapper) // (2)
return WebClient.builder().clientConnector(connector).build() // (3)
}
-
Create resources independent of global ones.
-
Use the
ReactorClientHttpConnector
constructor with resource factory. -
Plug the connector into the
WebClient.Builder
.
To configure a connection timeout:
import io.netty.channel.ChannelOption;
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
import io.netty.channel.ChannelOption
val httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
val webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
To configure a read or write timeout:
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create()
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10))
.addHandlerLast(new WriteTimeoutHandler(10)));
// Create WebClient...
import io.netty.handler.timeout.ReadTimeoutHandler
import io.netty.handler.timeout.WriteTimeoutHandler
val httpClient = HttpClient.create()
.doOnConnected { conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10))
.addHandlerLast(new WriteTimeoutHandler(10))
}
// Create WebClient...
To configure a response timeout for all requests:
HttpClient httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(2));
// Create WebClient...
val httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(2));
// Create WebClient...
To configure a response timeout for a specific request:
WebClient.create().get()
.uri("https://example.org/path")
.httpRequest(httpRequest -> {
HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
reactorRequest.responseTimeout(Duration.ofSeconds(2));
})
.retrieve()
.bodyToMono(String.class);
WebClient.create().get()
.uri("https://example.org/path")
.httpRequest { httpRequest: ClientHttpRequest ->
val reactorRequest = httpRequest.getNativeRequest<HttpClientRequest>()
reactorRequest.responseTimeout(Duration.ofSeconds(2))
}
.retrieve()
.bodyToMono(String::class.java)
The following example shows how to customize Jetty HttpClient
settings:
HttpClient httpClient = new HttpClient();
httpClient.setCookieStore(...);
WebClient webClient = WebClient.builder()
.clientConnector(new JettyClientHttpConnector(httpClient))
.build();
val httpClient = HttpClient()
httpClient.cookieStore = ...
val webClient = WebClient.builder()
.clientConnector(new JettyClientHttpConnector(httpClient))
.build();
By default, HttpClient
creates its own resources (Executor
, ByteBufferPool
, Scheduler
),
which remain active until the process exits or stop()
is called.
You can share resources between multiple instances of the Jetty client (and server) and
ensure that the resources are shut down when the Spring ApplicationContext
is closed by
declaring a Spring-managed bean of type JettyResourceFactory
, as the following example
shows:
@Bean
public JettyResourceFactory resourceFactory() {
return new JettyResourceFactory();
}
@Bean
public WebClient webClient() {
HttpClient httpClient = new HttpClient();
// Further customizations...
ClientHttpConnector connector =
new JettyClientHttpConnector(httpClient, resourceFactory()); (1)
return WebClient.builder().clientConnector(connector).build(); (2)
}
-
Use the
JettyClientHttpConnector
constructor with resource factory. -
Plug the connector into the
WebClient.Builder
.
@Bean
fun resourceFactory() = JettyResourceFactory()
@Bean
fun webClient(): WebClient {
val httpClient = HttpClient()
// Further customizations...
val connector = JettyClientHttpConnector(httpClient, resourceFactory()) // (1)
return WebClient.builder().clientConnector(connector).build() // (2)
}
-
Use the
JettyClientHttpConnector
constructor with resource factory. -
Plug the connector into the
WebClient.Builder
.
The following example shows how to customize Apache HttpComponents HttpClient
settings:
HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
clientBuilder.setDefaultRequestConfig(...);
CloseableHttpAsyncClient client = clientBuilder.build();
ClientHttpConnector connector = new HttpComponentsClientHttpConnector(client);
WebClient webClient = WebClient.builder().clientConnector(connector).build();
val client = HttpAsyncClients.custom().apply {
setDefaultRequestConfig(...)
}.build()
val connector = HttpComponentsClientHttpConnector(client)
val webClient = WebClient.builder().clientConnector(connector).build()
The retrieve()
method is the easiest way to get a response body and decode it.
The following example shows how to do so:
WebClient client = WebClient.create("https://example.org");
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Person.class);
val client = WebClient.create("https://example.org")
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.awaitBody<Person>()
You can also get a stream of objects decoded from the response, as the following example shows:
Flux<Quote> result = client.get()
.uri("/quotes").accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(Quote.class);
val result = client.get()
.uri("/quotes").accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlow<Quote>()
By default, responses with 4xx or 5xx status codes result in an
WebClientResponseException
or one of its HTTP status specific sub-classes, such as
WebClientResponseException.BadRequest
, WebClientResponseException.NotFound
, and others.
You can also use the onStatus
method to customize the resulting exception,
as the following example shows:
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> ...)
.onStatus(HttpStatus::is5xxServerError, response -> ...)
.bodyToMono(Person.class);
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatus::is4xxClientError) { ... }
.onStatus(HttpStatus::is5xxServerError) { ... }
.awaitBody<Person>()
When onStatus
is used, if the response is expected to have content, then the onStatus
callback should consume it. If not, the content will be automatically drained to ensure
resources are released.
The exchange()
method provides more control than the retrieve
method. The following example is equivalent
to retrieve()
but also provides access to the ClientResponse
:
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.bodyToMono(Person.class));
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Person>()
At this level, you can also create a full ResponseEntity
:
Mono<ResponseEntity<Person>> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.toEntity(Person.class));
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.toEntity<Person>()
Note that (unlike retrieve()
), with exchange()
, there are no automatic error signals for
4xx and 5xx responses. You have to check the status code and decide how to proceed.
Caution
|
Unlike |
The request body can be encoded from any asynchronous type handled by ReactiveAdapterRegistry
,
like Mono
or Kotlin Coroutines Deferred
as the following example shows:
Mono<Person> personMono = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.body(personMono, Person.class)
.retrieve()
.bodyToMono(Void.class);
val personDeferred: Deferred<Person> = ...
client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.body<Person>(personDeferred)
.retrieve()
.awaitBody<Unit>()
You can also have a stream of objects be encoded, as the following example shows:
Flux<Person> personFlux = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(personFlux, Person.class)
.retrieve()
.bodyToMono(Void.class);
val people: Flow<Person> = ...
client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.body(people)
.retrieve()
.awaitBody<Unit>()
Alternatively, if you have the actual value, you can use the bodyValue
shortcut method,
as the following example shows:
Person person = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(person)
.retrieve()
.bodyToMono(Void.class);
val person: Person = ...
client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(person)
.retrieve()
.awaitBody<Unit>()
To send form data, you can provide a MultiValueMap<String, String>
as the body. Note that the
content is automatically set to application/x-www-form-urlencoded
by the
FormHttpMessageWriter
. The following example shows how to use MultiValueMap<String, String>
:
MultiValueMap<String, String> formData = ... ;
Mono<Void> result = client.post()
.uri("/path", id)
.bodyValue(formData)
.retrieve()
.bodyToMono(Void.class);
val formData: MultiValueMap<String, String> = ...
client.post()
.uri("/path", id)
.bodyValue(formData)
.retrieve()
.awaitBody<Unit>()
You can also supply form data in-line by using BodyInserters
, as the following example shows:
import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post()
.uri("/path", id)
.body(fromFormData("k1", "v1").with("k2", "v2"))
.retrieve()
.bodyToMono(Void.class);
import org.springframework.web.reactive.function.BodyInserters.*
client.post()
.uri("/path", id)
.body(fromFormData("k1", "v1").with("k2", "v2"))
.retrieve()
.awaitBody<Unit>()
To send multipart data, you need to provide a MultiValueMap<String, ?>
whose values are
either Object
instances that represent part content or HttpEntity
instances that represent the content and
headers for a part. MultipartBodyBuilder
provides a convenient API to prepare a
multipart request. The following example shows how to create a MultiValueMap<String, ?>
:
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("fieldPart", "fieldValue");
builder.part("filePart1", new FileSystemResource("...logo.png"));
builder.part("jsonPart", new Person("Jason"));
builder.part("myPart", part); // Part from a server request
MultiValueMap<String, HttpEntity<?>> parts = builder.build();
val builder = MultipartBodyBuilder().apply {
part("fieldPart", "fieldValue")
part("filePart1", new FileSystemResource("...logo.png"))
part("jsonPart", new Person("Jason"))
part("myPart", part) // Part from a server request
}
val parts = builder.build()
In most cases, you do not have to specify the Content-Type
for each part. The content
type is determined automatically based on the HttpMessageWriter
chosen to serialize it
or, in the case of a Resource
, based on the file extension. If necessary, you can
explicitly provide the MediaType
to use for each part through one of the overloaded
builder part
methods.
Once a MultiValueMap
is prepared, the easiest way to pass it to the WebClient
is
through the body
method, as the following example shows:
MultipartBodyBuilder builder = ...;
Mono<Void> result = client.post()
.uri("/path", id)
.body(builder.build())
.retrieve()
.bodyToMono(Void.class);
val builder: MultipartBodyBuilder = ...
client.post()
.uri("/path", id)
.body(builder.build())
.retrieve()
.awaitBody<Unit>()
If the MultiValueMap
contains at least one non-String
value, which could also
represent regular form data (that is, application/x-www-form-urlencoded
), you need not
set the Content-Type
to multipart/form-data
. This is always the case when using
MultipartBodyBuilder
, which ensures an HttpEntity
wrapper.
As an alternative to MultipartBodyBuilder
, you can also provide multipart content,
inline-style, through the built-in BodyInserters
, as the following example shows:
import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post()
.uri("/path", id)
.body(fromMultipartData("fieldPart", "value").with("filePart", resource))
.retrieve()
.bodyToMono(Void.class);
import org.springframework.web.reactive.function.BodyInserters.*
client.post()
.uri("/path", id)
.body(fromMultipartData("fieldPart", "value").with("filePart", resource))
.retrieve()
.awaitBody<Unit>()
You can register a client filter (ExchangeFilterFunction
) through the WebClient.Builder
in order to intercept and modify requests, as the following example shows:
WebClient client = WebClient.builder()
.filter((request, next) -> {
ClientRequest filtered = ClientRequest.from(request)
.header("foo", "bar")
.build();
return next.exchange(filtered);
})
.build();
val client = WebClient.builder()
.filter { request, next ->
val filtered = ClientRequest.from(request)
.header("foo", "bar")
.build()
next.exchange(filtered)
}
.build()
This can be used for cross-cutting concerns, such as authentication. The following example uses a filter for basic authentication through a static factory method:
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = WebClient.builder()
.filter(basicAuthentication("user", "password"))
.build();
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication
val client = WebClient.builder()
.filter(basicAuthentication("user", "password"))
.build()
Filters apply globally to every request. To change a filter’s behavior for a specific
request, you can add request attributes to the ClientRequest
that can then be accessed
by all filters in the chain, as the following example shows:
WebClient client = WebClient.builder()
.filter((request, next) -> {
Optional<Object> usr = request.attribute("myAttribute");
// ...
})
.build();
client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
.retrieve()
.bodyToMono(Void.class);
}
val client = WebClient.builder()
.filter { request, _ ->
val usr = request.attributes()["myAttribute"];
// ...
}.build()
client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
.retrieve()
.awaitBody<Unit>()
You can also replicate an existing WebClient
, insert new filters, or remove already
registered filters. The following example, inserts a basic authentication filter at
index 0:
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = webClient.mutate()
.filters(filterList -> {
filterList.add(0, basicAuthentication("user", "password"));
})
.build();
val client = webClient.mutate()
.filters { it.add(0, basicAuthentication("user", "password")) }
.build()
WebClient
can be used in synchronous style by blocking at the end for the result:
Person person = client.get().uri("/person/{id}", i).retrieve()
.bodyToMono(Person.class)
.block();
List<Person> persons = client.get().uri("/persons").retrieve()
.bodyToFlux(Person.class)
.collectList()
.block();
val person = runBlocking {
client.get().uri("/person/{id}", i).retrieve()
.awaitBody<Person>()
}
val persons = runBlocking {
client.get().uri("/persons").retrieve()
.bodyToFlow<Person>()
.toList()
}
However if multiple calls need to be made, it’s more efficient to avoid blocking on each response individually, and instead wait for the combined result:
Mono<Person> personMono = client.get().uri("/person/{id}", personId)
.retrieve().bodyToMono(Person.class);
Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
.retrieve().bodyToFlux(Hobby.class).collectList();
Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
Map<String, String> map = new LinkedHashMap<>();
map.put("person", person);
map.put("hobbies", hobbies);
return map;
})
.block();
val data = runBlocking {
val personDeferred = async {
client.get().uri("/person/{id}", personId)
.retrieve().awaitBody<Person>()
}
val hobbiesDeferred = async {
client.get().uri("/person/{id}/hobbies", personId)
.retrieve().bodyToFlow<Hobby>().toList()
}
mapOf("person" to personDeferred.await(), "hobbies" to hobbiesDeferred.await())
}
The above is merely one example. There are lots of other patterns and operators for putting together a reactive pipeline that makes many remote calls, potentially some nested, inter-dependent, without ever blocking until the end.
Note
|
With |
To test code that uses the WebClient
, you can use a mock web server, such as the
OkHttp MockWebServer. To see an example
of its use, check out
WebClientIntegrationTests
in the Spring Framework test suite or the
static-server
sample in the OkHttp repository.