
Getting "ReactorDispatcher instance is closed" error when trying to close the azure service bus connection after receiving message #418

Open
Rawjyot opened this issue Oct 14, 2021 · 14 comments


@Rawjyot

Rawjyot commented Oct 14, 2021

Actual Behavior

  1. When I read a message from Azure Service Bus using ServiceBusReceiverClient, the message is received successfully. However, when I try to close the connection, "ReactorDispatcher instance is closed" is logged as an error.
  2. I am using receiver.close() to close the connection after processing the received messages.
02:05:09,514 | ERROR | boundedElastic-6 |  -  -  |  ReactorDispatcher instance is closed.
02:05:09,514 | ERROR | parallel-1       |  -  -  |  ReactorDispatcher instance is closed.

Expected Behavior

  1. I should be able to close the connection after successfully receiving the message.

I am using:

<spring.boot.version>2.4.11</spring.boot.version>
<azure-messaging-servicebus>7.4.1</azure-messaging-servicebus>

Versions

  • OS platform and version: Mac OS 11.6
  • Maven package version or commit ID: Apache Maven 3.2.5
@anuchandy
Member

@Rawjyot, thanks for sharing the observation. Several GitHub issues have already shown us that logging this message at the "error" level misleads users. The level of this log message was changed to "warning" in 7.4.2 (the October release).

There are multiple threads running in parallel, so while the connection close is in progress, another thread may attempt to "dispatch" work to this connection. Those attempts are dropped and logged with this "warning".
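The race described above can be illustrated with a small JDK-only model. This is not the SDK's ReactorDispatcher. The class `ClosableDispatcher` and its methods are hypothetical and only mimic the observable behavior: once `close()` has run, any other thread that tries to dispatch work has it dropped with a warning instead of executed.

```java
// Hypothetical model of a dispatcher that several threads hand work to.
// It is NOT the SDK's ReactorDispatcher; it only mimics "drop with a warning after close".
class ClosableDispatcher {
    private boolean closed;
    private int accepted;
    private int dropped;

    // Runs the work if the dispatcher is still open; otherwise drops it and logs a warning.
    synchronized boolean dispatch(Runnable work) {
        if (closed) {
            dropped++;
            System.err.println("WARN: dispatcher instance is closed; dropping work from "
                + Thread.currentThread().getName());
            return false;
        }
        accepted++;
        work.run(); // a real dispatcher would hand this to its I/O thread instead
        return true;
    }

    synchronized void close() {
        closed = true;
    }

    synchronized int accepted() { return accepted; }
    synchronized int dropped() { return dropped; }
}

class DispatcherRaceDemo {
    public static void main(String[] args) throws InterruptedException {
        ClosableDispatcher dispatcher = new ClosableDispatcher();
        // A background thread keeps dispatching, like a receive loop or timer on another thread.
        Thread worker = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                dispatcher.dispatch(() -> { });
            }
        }, "worker");
        worker.start();
        dispatcher.close(); // races with the worker: dispatches after this point are dropped
        worker.join();
        System.out.println("accepted=" + dispatcher.accepted() + ", dropped=" + dispatcher.dropped());
    }
}
```

Here `accepted + dropped` always totals 100, but the split varies from run to run because `close()` races with the worker thread. In this model, the dropped work is only noise in the log and not a failure of the close itself.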

@lordarcy

@anuchandy the problem we're having is that after receiving this warning message, subsequent attempts to retrieve a message with a new ServiceBusReceiverClient fail for several minutes and generate the same error message. Eventually an attempt succeeds, but after polling a few times we get the same error again when calling close().

You mentioned the library has multiple threads running in parallel. When our code closes a ServiceBusReceiverClient instance and shortly afterwards creates a new one, could we instead be getting the same instance that was previously closed but, for some reason, not destroyed? Could the ServiceBusClientBuilder factory be reusing an already closed instance?

This behavior is not seen using the older 7.1.0 version of azure-messaging-servicebus.

@anuchandy
Member

I see; yes Jim, ServiceBusClientBuilder shares a few resources among the clients built from it. The first client may be closed while the cleanup of the underlying async resources is still running. If a second client is created during that window, it might get the (still closing) endpoint from the cache. Because the cleanup and its notification are asynchronous, it takes some time for the second client to detect this.

Can you unblock yourself for now by using a new builder?
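The caching behavior and the new-builder workaround described in this comment can be sketched with a JDK-only model. `FakeConnection` and `CachingBuilder` are hypothetical stand-ins, not SDK classes. The builder caches one connection and replaces it only once it is fully disposed, and disposal finishes asynchronously. A client built from the same builder right after a close therefore receives the connection that is still closing, while a new builder starts with a fresh one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a shared connection whose shutdown finishes asynchronously.
class FakeConnection {
    private volatile boolean closing;
    private volatile boolean disposed;

    void beginClose() {
        closing = true;
        // Disposal completes one second later on a background thread, like async resource cleanup.
        CompletableFuture.runAsync(() -> disposed = true,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    }

    boolean isClosing() { return closing; }
    boolean isDisposed() { return disposed; }
}

// Hypothetical builder that caches one connection and hands it to every client it builds.
class CachingBuilder {
    private FakeConnection cached;

    synchronized FakeConnection connection() {
        // Only a fully disposed connection is replaced, so one that is still closing is reused.
        if (cached == null || cached.isDisposed()) {
            cached = new FakeConnection();
        }
        return cached;
    }
}

class StaleConnectionDemo {
    public static void main(String[] args) {
        CachingBuilder shared = new CachingBuilder();
        FakeConnection first = shared.connection(); // client #1
        first.beginClose();                         // client #1 is closed

        // Same builder, immediately afterwards: client #2 gets the connection that is still closing.
        FakeConnection second = shared.connection();
        System.out.println("same builder, closing connection reused: "
            + (second == first && second.isClosing()));

        // New builder: client #2 starts with its own fresh connection.
        FakeConnection fresh = new CachingBuilder().connection();
        System.out.println("new builder, fresh connection: " + !fresh.isClosing());
    }
}
```

In this model, the stale window lasts only until the asynchronous disposal completes. After that, the same builder would create a fresh connection again.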

@lordarcy

@anuchandy the code is using a new ServiceBusClientBuilder every time:

receiver = new ServiceBusClientBuilder()
    .connectionString(azureServiceBusConfig.azureBusConnectionString)
    .receiver()
    .disableAutoComplete()
    .topicName(azureServiceBusConfig.azureBusTopicName)
    .subscriptionName(azureServiceBusConfig.azureBusSubscriptionName)
    .buildClient();

// Wait up to 5 seconds for at most one message
IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(1, Duration.ofSeconds(5));

for (ServiceBusReceivedMessage message : messages) {
    // Examine and act on the message, then settle it (auto-complete is disabled)
    receiver.complete(message);
}

if (receiver != null) {
    receiver.close();
}

@anuchandy
Member

Hi @lordarcy, I tried to reproduce the behavior using the code below, which runs "create client, receive message, close client" in a loop 10 times.

import java.time.Duration;
import java.time.OffsetDateTime;
import com.azure.core.util.IterableStream;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;

public class ReceiveCloseLoopSample {
    public static void main(String[] args) {
        final String connectionString = System.getenv("CON_STR");
        final String queueName = System.getenv("QUEUE_NAME");

        // Loop for 10 times.
        for (int i = 0; i < 10; i++) {

            System.out.println(OffsetDateTime.now() + ": Creating Client#" + i);
            ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .receiver()
                .disableAutoComplete()
                .queueName(queueName)
                .buildClient();

            IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(1, Duration.ofSeconds(5));
            messages.forEach(message -> {
                System.out.println(OffsetDateTime.now() + ": Received Message [seq_number:" + message.getSequenceNumber() + "]");
                if (message != null) {
                    try {
                        receiver.complete(message);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                    System.out.println(OffsetDateTime.now() + ": complete call finished for Message [seq_number:" + message.getSequenceNumber() + "]");
                }
            });

            System.out.println(OffsetDateTime.now() + ": Closing Client#" + i);
            receiver.close();
            System.out.println("");
        }
    }
}

However, I don't see the issue you mentioned. The output of the execution looks like this:

2021-11-10T09:12:12.401284-08:00: Creating Client#0
2021-11-10T09:12:14.074138-08:00: Received Message [seq_number:6]
2021-11-10T09:12:14.260127-08:00: complete call finished for Message [seq_number:6]
2021-11-10T09:12:14.260669-08:00: Closing Client#0

2021-11-10T09:12:14.272972-08:00: Creating Client#1
2021-11-10T09:12:14.613589-08:00: Received Message [seq_number:7]
2021-11-10T09:12:14.660616-08:00: complete call finished for Message [seq_number:7]
2021-11-10T09:12:14.660782-08:00: Closing Client#1

2021-11-10T09:12:14.662825-08:00: Creating Client#2
2021-11-10T09:12:14.967993-08:00: Received Message [seq_number:8]
2021-11-10T09:12:15.006407-08:00: complete call finished for Message [seq_number:8]
2021-11-10T09:12:15.006554-08:00: Closing Client#2

2021-11-10T09:12:15.008189-08:00: Creating Client#3
2021-11-10T09:12:15.318079-08:00: Received Message [seq_number:9]
2021-11-10T09:12:15.374040-08:00: complete call finished for Message [seq_number:9]
2021-11-10T09:12:15.374187-08:00: Closing Client#3

2021-11-10T09:12:15.375971-08:00: Creating Client#4
2021-11-10T09:12:15.668797-08:00: Received Message [seq_number:10]
2021-11-10T09:12:15.718561-08:00: complete call finished for Message [seq_number:10]
2021-11-10T09:12:15.718686-08:00: Closing Client#4

2021-11-10T09:12:15.719595-08:00: Creating Client#5
2021-11-10T09:12:16.004302-08:00: Received Message [seq_number:11]
2021-11-10T09:12:16.049758-08:00: complete call finished for Message [seq_number:11]
2021-11-10T09:12:16.049846-08:00: Closing Client#5

2021-11-10T09:12:16.050655-08:00: Creating Client#6
2021-11-10T09:12:16.346686-08:00: Received Message [seq_number:12]
2021-11-10T09:12:16.383272-08:00: complete call finished for Message [seq_number:12]
2021-11-10T09:12:16.383415-08:00: Closing Client#6

2021-11-10T09:12:16.385344-08:00: Creating Client#7
2021-11-10T09:12:16.625618-08:00: Received Message [seq_number:13]
2021-11-10T09:12:16.660944-08:00: complete call finished for Message [seq_number:13]
2021-11-10T09:12:16.661100-08:00: Closing Client#7

2021-11-10T09:12:16.662135-08:00: Creating Client#8
2021-11-10T09:12:16.901708-08:00: Received Message [seq_number:14]
2021-11-10T09:12:16.945340-08:00: complete call finished for Message [seq_number:14]
2021-11-10T09:12:16.945430-08:00: Closing Client#8

2021-11-10T09:12:16.946211-08:00: Creating Client#9
2021-11-10T09:12:17.161688-08:00: Received Message [seq_number:15]
2021-11-10T09:12:17.204560-08:00: complete call finished for Message [seq_number:15]
2021-11-10T09:12:17.204756-08:00: Closing Client#9

Could you please double-check my code and confirm whether this is how your application uses the client?

@lordarcy

I see three main differences:

  1. The class that instantiates the ServiceBusClientBuilder is itself instantiated each time a polling transaction occurs. This happens because polling is started from an Apache Camel route that calls a Processor, which in turn creates a new MessageReceiver. The MessageReceiver has a try/catch block containing the code from my earlier comment on #418. This pattern is used because each polling transaction is supposed to be stateless.
  2. If no message is found, a new ServiceBusClientBuilder is created with the subscription name for the DLQ, and a similar call to receiveMessages is made to poll the DLQ.
  3. Each polling transaction is followed by a fifteen-second timer before a new transaction starts.
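The polling flow in the three points above can be outlined as a JDK-only sketch. `MessageSource` is a hypothetical stand-in for ServiceBusReceiverClient, and `PollingTransaction`, `PollingDemo` and the "main"/"dlq" entity names are illustrative assumptions, not the application's real code. Each transaction is stateless. It creates a fresh client for the main subscription, closes it, and only if nothing was received does it create and close a second client for the DLQ. A fixed-delay scheduler then waits before starting the next transaction.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical abstraction standing in for ServiceBusReceiverClient (not the SDK's API).
interface MessageSource extends AutoCloseable {
    Optional<String> receiveOne(Duration maxWait);

    @Override
    void close(); // narrowed: no checked exception, so try-with-resources needs no catch
}

// One stateless "polling transaction": a fresh client per poll, the DLQ only if the main entity was empty.
class PollingTransaction {
    private final Function<String, MessageSource> clientFactory; // hypothetical: entity name -> new client

    PollingTransaction(Function<String, MessageSource> clientFactory) {
        this.clientFactory = clientFactory;
    }

    Optional<String> run(String mainEntity, String deadLetterEntity) {
        try (MessageSource main = clientFactory.apply(mainEntity)) {
            Optional<String> message = main.receiveOne(Duration.ofSeconds(5));
            if (message.isPresent()) {
                return message;
            }
        } // the main client is closed here, just before the DLQ client is created
        try (MessageSource dlq = clientFactory.apply(deadLetterEntity)) {
            return dlq.receiveOne(Duration.ofSeconds(5));
        }
    }
}

class PollingDemo {
    public static void main(String[] args) throws InterruptedException {
        // Fake clients: the main entity is always empty and the DLQ always has one message.
        PollingTransaction transaction = new PollingTransaction(entity -> new MessageSource() {
            @Override
            public Optional<String> receiveOne(Duration maxWait) {
                return entity.equals("dlq") ? Optional.of("dead-lettered message") : Optional.empty();
            }

            @Override
            public void close() {
                System.out.println("closed client for " + entity);
            }
        });

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // The application described above waits 15 seconds between polls; 100 ms keeps the demo short.
        scheduler.scheduleWithFixedDelay(
            () -> System.out.println("poll result: " + transaction.run("main", "dlq")),
            0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        scheduler.shutdownNow();
    }
}
```

`scheduleWithFixedDelay` measures the delay from the end of one transaction to the start of the next, which matches a timer that starts after each poll finishes. Note that the DLQ client is created immediately after the main client is closed. Per the earlier explanation, this may be the window in which asynchronous cleanup is still in progress.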

@anuchandy
Member

Thanks, I'm trying to map your points back to the code.

So you have a concept called a "polling transaction." Each time a polling transaction executes, it creates a client instance, receives a message, and closes the client instance, meaning there is a 1:1 relationship between a client instance and an execution of a polling transaction.

Isn't that what the sample code I posted does, with one iteration of the loop corresponding to one execution of a polling transaction?

The part missing from my sample code is this: if no message was received, the code should create a receiver client for the DLQ and then attempt to receive a message from it. Is this understanding correct?

@lordarcy
Copy link

The two things missing from your sample are:

  • The time gap between successive ServiceBusClientBuilder creations
  • Because each poll starts from a new Camel route, our code is also likely running on a different thread each time the pair of ServiceBusClientBuilder instances (main queue and DLQ) is created

@anuchandy
Member

@lordarcy Sorry for the late follow-up. Is it possible for your team to update the code I've shared so far to match the way you use the library, so that we can reproduce this on our end? I've pushed the code we've been discussing above to https://github.com/anuchandy/attempt-repro-polling-bug. It can be cloned and is ready to run once you set the connection string and queue name in the environment variables (CON_STR and QUEUE_NAME). You and Rawjyot have access to the repository.

@lordarcy

@Rawjyot is this something you can try?

@Rawjyot
Author

Rawjyot commented Dec 11, 2021 via email

@lordarcy

@Rawjyot have you had a chance to update the code sample to try to reproduce the problem?

@Rawjyot
Author

Rawjyot commented Dec 20, 2021

@anuchandy can you please give me access to https://github.com/anuchandy/attempt-repro-polling-bug again? When I tried to access it, it said the invitation had expired.

@Rawjyot
Author

Rawjyot commented Dec 23, 2021

@anuchandy I don't have access to the private repository you shared above. In the meantime, @lordarcy shared the codebase with me as a zip. I have gone through the codebase, and here are a few observations:

  1. The code in the repo above uses ServiceBusReceiverClient to poll queues, while the actual code with the issue uses ServiceBusClientBuilder to poll topics.
  2. The repo code does not use Spring Boot, while the actual code with the issue is built on the latest version of Spring Boot.
  3. The repo code reads from the queue on a single thread, whereas the actual code with the issue reads messages from topics asynchronously. That means multiple threads are running and clients are being closed from multiple threads, which is where the actual issue appears.

I will try to replicate these parameters and provide you with updated code. cc: @lordarcy
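A JDK-only harness for the third observation could look like the sketch below. `PollCycle` and `ConcurrentPollHarness` are hypothetical names. The placeholder cycle in the demo would have to be replaced with real "build a ServiceBusReceiverClient for the topic subscription, receive, complete, close" code. The harness releases several threads at once, and each thread runs a number of create/receive/close cycles. The harness counts and logs every cycle that throws, so failures that only appear under concurrent close can be counted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical hook: one "create receiver, receive, complete, close" cycle against a topic subscription.
interface PollCycle {
    void runOnce() throws Exception;
}

class ConcurrentPollHarness {
    // Runs cyclesPerThread cycles on each of `threads` threads at the same time
    // and returns how many cycles threw an exception.
    static int run(PollCycle cycle, int threads, int cyclesPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1); // releases all threads together
        AtomicInteger failures = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            futures.add(pool.submit(() -> {
                start.await();
                for (int i = 0; i < cyclesPerThread; i++) {
                    try {
                        cycle.runOnce();
                    } catch (Exception e) {
                        failures.incrementAndGet();
                        System.err.println(Thread.currentThread().getName() + ": " + e);
                    }
                }
                return null;
            }));
        }
        start.countDown();
        try {
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for poll threads", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
        return failures.get();
    }
}

class ConcurrentPollDemo {
    public static void main(String[] args) {
        AtomicInteger completed = new AtomicInteger();
        // Placeholder cycle: swap in real ServiceBusReceiverClient create/receive/close code when reproducing.
        int failures = ConcurrentPollHarness.run(() -> {
            Thread.sleep(5);
            completed.incrementAndGet();
        }, 4, 10);
        System.out.println("completed=" + completed.get() + ", failures=" + failures);
    }
}
```

With the placeholder cycle, the demo prints `completed=40, failures=0`. The failure count only becomes interesting once real client code is plugged in.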
