diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index d5353c2b4..0a9a83483 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -71,22 +71,24 @@ public ConsumerBlockItemObserver( ServerCallStreamObserver serverCallStreamObserver) { - onCancel = () -> { - allowPublish.set(false); - streamMediator.unsubscribe(this); - LOGGER.log( - System.Logger.Level.INFO, - "Consumer cancelled stream. Observer unsubscribed."); - }; + onCancel = + () -> { + allowPublish.set(false); + streamMediator.unsubscribe(this); + LOGGER.log( + System.Logger.Level.INFO, + "Consumer cancelled stream. Observer unsubscribed."); + }; serverCallStreamObserver.setOnCancelHandler(onCancel); - onClose = () -> { - allowPublish.set(false); - streamMediator.unsubscribe(this); - LOGGER.log( - System.Logger.Level.INFO, - "Consumer completed stream. Observer unsubscribed."); - }; + onClose = + () -> { + allowPublish.set(false); + streamMediator.unsubscribe(this); + LOGGER.log( + System.Logger.Level.INFO, + "Consumer completed stream. Observer unsubscribed."); + }; serverCallStreamObserver.setOnCloseHandler(onClose); } diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index f1084b026..c0bd742b8 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -32,10 +32,8 @@ import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.time.Clock; import java.time.InstantSource; import java.util.List; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -229,7 +227,6 @@ public void testOnCancelSubscriptionHandling() throws IOException { new LiveStreamMediatorImpl( new FileSystemPersistenceHandler(blockReader, blockWriter)); - final var testConsumerBlockItemObserver = new TestConsumerBlockItemObserver( TIMEOUT_THRESHOLD_MILLIS, @@ -260,7 +257,6 @@ public void testOnCloseSubscriptionHandling() throws IOException { new LiveStreamMediatorImpl( new FileSystemPersistenceHandler(blockReader, blockWriter)); - final var testConsumerBlockItemObserver = new TestConsumerBlockItemObserver( TIMEOUT_THRESHOLD_MILLIS, @@ -286,9 +282,14 @@ private static class TestConsumerBlockItemObserver extends ConsumerBlockItemObse public TestConsumerBlockItemObserver( long timeoutThresholdMillis, final InstantSource producerLivenessClock, - final StreamMediator> streamMediator, + final StreamMediator> + streamMediator, final StreamObserver responseStreamObserver) { - super(timeoutThresholdMillis, producerLivenessClock, streamMediator, responseStreamObserver); + super( + timeoutThresholdMillis, + producerLivenessClock, + streamMediator, + responseStreamObserver); } public Runnable getOnCancel() {