Migration Guides
As with every release cycle, APIs that have been marked as `@Deprecated` in the past have been evaluated for removal according to our deprecation policy. Users should already have taken these deprecations into consideration, but they are listed here again just in case:
- Kotlin extensions: these now live in the `reactor-kotlin-extensions` library.
- `Mono` SuccessOrError side effects:
  - The `Mono.doOnSuccessOrError` recommended alternatives include `doOnNext(Consumer)`, `doOnError(Consumer)`, `doOnTerminate(Runnable)` and `doOnSuccess(Consumer)`.
  - The `Mono.doAfterSuccessOrError` recommended alternatives are `doAfterTerminate(Runnable)` and `doFinally(Consumer)`.
- Some context-related operators (see the sketch after this list):
  - `deferWithContext(Function)` → use `deferContextual(Function)` instead.
  - `subscriberContext(Context)` and `subscriberContext(Function)` → use `contextWrite(ContextView)` and `contextWrite(Function)` instead.
  - `Signal::getContext()` → use `Signal::getContextView()` instead.
  - `Context::putAll(Context)` → use `Context::putAll(ContextView)` instead.
- Elastic Scheduler:
  - Use `BoundedElasticScheduler` (`Schedulers.boundedElastic()`) instead.
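For reference, a minimal sketch of the replacement context operators (the `user` key and its values are illustrative names; imports omitted):

```java
// Read contextual data with deferContextual (exposes a read-only ContextView)
// and enrich the Context for operators upstream of it with contextWrite.
Mono<String> greeting = Mono
        .deferContextual(ctx -> Mono.just("Hello, " + ctx.getOrDefault("user", "world")))
        .contextWrite(ctx -> ctx.put("user", "reactor"));

greeting.subscribe(System.out::println); // prints "Hello, reactor"
```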
These newly deprecated APIs will be removed in 3.6.0 at the earliest:
- `FluxSink`, `MonoSink`, `SynchronousSink` `#currentContext()`: use `#contextView()` instead (see the sketch after this list).
- `Scheduler#start()`: the `#init()` method should be used instead. Restart capability is discouraged and the new method is allowed to throw when asked to initialize a disposed instance. Please create a new `Scheduler` instance when necessary.
- `Flux#metrics()`, `Mono#metrics()`, `Schedulers#enableMetrics()`, `Metrics`: metrics-related operators have been deprecated in `reactor-core` in favor of the newly introduced `reactor-core-micrometer:1.0.0` module (see Notable New Features below).
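As an illustration, here is a minimal sketch of reading the subscriber's context from a `FluxSink` via `contextView()` (the `user` key is an illustrative name):

```java
Flux<String> source = Flux.create(sink -> {
    // contextView() replaces the deprecated currentContext()
    String user = sink.contextView().getOrDefault("user", "anonymous");
    sink.next("hello " + user);
    sink.complete();
});

source.contextWrite(ctx -> ctx.put("user", "alice"))
      .subscribe(System.out::println); // prints "hello alice"
```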
Some operator behaviors have also changed:
- Most `Mono` source and aggregating operators are now lazier: they defer requesting from the upstream until they have themselves received a `request(long)`.
- Operators that had an alternative behavior option introduced in the previous release train now use this behavior as the default:
  - `switchOnNext`, `switchMap` and `concatMap` now default to 0-prefetch unless a prefetch amount is explicitly provided.
  - `take(n)` now only `request(n)` from the upstream. Use `take(n, false)` to get the old behavior of requesting an unbounded amount (see the sketch after this list).
- Windowing operators that receive an error from the upstream main sequence notify their open windows with a `SourceException`:
  - This exception wraps the original exception as the `#getCause()`.
  - It allows distinguishing between an upstream error and an error in the window consumption logic.
- Future Cancellation: converting a `CompletionStage` that is also a `Future` to a `Mono` now ensures that cancelling the `Mono` also cancels the `Future`:
  - For `CompletableFuture` one can opt out of this by using the `Mono.fromFuture(CompletableFuture, boolean)` variant (see the sketch after this list).
- Fusion Negotiation in some `Mono` operators:
  - A number of `Mono` operators derived from `Operators.MonoSubscriber` have stopped negotiating fusion (even though their `Publisher` still implements `Fuseable`).
  - When downstream tries to negotiate fusion by calling `requestFusion(int)` with any mode, these operators always reply `NONE`.
  - The highest impact is likely to be on tests that validate the fusion negotiation (eg. `StepVerifier`'s `expectFusion(…)`).
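A minimal sketch of the two opt-outs mentioned above; note that the exact meaning of the boolean on `Mono.fromFuture` should be checked against the javadoc, it is assumed here that `true` suppresses cancellation of the `Future`:

```java
// take(n) now requests exactly n from upstream by default;
// pass false as the second argument to restore the old unbounded request.
Flux<Integer> cappedRequest    = Flux.range(1, 100).take(10);
Flux<Integer> unboundedRequest = Flux.range(1, 100).take(10, false);

// Converting a CompletableFuture now propagates Mono cancellation to the Future;
// the boolean variant opts out of that propagation (assumed semantics, see javadoc).
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "done");
Mono<String> result = Mono.fromFuture(future, true);
```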
Among the notable new features, the `tap` operator is a side-effect operator that deals with all signals / side effects in a single class, `SignalListener`.
It enables the creation of one `SignalListener` instance per subscription by providing a factory, and that instance will be reused throughout the lifecycle of the subscription.
`SignalListener` instances can thus store state and react to new signals differently depending on said state.
This is a generalisation of the implementation of the now deprecated `Flux#metrics()` operator (see below).
The `Flux#metrics()` and `Mono#metrics()` operators are now deprecated in favor of using the new `reactor-core-micrometer` module.
Their replacement builds upon the `tap` operator introduced above, combined with an off-the-shelf `SignalListener` factory provided in the module: `Micrometer.metrics(MeterRegistry, String)`.
This approach drops the intrinsic dependency on the `MeterRegistry#globalRegistry()`, which offered no way of using a different registry.
Instead, it requires that a `MeterRegistry` be explicitly provided.
Metrics names and tag names have been reworked to follow conventions and recommendations from the Micrometer team.
The `name` and `tag` operators are still detected upstream for meter name prefixing and additional tagging, as in the sketch below.
See the Exposing Reactor metrics section for more details on how to use this, and the Micrometer.metrics() section for a list of meter and tag names.
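Putting this together, instrumenting a `Flux` might look like the following sketch (assuming `reactor-core-micrometer` is on the classpath; the meter name and tag are illustrative, and the `Micrometer.metrics` arguments follow the signature referenced above):

```java
MeterRegistry registry = new SimpleMeterRegistry();

Flux<String> instrumented = Flux.just("a", "b", "c")
        .name("my.flow")              // detected upstream: used as the meter name prefix
        .tag("source", "demo")        // detected upstream: added as a tag on the meters
        .tap(Micrometer.metrics(registry, "my.flow")); // (MeterRegistry, String) as referenced above

instrumented.blockLast();
```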
The `Schedulers.enableMetrics()` method is now deprecated in favor of using the new `reactor-core-micrometer` module (see the Exposing Reactor metrics section).
The new paradigm consists of instrumenting `Scheduler` instances on a case-by-case basis, using the `Micrometer.timedScheduler` method.
The previous approach was flawed in several ways:
- an intrinsic dependency on the `MeterRegistry#globalRegistry()`, without any way of using a different registry.
- a wrong level of abstraction and limited instrumentation:
  - it was not the schedulers themselves that were instrumented, but individual `ExecutorService` instances assumed to back the schedulers.
  - schedulers NOT backed by any `ExecutorService` couldn't be instrumented.
  - schedulers backed by MULTIPLE `ExecutorService` instances (eg. a pool of workers) would produce multiple levels of metrics that are difficult to aggregate.
  - instrumentation was all-or-nothing, potentially polluting the metrics backend with metrics from global or irrelevant schedulers.
A deliberate constraint of the new approach is that each `Scheduler` must be explicitly wrapped, which ensures that the correct `MeterRegistry` is used and that metrics are recognizable and aggregated for that particular `Scheduler` (thanks to the mandatory `metricsPrefix`), as in the sketch below.
See Micrometer.timedScheduler() for a list of meter and tag names.
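A minimal sketch of wrapping a single `Scheduler` (the scheduler, registry and `metricsPrefix` names are illustrative):

```java
MeterRegistry registry = new SimpleMeterRegistry();

// Only this wrapped instance is instrumented; other schedulers are left untouched.
Scheduler timedWorkers = Micrometer.timedScheduler(
        Schedulers.newParallel("biz-tasks"), registry, "biz.tasks");

Mono.fromCallable(() -> "work")
        .subscribeOn(timedWorkers)
        .block();
```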
The `context-propagation` library is a new and important part of the Reactor and Micrometer backbone to allow bridging between reactive and `ThreadLocal` contextual metadata.
Reactor supports this bridging as soon as the library is present at runtime.
Rather than storing and restoring `ThreadLocal` values in the Reactor `Context` manually, users are encouraged to rely on `context-propagation` and the supporting operators in Reactor:
- register a `ThreadLocalAccessor` for your relevant custom thread locals
- capture all registered thread locals via `Flux|Mono#contextCapture()`
- upstream in the chain, find captured thread-local values in the `ContextView` with the usual context-reading operators
- alternatively, any `handle` or `tap` operator upstream in the chain will restore relevant `ThreadLocal`s around their handling function or `SignalListener` methods
Read more in the Context-Propagation Support section.
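A minimal sketch tying these steps together (the `CORRELATION_ID` thread local, the `"correlationId"` key and the class name are all illustrative):

```java
import io.micrometer.context.ContextRegistry;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class CorrelationDemo {

    // an application-level ThreadLocal we want to see inside a reactive chain
    static final ThreadLocal<String> CORRELATION_ID = new ThreadLocal<>();

    static {
        // register a ThreadLocalAccessor for the custom thread local (first bullet above)
        ContextRegistry.getInstance().registerThreadLocalAccessor(
                "correlationId", CORRELATION_ID::get, CORRELATION_ID::set, CORRELATION_ID::remove);
    }

    public static void main(String[] args) {
        CORRELATION_ID.set("abc-123");

        String result = Mono.just("payload")
                // handle() restores registered ThreadLocals around its function (last bullet above),
                // even though it runs on a boundedElastic thread here
                .<String>handle((value, sink) -> sink.next(value + " [" + CORRELATION_ID.get() + "]"))
                .subscribeOn(Schedulers.boundedElastic())
                // contextCapture() copies registered thread locals into the Reactor Context on subscription
                .contextCapture()
                .block();

        System.out.println(result); // payload [abc-123]
    }
}
```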
Micrometer's Observation API allows frameworks and libraries to define a single call site as a point of observation, and Micrometer decouples that observation call site from the actual steps to perform: lightweight metrics, tracing, logging…
See Micrometer's documentation and read more in the Observation section. See also Micrometer.observation() for a list of observation meter and tag names.
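As a sketch, and assuming the `Micrometer.observation(ObservationRegistry)` factory referenced above, observing a `Mono` could look like this (the observation name is illustrative):

```java
ObservationRegistry observationRegistry = ObservationRegistry.create();

Mono<String> observed = Mono.fromCallable(() -> "payload")
        .name("checkout.load")                              // used to name the observation
        .tap(Micrometer.observation(observationRegistry));  // assumed factory, see above

observed.block();
```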
These notes are relevant if you implement your own custom `Scheduler`.
`start()` is deprecated and `Scheduler` implementations are encouraged to phase out support for restarting a disposed `Scheduler`.
The simplest (and default) implementation for the `init()` method is likely to be the same as `start()`, but implementors are encouraged to have it detect that the scheduler has been disposed (via `dispose` or `disposeGracefully`) and throw an `IllegalStateException`, as in the sketch below.
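A minimal sketch of that implementor-side guidance, assuming a hypothetical custom `Scheduler` that delegates to a bounded-elastic instance (time-based scheduling methods keep their interface defaults for brevity):

```java
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical custom Scheduler delegating to a bounded-elastic instance.
public final class MyScheduler implements Scheduler {

    private final Scheduler delegate = Schedulers.newBoundedElastic(4, 100, "my-scheduler");
    private volatile boolean disposed;

    @Override
    public Disposable schedule(Runnable task) {
        return delegate.schedule(task);
    }

    @Override
    public Worker createWorker() {
        return delegate.createWorker();
    }

    @Override
    public void init() {
        if (disposed) {
            // the new contract allows init() to throw instead of silently restarting
            throw new IllegalStateException("MyScheduler was disposed and cannot be re-initialized");
        }
        delegate.init();
    }

    @Override
    public void dispose() {
        disposed = true;
        delegate.dispose();
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}
```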
The new `disposeGracefully()` method defaults to just calling `Mono.fromRunnable(this::dispose)`, but implementors are encouraged to add support for true graceful shutdown (eg. calling `ExecutorService#shutdown()` rather than `shutdownNow()` on underlying `ExecutorService`s).
Once initiated, the disposal cannot be cancelled: the `Scheduler` should be considered unusable and MUST reject new tasks.
The returned `Mono<Void>` SHOULD be the same in case of multiple concurrent calls (ie. all callers get the notification that graceful disposal has succeeded at the same time).
It MUST support cancellation via `timeout(Duration)` as well as `retry(…)` attempts, but these only relate to the question "has the `Scheduler` finished shutting down gracefully?" (give the scheduler x seconds to shut down gracefully, retry to give it a bit more time, fall back to a hard shutdown, etc…).
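On the caller side, a sketch of giving a scheduler a bounded amount of time to shut down gracefully before falling back to a hard `dispose()`:

```java
Scheduler scheduler = Schedulers.newBoundedElastic(4, 100, "workers");
// ... use the scheduler, then at shutdown time:
scheduler.disposeGracefully()
        .timeout(Duration.ofSeconds(5))                              // bounded wait for draining
        .onErrorResume(e -> Mono.fromRunnable(scheduler::dispose))   // fall back to a hard shutdown
        .block();
```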
Dealing with concurrent calls to `dispose`/`disposeGracefully` can be tricky, especially given the above constraints.
Internally, vanilla Reactor schedulers use a `SchedulerState` state object and a `DisposeAwaiter` to support this.
These classes are private, but you can have a look at the source code for inspiration.