diff --git a/server/src/main/java/com/hedera/block/server/validator/StreamValidatorBuilder.java b/server/src/main/java/com/hedera/block/server/validator/StreamValidatorBuilder.java index ef47caea5..453914cb5 100644 --- a/server/src/main/java/com/hedera/block/server/validator/StreamValidatorBuilder.java +++ b/server/src/main/java/com/hedera/block/server/validator/StreamValidatorBuilder.java @@ -16,6 +16,7 @@ package com.hedera.block.server.validator; +import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.events.BlockNodeEventHandler; import com.hedera.block.server.events.ObjectEvent; @@ -29,20 +30,25 @@ public class StreamValidatorBuilder { private final BlockWriter blockWriter; private final BlockNodeContext blockNodeContext; + private final ServiceStatus serviceStatus; + private SubscriptionHandler subscriptionHandler; private Notifier notifier; private StreamValidatorBuilder( @NonNull final BlockWriter blockWriter, - @NonNull final BlockNodeContext blockNodeContext) { + @NonNull final BlockNodeContext blockNodeContext, + @NonNull final ServiceStatus serviceStatus) { this.blockWriter = blockWriter; this.blockNodeContext = blockNodeContext; + this.serviceStatus = serviceStatus; } public static StreamValidatorBuilder newBuilder( @NonNull final BlockWriter blockWriter, - @NonNull final BlockNodeContext blockNodeContext) { - return new StreamValidatorBuilder(blockWriter, blockNodeContext); + @NonNull final BlockNodeContext blockNodeContext, + @NonNull final ServiceStatus serviceStatus) { + return new StreamValidatorBuilder(blockWriter, blockNodeContext, serviceStatus); } public StreamValidatorBuilder subscriptionHandler( @@ -58,6 +64,6 @@ public StreamValidatorBuilder notifier(@NonNull final Notifier notifier) { public BlockNodeEventHandler> build() { return new StreamValidatorImpl( - subscriptionHandler, blockWriter, notifier, blockNodeContext); + subscriptionHandler, blockWriter, notifier, blockNodeContext, serviceStatus); } } diff --git a/server/src/main/java/com/hedera/block/server/validator/StreamValidatorImpl.java b/server/src/main/java/com/hedera/block/server/validator/StreamValidatorImpl.java index 5cc3d0f5b..0726585c5 100644 --- a/server/src/main/java/com/hedera/block/server/validator/StreamValidatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/validator/StreamValidatorImpl.java @@ -16,6 +16,7 @@ package com.hedera.block.server.validator; +import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.events.BlockNodeEventHandler; import com.hedera.block.server.events.ObjectEvent; @@ -38,16 +39,19 @@ public class StreamValidatorImpl private final BlockWriter blockWriter; private final Notifier notifier; private final MetricsService metricsService; + private final ServiceStatus serviceStatus; public StreamValidatorImpl( @NonNull final SubscriptionHandler subscriptionHandler, @NonNull final BlockWriter blockWriter, @NonNull final Notifier notifier, - @NonNull final BlockNodeContext blockNodeContext) { + @NonNull final BlockNodeContext blockNodeContext, + @NonNull final ServiceStatus serviceStatus) { this.subscriptionHandler = subscriptionHandler; this.blockWriter = blockWriter; this.notifier = notifier; this.metricsService = blockNodeContext.metricsService(); + this.serviceStatus = serviceStatus; } @Override @@ -64,6 +68,9 @@ public void onEvent( } catch (IOException e) { + // Trigger the server to stop accepting new requests + serviceStatus.stopRunning(getClass().getName()); + // Unsubscribe from the mediator to avoid additional onEvent calls. subscriptionHandler.unsubscribe(this); diff --git a/server/src/main/java/com/hedera/block/server/validator/ValidatorInjectionModule.java b/server/src/main/java/com/hedera/block/server/validator/ValidatorInjectionModule.java index 65d201080..538578fc0 100644 --- a/server/src/main/java/com/hedera/block/server/validator/ValidatorInjectionModule.java +++ b/server/src/main/java/com/hedera/block/server/validator/ValidatorInjectionModule.java @@ -16,6 +16,7 @@ package com.hedera.block.server.validator; +import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.persistence.storage.write.BlockWriter; import com.hedera.hapi.block.stream.BlockItem; @@ -31,7 +32,8 @@ public interface ValidatorInjectionModule { @Singleton static StreamValidatorBuilder providesStreamValidatorBuilder( @NonNull final BlockWriter blockWriter, - @NonNull final BlockNodeContext blockNodeContext) { - return StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + @NonNull final BlockNodeContext blockNodeContext, + @NonNull final ServiceStatus serviceStatus) { + return StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); } } diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIntegrationTest.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIntegrationTest.java index 5644ab43b..8b139a477 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIntegrationTest.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIntegrationTest.java @@ -169,7 +169,7 @@ public void testPublishBlockStreamRegistrationAndExecution() final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, @@ -270,7 +270,7 @@ public void testSubscribeBlockStream() throws IOException { // Build the BlockStreamService final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, @@ -724,7 +724,7 @@ private BlockStreamService buildBlockStreamService( final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); return new BlockStreamService( streamMediator, blockReader, diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java index d552d9b31..53ce36c07 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java @@ -113,7 +113,7 @@ public void tearDown() { public void testServiceName() throws IOException { final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, @@ -130,10 +130,10 @@ public void testServiceName() throws IOException { } @Test - public void testProto() throws IOException { + public void testProto() { final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, @@ -154,7 +154,7 @@ public void testProto() throws IOException { void testSingleBlockHappyPath() throws IOException, ParseException { final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockReader blockReader = BlockAsDirReaderBuilder.newBuilder(config).build(); final BlockStreamService blockStreamService = new BlockStreamService( @@ -215,7 +215,7 @@ void testSingleBlockNotFoundPath() throws IOException, ParseException { // Call the service final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, @@ -235,7 +235,7 @@ void testSingleBlockNotFoundPath() throws IOException, ParseException { void testSingleBlockServiceNotAvailable() { final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, @@ -262,7 +262,7 @@ void testSingleBlockServiceNotAvailable() { @Test public void testSingleBlockIOExceptionPath() throws IOException, ParseException { final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, @@ -289,7 +289,7 @@ public void testSingleBlockIOExceptionPath() throws IOException, ParseException @Test public void testSingleBlockParseExceptionPath() throws IOException, ParseException { final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, @@ -317,7 +317,7 @@ public void testSingleBlockParseExceptionPath() throws IOException, ParseExcepti public void testUpdateInvokesRoutingWithLambdas() { final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, @@ -344,7 +344,7 @@ public void testProtocParseExceptionHandling() { // TODO: We might be able to remove this test once we can remove the Translator class final var streamValidatorBuilder = - StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext); + StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus); final BlockStreamService blockStreamService = new BlockStreamService( streamMediator, diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index d35f8c27d..3ad002e01 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -159,7 +158,12 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException { // register the stream validator when(blockWriter.write(blockItem)).thenReturn(Optional.empty()); final var streamValidator = - new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext); + new StreamValidatorImpl( + streamMediator, + blockWriter, + notifier, + blockNodeContext, + new ServiceStatusImpl()); streamMediator.subscribe(streamValidator); // Acting as a producer, notify the mediator of a new block @@ -218,7 +222,12 @@ public void testMediatorPublishEventToSubscribers() throws IOException { // register the stream validator when(blockWriter.write(blockItem)).thenReturn(Optional.empty()); final var streamValidator = - new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext); + new StreamValidatorImpl( + streamMediator, + blockWriter, + notifier, + blockNodeContext, + new ServiceStatusImpl()); streamMediator.subscribe(streamValidator); // Acting as a producer, notify the mediator of a new block @@ -288,7 +297,12 @@ public void testOnCancelSubscriptionHandling() throws IOException { // register the stream validator when(blockWriter.write(blockItems.getFirst())).thenReturn(Optional.empty()); final var streamValidator = - new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext); + new StreamValidatorImpl( + streamMediator, + blockWriter, + notifier, + blockNodeContext, + new ServiceStatusImpl()); streamMediator.subscribe(streamValidator); // register the test observer @@ -337,7 +351,12 @@ public void testOnCloseSubscriptionHandling() throws IOException { // register the stream validator when(blockWriter.write(blockItems.getFirst())).thenReturn(Optional.empty()); final var streamValidator = - new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext); + new StreamValidatorImpl( + streamMediator, + blockWriter, + notifier, + blockNodeContext, + new ServiceStatusImpl()); streamMediator.subscribe(streamValidator); final var testConsumerBlockItemObserver = @@ -372,7 +391,7 @@ public void testOnCloseSubscriptionHandling() throws IOException { @Test public void testMediatorBlocksPublishAfterException() throws IOException { - final ServiceStatus serviceStatus = mock(ServiceStatus.class); + final ServiceStatus serviceStatus = new ServiceStatusImpl(); final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); @@ -399,7 +418,8 @@ public void testMediatorBlocksPublishAfterException() throws IOException { .blockStreamService(blockStreamService) .build(); final var streamValidator = - new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext); + new StreamValidatorImpl( + streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus); // Set up the stream verifier streamMediator.subscribe(streamValidator); @@ -407,8 +427,6 @@ public void testMediatorBlocksPublishAfterException() throws IOException { final List blockItems = generateBlockItems(1); final BlockItem firstBlockItem = blockItems.getFirst(); - when(serviceStatus.isRunning()).thenReturn(true); - // Right now, only a single producer calls publishEvent. In // that case, they will get an IOException bubbled up to them. // However, we will need to support multiple producers in the @@ -419,8 +437,6 @@ public void testMediatorBlocksPublishAfterException() throws IOException { streamMediator.publish(firstBlockItem); verify(blockStreamService, timeout(testTimeout).times(1)).notifyUnrecoverableError(); - verify(serviceStatus, timeout(testTimeout).times(1)).isRunning(); - verify(serviceStatus, timeout(testTimeout).times(1)).stopRunning(any()); // Confirm the counter was incremented only once assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); @@ -446,7 +462,7 @@ public void testMediatorBlocksPublishAfterException() throws IOException { verify(streamObserver2, timeout(testTimeout).times(1)).onNext(fromPbj(endOfStreamResponse)); verify(streamObserver3, timeout(testTimeout).times(1)).onNext(fromPbj(endOfStreamResponse)); - // once despite the second block being published. + // verify write method only called once despite the second block being published. verify(blockWriter, timeout(testTimeout).times(1)).write(firstBlockItem); } @@ -460,7 +476,12 @@ public void testUnsubscribeWhenNotSubscribed() throws IOException { // register the stream validator final var streamValidator = - new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext); + new StreamValidatorImpl( + streamMediator, + blockWriter, + notifier, + blockNodeContext, + new ServiceStatusImpl()); streamMediator.subscribe(streamValidator); final var testConsumerBlockItemObserver =