From 2febc51f126ab30d19002685a713ce3d994100a2 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Thu, 1 Aug 2024 16:44:04 -0600 Subject: [PATCH] fix: fixing threading issue with unsubscribe Signed-off-by: Matt Peterson --- .../consumer/ConsumerBlockItemObserver.java | 22 +++---- .../mediator/LiveStreamMediatorImpl.java | 16 +++-- .../block/server/BlockStreamServiceIT.java | 63 +++++++++++-------- .../mediator/LiveStreamMediatorImplTest.java | 12 +++- 4 files changed, 68 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index 153bc50a..46ab9a71 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -111,18 +111,16 @@ public ConsumerBlockItemObserver( public void onEvent( final ObjectEvent event, final long l, final boolean b) { - final long currentMillis = producerLivenessClock.millis(); - if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) { - streamMediator.unsubscribe(this); - LOGGER.log( - System.Logger.Level.DEBUG, - "Unsubscribed ConsumerBlockItemObserver due to producer timeout"); - } else { - - // Only send the response if the consumer has not cancelled - // or closed the stream. - if (isResponsePermitted.get()) { - + // Only send the response if the consumer has not cancelled + // or closed the stream. + if (isResponsePermitted.get()) { + final long currentMillis = producerLivenessClock.millis(); + if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) { + streamMediator.unsubscribe(this); + LOGGER.log( + System.Logger.Level.DEBUG, + "Unsubscribed ConsumerBlockItemObserver due to producer timeout"); + } else { // Refresh the producer liveness and pass the BlockItem to the downstream observer. producerLivenessMillis = currentMillis; diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 7870e908..eddbd4b6 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -153,13 +153,19 @@ public void subscribe(final EventHandler> h public void unsubscribe(final EventHandler> handler) { // Remove the subscriber - final var batchEventProcessor = subscribers.remove(handler); + if (subscribers.containsKey(handler)) { - // Stop the processor - batchEventProcessor.halt(); + final var batchEventProcessor = subscribers.remove(handler); - // Remove the gating sequence from the ring buffer - ringBuffer.removeGatingSequence(batchEventProcessor.getSequence()); + // Stop the processor + batchEventProcessor.halt(); + + // Remove the gating sequence from the ring buffer + ringBuffer.removeGatingSequence(batchEventProcessor.getSequence()); + + } else { + LOGGER.log(System.Logger.Level.ERROR, "Subscriber not found: {0}", handler); + } } @Override diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index 094c3241..a6344120 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -86,6 +86,8 @@ public class BlockStreamServiceIT { private Path testPath; private Config testConfig; + private static final int testTimeout = 100; + @BeforeEach public void setUp() throws IOException { testPath = Files.createTempDirectory(TEMP_DIR); @@ -103,11 +105,11 @@ public void tearDown() { @Test public void testPublishBlockStreamRegistrationAndExecution() - throws InterruptedException, IOException, NoSuchAlgorithmException { + throws IOException, NoSuchAlgorithmException { final BlockStreamService blockStreamService = new BlockStreamService( - 50L, + 1500L, new ItemAckBuilder(), streamMediator, blockPersistenceHandler, @@ -131,21 +133,22 @@ public void testPublishBlockStreamRegistrationAndExecution() PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build(); // Verify the BlockItem message is sent to the mediator - verify(streamMediator, timeout(50).times(1)).publishEvent(blockItem); + verify(streamMediator, timeout(testTimeout).times(1)).publishEvent(blockItem); // Verify our custom StreamObserver implementation builds and sends // a response back to the producer - verify(publishStreamResponseObserver, timeout(50).times(1)).onNext(publishStreamResponse); + verify(publishStreamResponseObserver, timeout(testTimeout).times(1)) + .onNext(publishStreamResponse); // Close the stream as Helidon does streamObserver.onCompleted(); // verify the onCompleted() method is invoked on the wrapped StreamObserver - verify(publishStreamResponseObserver, timeout(50).times(1)).onCompleted(); + verify(publishStreamResponseObserver, timeout(testTimeout).times(1)).onCompleted(); } @Test - public void testSubscribeBlockStream() throws InterruptedException { + public void testSubscribeBlockStream() { final ServiceStatus serviceStatus = new ServiceStatusImpl(); serviceStatus.setWebServer(webServer); @@ -185,13 +188,16 @@ public void testSubscribeBlockStream() throws InterruptedException { final SubscribeStreamResponse subscribeStreamResponse = SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build(); - verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse); - verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse); - verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse); + verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) + .onNext(subscribeStreamResponse); + verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) + .onNext(subscribeStreamResponse); + verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) + .onNext(subscribeStreamResponse); } @Test - public void testFullHappyPath() throws IOException, InterruptedException { + public void testFullHappyPath() throws IOException { int numberOfBlocks = 100; final BlockStreamService blockStreamService = buildBlockStreamService(); @@ -226,7 +232,7 @@ public void testFullHappyPath() throws IOException, InterruptedException { } @Test - public void testFullWithSubscribersAddedDynamically() throws IOException, InterruptedException { + public void testFullWithSubscribersAddedDynamically() throws IOException { int numberOfBlocks = 100; @@ -296,7 +302,7 @@ public void testFullWithSubscribersAddedDynamically() throws IOException, Interr } @Test - public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedException { + public void testSubAndUnsubWhileStreaming() throws IOException { int numberOfBlocks = 100; @@ -384,8 +390,7 @@ public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedExcep } @Test - public void testMediatorExceptionHandlingWhenPersistenceFailure() - throws IOException, InterruptedException { + public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOException { final Map< EventHandler>, BatchEventProcessor>> @@ -444,9 +449,12 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() // before the IOException was thrown. final SubscribeStreamResponse subscribeStreamResponse = SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build(); - verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse); - verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse); - verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse); + verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) + .onNext(subscribeStreamResponse); + verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) + .onNext(subscribeStreamResponse); + verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) + .onNext(subscribeStreamResponse); // Verify all the consumers received the end of stream response // TODO: Fix the response code when it's available @@ -456,9 +464,9 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() SubscribeStreamResponse.SubscribeStreamResponseCode .READ_STREAM_SUCCESS) .build(); - verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(endStreamResponse); - verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(endStreamResponse); - verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(endStreamResponse); + verify(subscribeStreamObserver1, timeout(testTimeout).times(1)).onNext(endStreamResponse); + verify(subscribeStreamObserver2, timeout(testTimeout).times(1)).onNext(endStreamResponse); + verify(subscribeStreamObserver3, timeout(testTimeout).times(1)).onNext(endStreamResponse); // Verify all the consumers were unsubscribed for (final var s : subscribers.keySet()) { @@ -473,8 +481,9 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() .build(); final var endOfStreamResponse = PublishStreamResponse.newBuilder().setStatus(endOfStream).build(); - verify(publishStreamResponseObserver, timeout(50).times(2)).onNext(endOfStreamResponse); - verify(webServer, timeout(50).times(1)).stop(); + verify(publishStreamResponseObserver, timeout(testTimeout).times(2)) + .onNext(endOfStreamResponse); + verify(webServer, timeout(testTimeout).times(1)).stop(); // Now verify the block was removed from the file system. final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); @@ -489,7 +498,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() SingleBlockResponse.SingleBlockResponseCode .READ_BLOCK_NOT_AVAILABLE) .build(); - verify(singleBlockResponseStreamObserver, timeout(50).times(1)) + verify(singleBlockResponseStreamObserver, timeout(testTimeout).times(1)) .onNext(expectedSingleBlockNotAvailable); // TODO: Fix the response code when it's available @@ -499,7 +508,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() SubscribeStreamResponse.SubscribeStreamResponseCode .READ_STREAM_SUCCESS) .build(); - verify(subscribeStreamObserver4, timeout(50).times(1)) + verify(subscribeStreamObserver4, timeout(testTimeout).times(1)) .onNext(expectedSubscriberStreamNotAvailable); } @@ -535,9 +544,9 @@ private static void verifySubscribeStreamResponse( final SubscribeStreamResponse stateProofStreamResponse = buildSubscribeStreamResponse(stateProofBlockItem); - verify(streamObserver, timeout(50).times(1)).onNext(headerSubStreamResponse); - verify(streamObserver, timeout(50).times(8)).onNext(bodySubStreamResponse); - verify(streamObserver, timeout(50).times(1)).onNext(stateProofStreamResponse); + verify(streamObserver, timeout(testTimeout).times(1)).onNext(headerSubStreamResponse); + verify(streamObserver, timeout(testTimeout).times(8)).onNext(bodySubStreamResponse); + verify(streamObserver, timeout(testTimeout).times(1)).onNext(stateProofStreamResponse); } } diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index bb80d6b3..26c9c230 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -60,7 +60,8 @@ public class LiveStreamMediatorImplTest { private final long TEST_TIME = 1_719_427_664_950L; @Test - public void testUnsubscribeEach() { + // @Disabled + public void testUnsubscribeEach() throws InterruptedException { final var streamMediator = new LiveStreamMediatorImpl( @@ -81,6 +82,8 @@ public void testUnsubscribeEach() { streamMediator.isSubscribed(observer3), "Expected the mediator to have observer3 subscribed"); + Thread.sleep(50L); + streamMediator.unsubscribe(observer1); assertFalse( streamMediator.isSubscribed(observer1), @@ -168,6 +171,7 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup } @Test + // @Disabled public void testSubAndUnsubHandling() { final var streamMediator = new LiveStreamMediatorImpl( @@ -220,6 +224,9 @@ public void testOnCancelSubscriptionHandling() throws IOException { final List blockItems = generateBlockItems(1); streamMediator.publishEvent(blockItems.getFirst()); + // Verify the event made it to the consumer + verify(serverCallStreamObserver, timeout(50).times(1)).setOnCancelHandler(any()); + // Simulate the consumer cancelling the stream testConsumerBlockItemObserver.getOnCancel().run(); @@ -251,6 +258,9 @@ public void testOnCloseSubscriptionHandling() throws IOException { final List blockItems = generateBlockItems(1); streamMediator.publishEvent(blockItems.getFirst()); + // Verify the event made it to the consumer + verify(serverCallStreamObserver, timeout(50).times(1)).setOnCancelHandler(any()); + // Simulate the consumer completing the stream testConsumerBlockItemObserver.getOnClose().run();