by default reactor rabbit is not setup well for rabbitmq connection loss #142
Comments
Sorry for the delay, I was away for a few weeks. Can you be more specific about the scenarios you'd like to cover in case of connection/channel failure? Possible network problems are handled in different ways in different locations of the code, with (I guess) reasonable defaults. We had long discussions about retries and came to the conclusion that a default behavior that would satisfy everyone, or even 90% of the cases, is impossible to find. This is why we introduced some settings to configure it (or something else) on a case-by-case basis, e.g. for connection creation. The case you mention is by default protected from channel errors/closing. Please provide samples, or even better failing tests, to explain what you'd like the library to do for the scenarios you're thinking of.
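For reference, the connection-creation knob mentioned above boils down to supplying your own connection Mono with whatever retry policy fits your deployment. A minimal sketch, assuming ReceiverOptions#connectionMono accepts the Mono; the connection name and back-off values are placeholders:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();

// Retry only the connection creation with exponential back-off, then cache the result.
Mono<Connection> connectionMono = Mono
        .fromCallable(() -> connectionFactory.newConnection("my-app"))
        .retryWhen(Retry.backoff(10, Duration.ofMillis(500)))
        .cache();

Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions()
        .connectionMono(connectionMono));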
My objective was to have a Flux that can survive a RabbitMQ cluster crash/restart (all nodes) and just resume streaming when Rabbit is back up (auto-recover from the RabbitMQ failures). Basically I needed to keep retrying the connection, redeclaring the topology (if it's not durable), etc. An AutoRecoverable connection did not really work well for me in this scenario.
public static Mono<Connection> connectionMono(final ConnectionFactory connectionFactory, String connectionName) {
    AtomicReference<Connection> connectionHolder = new AtomicReference<>();
    return Mono.create(sink -> {
        try {
            Connection conn = connectionHolder.updateAndGet(c -> {
                try {
                    // reuse the existing connection if it is still open, otherwise create a new one
                    return c != null && c.isOpen() ? c : connectionFactory.newConnection(connectionName);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            if (conn == null) {
                sink.error(new RuntimeException("Failed to create RabbitMQ connection"));
                return;
            }
            sink.success(conn);
        } catch (Exception e) {
            sink.error(e);
        }
    });
}
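For completeness, a sketch of how such a non-cached connection Mono could be handed to the library, assuming the standard RabbitFlux factory methods and that both option classes accept connectionMono(...):

Mono<Connection> connectionMono = connectionMono(connectionFactory, "resilient-connection");

// Both Sender and Receiver subscribe to the same Mono; because it is not cache()d,
// a re-subscription after a failure re-checks the holder and reconnects if needed.
Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));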
protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
    channel.addShutdownListener(reason -> {
        // no check here, this will send a complete Flux signal on any socket problem with the connection
        emitter.complete();
    });
}
// convert the complete signal to an EOF error; the retry will handle all errors including the EOF
flux = flux.flatMap(Mono::just, Mono::error, () -> Mono.error(new EOFException()));

// when the connection to Rabbit is lost, retry with a new connection
flux = flux.retryWhen(retryConnection()); // retryConnection has exponential back-off logic, etc.

All together this creates a robust Flux that survives Rabbit failures and auto-recovers the processing. I recommend trying any changes against a local RabbitMQ node that you can kill and restart, to see what actually happens with the message processing.
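retryConnection() is not shown above; a minimal sketch of what such a retry function could look like, assuming reactor-core's Retry.backoff and an SLF4J LOGGER, with the back-off values as placeholders:

private static Retry retryConnection() {
    // Retry indefinitely with exponential back-off, capped at 30 seconds between attempts.
    // Every error is considered retryable here, including the EOFException injected on completion.
    return Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
            .maxBackoff(Duration.ofSeconds(30))
            .doBeforeRetry(signal -> LOGGER.warn("Lost connection to RabbitMQ, retry #{}",
                    signal.totalRetries() + 1, signal.failure()));
}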
You lose the benefits of topology recovery; how did you handle it?
This is pretty neat, thanks for sharing.
Yes, the original condition checks assume automatic connection recovery is enabled. We could check whether the connection actually supports automatic recovery instead of assuming it.
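For illustration, with the RabbitMQ Java client such a check could look at the Recoverable marker interface; a sketch, not the actual library code:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Recoverable;

static boolean supportsAutomaticRecovery(Connection connection) {
    // Connections created with automatic recovery enabled are AutorecoveringConnection
    // instances, which implement the Recoverable marker interface.
    return connection instanceof Recoverable;
}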
Again, retry is very application-specific; I think the documentation covers a reasonable way to deal with connection retry.
I see your point. I'd say the default behavior provides a reasonably robust mechanism, and the possibility to provide your own connection Mono covers the more specific cases.
The test suite has some recovery tests, but it only closes connections, so a node restart could trigger slightly different behavior. Feel free to experiment with tests for now (the test suite needs a system property to know how to call rabbitmqctl).
The topology declaration (re-declaration) was part of the chain:

Flux flux = declareTopology(sender)
        .flatMapMany(ok -> receiver.consumeManualAck(queueName, consumeOptions))
        .flatMap(Mono::just, Mono::error, () -> Mono.error(new EOFException()))
        .retryWhen(retryConnection())
        ...

So basically the declaration is part of the retry.
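declareTopology(sender) is not shown either; a sketch of what such a declaration step could look like with the Sender API, with exchange/queue/routing-key names as placeholders:

private static Mono<?> declareTopology(Sender sender) {
    // Runs on every (re)subscription, so the topology is re-declared after each retry.
    return sender.declareExchange(ExchangeSpecification.exchange("my.exchange"))
            .then(sender.declareQueue(QueueSpecification.queue("my.queue").durable(false)))
            .then(sender.bind(BindingSpecification.binding("my.exchange", "my.routing.key", "my.queue")));
}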
So maybe at least do not cache() permanently non-recoverable connections and channels. And I think that the Receiver also assumes an auto-recovering connection and does not work well when a RabbitMQ node is killed.
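One way to avoid caching a dead connection forever, assuming a reactor-core version that provides Mono#cacheInvalidateIf, could be:

Mono<Connection> connectionMono = Mono
        .fromCallable(() -> connectionFactory.newConnection("reactor-rabbit"))
        // keep the connection cached only while it is open; once it is closed the cache
        // is invalidated and the next subscriber triggers a fresh connection attempt
        .cacheInvalidateIf(connection -> !connection.isOpen());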
I created a follow-up issue #143. I hope to be able to work on it in the next few weeks, or feel free to submit a PR.
@a701440 I pushed a change for this one. Not caching non-recoverable connections and channels is not possible with the current implementation.
Just want to +1 here to say that I encountered the same problem. I have implemented a solution that builds upon the approach from @a701440. Specifically, it supports a callback to explicitly close a connection (rather than relying just on checking whether the existing connection is still open):

/**
 * Reference to the current active rabbit {@link Connection}.
 */
private static class ConnectionReference {
    private final AtomicReference<Connection> connectionRef = new AtomicReference<>();
    private final ConnectionFactory connectionFactory;
    private final String connectionName;
    private final Duration closeTimeout;

    private ConnectionReference(ConnectionFactory connectionFactory, String connectionName, Duration closeTimeout) {
        this.connectionFactory = Objects.requireNonNull(connectionFactory, "connectionFactory must not be null");
        this.connectionName = Objects.requireNonNull(connectionName, "connectionName must not be null");
        this.closeTimeout = Objects.requireNonNull(closeTimeout, "closeTimeout must not be null");
    }
    /**
     * Returns the current active rabbit {@link Connection}.
     *
     * <p>If no connection is currently active, then creates and
     * returns a new {@link Connection}, and sets it as the active connection.</p>
     *
     * @return the current active rabbit {@link Connection}.
     * @throws Exception if there was a problem retrieving/creating a connection
     */
    public Connection get() throws Exception {
        try {
            return connectionRef.updateAndGet(existingConnection -> {
                try {
                    if (existingConnection != null) {
                        if (existingConnection.isOpen()) {
                            return existingConnection;
                        }
                        closeConnection(existingConnection);
                    }
                    return connectionFactory.newConnection(connectionName);
                } catch (Exception e) {
                    throw Exceptions.propagate(e);
                }
            });
        } catch (RuntimeException e) {
            throw (Exception) Exceptions.unwrap(e);
        }
    }
    /**
     * Closes the current active rabbit {@link Connection} (if any).
     *
     * <p>Causes the next call to {@link #get()} to create a new rabbit {@link Connection}.</p>
     */
    public void close() {
        Connection oldConnection = connectionRef.getAndSet(null);
        if (oldConnection != null) {
            closeConnection(oldConnection);
        }
    }

    private void closeConnection(Connection connection) {
        try {
            connection.close((int) closeTimeout.toMillis());
        } catch (ShutdownSignalException e) {
            // ignore
        } catch (IOException e) {
            LOGGER.warn("Exception occurred while closing amqp connection: {}", connection, e);
        }
    }
}

My connectionMono looks like this:

Mono.fromCallable(connectionRef::get)
        .subscribeOn(connectionSubscriptionScheduler)

And my repeat/retry logic looks like this:

// ... snip ... Upstream declares queues, and sends/receives messages, via Sender/Receiver.
/*
 * Retry if there was a failure.
 * This can occur if rabbit is not available on startup,
 * or if the connection breaks.
 */
.retryWhen(Retry
        .backoff(Integer.MAX_VALUE, Duration.ofSeconds(1))
        .maxBackoff(Duration.ofSeconds(30))
        .doBeforeRetry(retrySignal -> {
            LOGGER.warn(
                    "Restarting after exception. Retry# {}",
                    retrySignal.totalRetriesInARow() + 1,
                    retrySignal.failure());
            connectionRef.close();
        }))
/*
 * If the upstream completes, but we have not stopped yet, then resubscribe.
 * The resubscribe will cause the queue to be recreated since the queues are declared upstream.
 *
 * The upstream will complete if the queue is deleted while this stream is consuming from it.
 */
.repeatWhen(Repeat.onlyIf(context -> state != State.STOPPED)
        .doOnRepeat(context -> {
            LOGGER.warn(
                    "Restarting after completion. Restart# {}",
                    context.iteration());
            connectionRef.close();
        }))
I noticed that the Sender and Receiver components unconditionally cache connections and channels. This makes it harder to write code that is resilient when socket connections to Rabbit are closed and/or go bad.
For example in Sender:
connectionMono.map(CHANNEL_PROXY_CREATION_FUNCTION).transform(this::cache)
Unless a resourceManagementChannelMono is passed in the options, the default channel is created and cached. It does not react well in situations when connections are severed, etc.
Ideally it would need to detect that the cached channel is bad and re-subscribe to the connection Mono, etc.
The same situation exists with the connection Mono in other places.
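For context, this is the kind of override being referred to; a sketch assuming SenderOptions#resourceManagementChannelMono accepts a caller-supplied channel Mono and reusing the connectionMono helper from the earlier comment, with the connection name as a placeholder:

Mono<Connection> connectionMono = connectionMono(connectionFactory, "sender-connection");

// Supply our own channel Mono for resource management (declarations, bindings, ...)
// instead of letting the Sender create and cache one from the connection.
Mono<Channel> resourceManagementChannelMono = connectionMono
        .map(connection -> {
            try {
                return connection.createChannel();
            } catch (IOException e) {
                throw Exceptions.propagate(e);
            }
        });

Sender sender = RabbitFlux.createSender(new SenderOptions()
        .connectionMono(connectionMono)
        .resourceManagementChannelMono(resourceManagementChannelMono));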