receiver.consumeManualAck does not seem compatible with take(int) reactor operator #176

Open
chibenwa opened this issue Aug 1, 2022 · 0 comments
Labels
type/bug A general bug

chibenwa commented Aug 1, 2022

As part of the Apache James project, some of our tests need to dequeue only a few items from a RabbitMQ queue while keeping the others enqueued.

In order to achieve this, our first approach was to use the take(int) operator.

This proves to be unstable.

Expected Behavior

I expect to be able to use the take operator on top of the reactor-rabbitmq library primitives.

If this issue cannot be fixed, I expect the TCK violations to be well documented, along with a list of impacted operators.

Actual Behavior

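The take operator behaves unreliably downstream of receiver.consumeManualAck (deliveries are ignored or acknowledgments fail), and this limitation is undocumented.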

Steps to Reproduce

        @Test
        void consumingShouldSuccessWhenAckConcurrentWithFluxTakeAndFlatMap() throws Exception {
            ReceiverProvider receiverProvider = rabbitMQExtension.getReceiverProvider();
            int counter = 5;
            CountDownLatch countDownLatch = new CountDownLatch(counter);

            Flux.using(receiverProvider::createReceiver,
                    receiver -> receiver.consumeManualAck(QUEUE_NAME_1, new ConsumeOptions()),
                    Receiver::close)
                .filter(getResponse -> getResponse.getBody() != null)
                .take(counter)
                .flatMap(acknowledgableDelivery -> Mono.fromCallable(() -> {
                    acknowledgableDelivery.ack(true);
                    countDownLatch.countDown();
                    return acknowledgableDelivery;
                }).subscribeOn(Schedulers.elastic()))
                .subscribe();

            assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
        }

=> Sometimes fails, with some elements being ignored...

It also fails with concatMap.

        @Test
        void consumingShouldSuccessWhenAckConcurrentWithFluxTake() throws Exception {
            ReceiverProvider receiverProvider = rabbitMQExtension.getReceiverProvider();
            int counter = 5;
            CountDownLatch countDownLatch = new CountDownLatch(counter);

            Flux.using(receiverProvider::createReceiver,
                    receiver -> receiver.consumeManualAck(QUEUE_NAME_1, new ConsumeOptions()),
                    Receiver::close)
                .filter(getResponse -> getResponse.getBody() != null)
                .take(counter)
                .concatMap(acknowledgableDelivery -> Mono.fromCallable(() -> {
                    acknowledgableDelivery.ack(true);
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + ": " + countDownLatch.getCount());
                    return acknowledgableDelivery;
                }).subscribeOn(Schedulers.elastic()))
                .subscribe();

            assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
        }

=> Fails reliably with:

09:03:46.066 [ERROR] r.c.p.Operators - Operator called default onErrorDropped
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:439)
	at reactor.rabbitmq.AcknowledgableDelivery.basicAck(AcknowledgableDelivery.java:110)
	at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:62)
	... 10 common frames omitted
Wrapped by: reactor.rabbitmq.RabbitFluxException: Not retryable exception, cannot retry
	at reactor.rabbitmq.ExceptionHandlers$SimpleRetryTemplate.retry(ExceptionHandlers.java:125)
	at reactor.rabbitmq.ExceptionHandlers$RetryAcknowledgmentExceptionHandler.accept(ExceptionHandlers.java:143)
	at reactor.rabbitmq.ExceptionHandlers$RetryAcknowledgmentExceptionHandler.accept(ExceptionHandlers.java:130)
	at reactor.rabbitmq.AcknowledgableDelivery.retry(AcknowledgableDelivery.java:130)
	at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:64)
	at org.apache.james.backends.rabbitmq.RabbitMQTest$ConcurrencyTest.lambda$consumingShouldSuccessWhenAckConcurrentWithFluxTake$7(RabbitMQTest.java:665)
	at reactor.core.publisher.MonoCallable.call(MonoCallable.java:92)
	at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:227)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
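
One plausible reading of this trace, not confirmed against the reactor-rabbitmq source: take(counter) cancels the upstream as soon as the fifth delivery arrives, Flux.using then runs Receiver::close, and an ack scheduled on another thread reaches a channel that is already closed. The sketch below reproduces only that ordering with plain java.util.concurrent types. FakeChannel, runScenario and the latch that forces the unlucky interleaving are hypothetical stand-ins for illustration, not library APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class TakeThenAckRace {

    // Hypothetical stand-in for a broker channel; not a reactor-rabbitmq type.
    static final class FakeChannel {
        private final AtomicBoolean open = new AtomicBoolean(true);

        void close() {
            open.set(false);
        }

        void ack(int deliveryTag) {
            if (!open.get()) {
                throw new IllegalStateException("channel is already closed, tag=" + deliveryTag);
            }
        }
    }

    // Schedules `limit` acks on a worker thread, closes the channel once the
    // limit is reached, and returns how many acks failed.
    public static int runScenario(int limit) {
        FakeChannel channel = new FakeChannel();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch channelClosed = new CountDownLatch(1);
        List<Future<Boolean>> acks = new ArrayList<>();
        try {
            for (int tag = 1; tag <= limit; tag++) {
                final int deliveryTag = tag;
                acks.add(worker.submit(() -> {
                    // Assumption: force the worst case where every ack runs after close.
                    channelClosed.await();
                    try {
                        channel.ack(deliveryTag);
                        return true;
                    } catch (IllegalStateException e) {
                        return false;
                    }
                }));
            }
            // Limit reached: the consumer is cancelled and its resource released.
            channel.close();
            channelClosed.countDown();

            int failed = 0;
            for (Future<Boolean> ack : acks) {
                if (!ack.get()) {
                    failed++;
                }
            }
            return failed;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("failed acks: " + runScenario(5));
    }
}
```

In the real tests the interleaving is not forced, which would be consistent with the flatMap variant failing only some of the time.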

Possible Solution

No idea here. We made our testing code more complex to work around this situation...

Your Environment

Reactor versions: (BOM) 2020.0.19
JVM 11
OS Ubuntu 20.04
