diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index 25b9ba1a0..50510cb74 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -45,7 +45,7 @@ public class BlockStreamService implements GrpcService { private final long timeoutThresholdMillis; private final ItemAckBuilder itemAckBuilder; - private final StreamMediator, BlockItem> streamMediator; + private final StreamMediator> streamMediator; /** * Constructor for the BlockStreamService class. @@ -55,7 +55,7 @@ public class BlockStreamService implements GrpcService { public BlockStreamService( final long timeoutThresholdMillis, final ItemAckBuilder itemAckBuilder, - final StreamMediator, BlockItem> streamMediator) { + final StreamMediator> streamMediator) { this.timeoutThresholdMillis = timeoutThresholdMillis; this.itemAckBuilder = itemAckBuilder; this.streamMediator = streamMediator; diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 35c8c983d..bfb3f7958 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -29,6 +29,7 @@ import io.helidon.webserver.WebServer; import io.helidon.webserver.grpc.GrpcRouting; import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; /** Main class for the block node server */ public class Server { @@ -54,60 +55,61 @@ private Server() {} */ public static void main(final String[] args) { - try { + // Set the global configuration + final Config config = Config.create(); + Config.global(config); + + // Build the gRPC service + final GrpcRouting.Builder grpcRouting = buildGrpcRouting(config); + + // Start the web server + WebServer webServer = WebServer.builder().port(8080).addRouting(grpcRouting).build(); + + webServer.start(); + // .start(); + + } + + private static GrpcRouting.Builder buildGrpcRouting(final Config config) { - // Set the global configuration - final Config config = Config.create(); - Config.global(config); - - // Get Timeout threshold from configuration - final long consumerTimeoutThreshold = - config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY) - .asLong() - .orElse(1500L); - - // Initialize the reader and writer for the block storage - final BlockWriter blockWriter = - new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); - final BlockReader blockReader = - new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); - - final BlockStreamService blockStreamService = - new BlockStreamService( - consumerTimeoutThreshold, - new ItemAckBuilder(), - new LiveStreamMediatorImpl( - new FileSystemPersistenceHandler(blockReader, blockWriter), - (streamMediator) -> { - LOGGER.log( - System.Logger.Level.ERROR, - "Shutting down the server due to an error."); - })); - - // Start the web server - WebServer.builder() - .port(8080) - .addRouting( - GrpcRouting.builder() - .service(blockStreamService) - .bidi( - com.hedera.block.protos.BlockStreamService - .getDescriptor(), - SERVICE_NAME, - CLIENT_STREAMING_METHOD_NAME, - clientBidiStreamingMethod) - .serverStream( - com.hedera.block.protos.BlockStreamService - .getDescriptor(), - SERVICE_NAME, - SERVER_STREAMING_METHOD_NAME, - serverStreamingMethod)) - .build() - .start(); - - } catch (IOException e) { - LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e); - throw new RuntimeException(e); + try { + final BlockStreamService blockStreamService = buildBlockStreamService(config); + return GrpcRouting.builder() + .service(blockStreamService) + .bidi( + com.hedera.block.protos.BlockStreamService.getDescriptor(), + SERVICE_NAME, + CLIENT_STREAMING_METHOD_NAME, + clientBidiStreamingMethod) + .serverStream( + com.hedera.block.protos.BlockStreamService.getDescriptor(), + SERVICE_NAME, + SERVER_STREAMING_METHOD_NAME, + serverStreamingMethod); + } catch (IOException io) { + LOGGER.log( + System.Logger.Level.ERROR, "An exception was thrown starting the server", io); + throw new RuntimeException(io); } } + + private static BlockStreamService buildBlockStreamService(final Config config) + throws IOException { + // Get Timeout threshold from configuration + final long consumerTimeoutThreshold = + config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L); + + // Initialize the reader and writer for the block storage + final BlockWriter blockWriter = + new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + final BlockReader blockReader = + new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + + return new BlockStreamService( + consumerTimeoutThreshold, + new ItemAckBuilder(), + new LiveStreamMediatorImpl( + new ConcurrentHashMap<>(32), + new FileSystemPersistenceHandler(blockReader, blockWriter))); + } } diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index 4ce5334d3..64f704238 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -24,13 +24,15 @@ import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import java.time.InstantSource; +import java.util.concurrent.atomic.AtomicBoolean; /** * The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to * the downstream consumer via the notify method and manage the bidirectional stream to the consumer * via the onNext, onError, and onCompleted methods. */ -public class ConsumerBlockItemObserver implements BlockItemEventHandler> { +public class ConsumerBlockItemObserver + implements BlockItemEventHandler> { private final System.Logger LOGGER = System.getLogger(getClass().getName()); @@ -40,9 +42,10 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler, BlockItem> streamMediator; + private final StreamMediator> streamMediator; private boolean streamStarted; + private AtomicBoolean isEventTransitioning = new AtomicBoolean(false); /** * Constructor for the LiveStreamObserverImpl class. @@ -52,7 +55,7 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler, BlockItem> streamMediator, + final StreamMediator> streamMediator, final StreamObserver subscribeStreamResponseObserver) { this.timeoutThresholdMillis = timeoutThresholdMillis; @@ -84,7 +87,10 @@ public ConsumerBlockItemObserver( /** Pass the block to the observer provided by Helidon */ @Override - public void onEvent(final ObjectEvent event, final long l, final boolean b) { + public void onEvent( + final ObjectEvent event, final long l, final boolean b) { + + isEventTransitioning.set(true); final long currentMillis = producerLivenessClock.millis(); if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) { @@ -97,22 +103,31 @@ public void onEvent(final ObjectEvent event, final long l, final bool // Only start sending BlockItems after we've reached // the beginning of a block. - final BlockItem blockItem = event.get(); + final SubscribeStreamResponse subscribeStreamResponse = event.get(); + final BlockItem blockItem = subscribeStreamResponse.getBlockItem(); if (!streamStarted && blockItem.hasHeader()) { streamStarted = true; } if (streamStarted) { - final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); - LOGGER.log(System.Logger.Level.DEBUG, "Send BlockItem downstream: {0} ", blockItem); - subscribeStreamResponseObserver.onNext(subscribeStreamResponse); } } + + isEventTransitioning.set(false); } @Override - public void awaitShutdown() throws InterruptedException {} + public void awaitShutdown() throws InterruptedException { + while (isEventTransitioning.get()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + LOGGER.log( + System.Logger.Level.ERROR, + "Interrupted while waiting for event to complete"); + } + } + } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 6c7481d1a..1e742bdd0 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -16,8 +16,7 @@ package com.hedera.block.server.mediator; -import static com.hedera.block.protos.BlockStreamService.Block; -import static com.hedera.block.protos.BlockStreamService.BlockItem; +import static com.hedera.block.protos.BlockStreamService.*; import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.data.ObjectEvent; @@ -27,12 +26,13 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; +import io.helidon.webserver.WebServer; import java.io.IOException; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.Consumer; +import java.util.concurrent.atomic.AtomicBoolean; /** * LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible @@ -40,20 +40,23 @@ * live blocks to the subscribers as they arrive and persists the blocks to the block persistence * store. */ -public class LiveStreamMediatorImpl implements StreamMediator, BlockItem> { +public class LiveStreamMediatorImpl + implements StreamMediator> { private final System.Logger LOGGER = System.getLogger(getClass().getName()); - private final RingBuffer> ringBuffer; + private final RingBuffer> ringBuffer; private final ExecutorService executor; private final Map< - BlockItemEventHandler>, - BatchEventProcessor>> + BlockItemEventHandler>, + BatchEventProcessor>> subscribers; private final BlockPersistenceHandler blockPersistenceHandler; - private final Consumer, BlockItem>> shutdownCallback; + + private final AtomicBoolean isPublishing = new AtomicBoolean(true); + private WebServer webserver; /** * Constructor for the LiveStreamMediatorImpl class. @@ -62,47 +65,75 @@ public class LiveStreamMediatorImpl implements StreamMediator>, - BatchEventProcessor>> + BlockItemEventHandler>, + BatchEventProcessor>> subscribers, - final BlockPersistenceHandler blockPersistenceHandler, - final Consumer, BlockItem>> shutdownCallback) { + final BlockPersistenceHandler blockPersistenceHandler) { + this.subscribers = subscribers; this.blockPersistenceHandler = blockPersistenceHandler; // Initialize and start the disruptor - final Disruptor> disruptor = + final Disruptor> disruptor = new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE); this.ringBuffer = disruptor.start(); this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); - this.shutdownCallback = shutdownCallback; } public LiveStreamMediatorImpl( - final BlockPersistenceHandler blockPersistenceHandler, - final Consumer, BlockItem>> shutdownCallback) { - this(new HashMap<>(), blockPersistenceHandler, shutdownCallback); + final BlockPersistenceHandler blockPersistenceHandler) { + this(new ConcurrentHashMap<>(), blockPersistenceHandler); } @Override - public void publishEvent(BlockItem blockItem) { + public void publishEvent(final BlockItem blockItem) throws IOException { - // Publish the block for all subscribers to receive - LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem); - ringBuffer.publishEvent((event, sequence) -> event.set(blockItem)); - - // Block persistence try { - blockPersistenceHandler.persist(blockItem); + if (isPublishing.get()) { + + // Publish the block for all subscribers to receive + LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem); + final var subscribeStreamResponse = + SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); + ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse)); + blockPersistenceHandler.persist(blockItem); + + } else { + LOGGER.log(System.Logger.Level.ERROR, "StreamMediator is not accepting BlockItems"); + } } catch (IOException e) { - // TODO: Push back on the producer? - LOGGER.log(System.Logger.Level.ERROR, "Error occurred while persisting the block", e); - shutdownCallback.accept(this); + // Disable publishing BlockItems from upstream producers + isPublishing.set(false); + LOGGER.log( + System.Logger.Level.ERROR, + "An exception occurred while attempting to persist the BlockItem: {0}", + blockItem, + e); + + LOGGER.log(System.Logger.Level.INFO, "Send a response to end the stream"); + + // Publish the block for all subscribers to receive + final var endStreamResponse = buildEndStreamResponse(); + ringBuffer.publishEvent((event, sequence) -> event.set(endStreamResponse)); + + // Unsubscribe all downstream consumers + for (final var subscriber : subscribers.keySet()) { + LOGGER.log(System.Logger.Level.INFO, "Unsubscribing: {0}", subscriber); + unsubscribe(subscriber); + } + + throw e; } } @Override - public void subscribe(final BlockItemEventHandler> handler) { + public boolean isPublishing() { + return isPublishing.get(); + } + + @Override + public void subscribe( + final BlockItemEventHandler> handler) { // Initialize the batch event processor and set it on the ring buffer final var batchEventProcessor = @@ -117,15 +148,16 @@ public void subscribe(final BlockItemEventHandler> handle } @Override - public void unsubscribe(final BlockItemEventHandler> handler) { - final var batchEventProcessor = subscribers.remove(handler); - - // Stop the processor - batchEventProcessor.halt(); + public void unsubscribe( + final BlockItemEventHandler> handler) { - // Wait for shutdown the complete + final var batchEventProcessor = subscribers.remove(handler); try { + // Wait for shutdown the complete handler.awaitShutdown(); + + // Stop the processor + batchEventProcessor.halt(); } catch (InterruptedException e) { LOGGER.log(System.Logger.Level.ERROR, "Error occurred while waiting for shutdown", e); } @@ -135,7 +167,20 @@ public void unsubscribe(final BlockItemEventHandler> hand } @Override - public boolean isSubscribed(BlockItemEventHandler> handler) { + public boolean isSubscribed( + BlockItemEventHandler> handler) { return subscribers.containsKey(handler); } + + @Override + public void register(final WebServer webServer) { + this.webserver = webServer; + } + + private static SubscribeStreamResponse buildEndStreamResponse() { + // The current spec does not contain a generic error code for + // SubscribeStreamResponseCode. + // TODO: Replace READ_STREAM_SUCCESS (2) with a generic error code? + return SubscribeStreamResponse.newBuilder().setStatus(2).build(); + } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java index d597faa8e..ec4647e75 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java +++ b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java @@ -17,6 +17,8 @@ package com.hedera.block.server.mediator; import com.hedera.block.server.consumer.BlockItemEventHandler; +import io.helidon.webserver.WebServer; +import java.io.IOException; /** * The StreamMediator interface represents a one-to-many bridge between a bidirectional stream of @@ -28,15 +30,18 @@ * for the StreamObserver and the Block types to vary independently. * * @param The type required by the RingBuffer implementation - * @param The type of the BlockItem */ public interface StreamMediator { - void publishEvent(final V blockItem); + void publishEvent(final U blockItem) throws IOException; - void subscribe(final BlockItemEventHandler handler); + boolean isPublishing(); - void unsubscribe(final BlockItemEventHandler handler); + void subscribe(final BlockItemEventHandler handler); - boolean isSubscribed(final BlockItemEventHandler handler); + void unsubscribe(final BlockItemEventHandler handler); + + boolean isSubscribed(final BlockItemEventHandler handler); + + void register(final WebServer webServer); } diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index 5ba551a5a..ada3c1a00 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -35,7 +35,7 @@ public class ProducerBlockItemObserver implements StreamObserver publishStreamResponseObserver; - private final StreamMediator, BlockItem> streamMediator; + private final StreamMediator> streamMediator; private final ItemAckBuilder itemAckBuilder; /** @@ -44,7 +44,7 @@ public class ProducerBlockItemObserver implements StreamObserver, BlockItem> streamMediator, + final StreamMediator> streamMediator, final StreamObserver publishStreamResponseObserver, final ItemAckBuilder itemAckBuilder) { @@ -62,27 +62,49 @@ public ProducerBlockItemObserver( public void onNext(final PublishStreamRequest publishStreamRequest) { final BlockItem blockItem = publishStreamRequest.getBlockItem(); - streamMediator.publishEvent(blockItem); try { - final ItemAcknowledgement itemAck = itemAckBuilder.buildAck(blockItem); - final PublishStreamResponse publishStreamResponse = - PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build(); - publishStreamResponseObserver.onNext(publishStreamResponse); - - } catch (IOException | NoSuchAlgorithmException e) { - - final EndOfStream endOfStream = - EndOfStream.newBuilder() - .setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) - .build(); - final PublishStreamResponse errorResponse = - PublishStreamResponse.newBuilder().setStatus(endOfStream).build(); + // Publish the block to all the subscribers unless + // there's an issue with the StreamMediator. + if (streamMediator.isPublishing()) { + + // Publish the block to the mediator + streamMediator.publishEvent(blockItem); + + try { + // Build the response + final ItemAcknowledgement itemAck = itemAckBuilder.buildAck(blockItem); + final PublishStreamResponse publishStreamResponse = + PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build(); + publishStreamResponseObserver.onNext(publishStreamResponse); + + } catch (IOException | NoSuchAlgorithmException e) { + final var errorResponse = buildErrorStreamResponse(); + publishStreamResponseObserver.onNext(errorResponse); + LOGGER.log(System.Logger.Level.ERROR, "Error calculating hash", e); + } + + } else { + // Close the upstream connection to the producer(s) + final var errorResponse = buildErrorStreamResponse(); + publishStreamResponseObserver.onNext(errorResponse); + LOGGER.log(System.Logger.Level.DEBUG, "StreamMediator is not accepting BlockItems"); + } + } catch (IOException io) { + final var errorResponse = buildErrorStreamResponse(); publishStreamResponseObserver.onNext(errorResponse); - LOGGER.log(System.Logger.Level.ERROR, "Error calculating hash", e); + LOGGER.log(System.Logger.Level.ERROR, "Exception thrown publishing BlockItem", io); } } + private static PublishStreamResponse buildErrorStreamResponse() { + final EndOfStream endOfStream = + EndOfStream.newBuilder() + .setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) + .build(); + return PublishStreamResponse.newBuilder().setStatus(endOfStream).build(); + } + /** * Helidon triggers this method when an error occurs on the bidirectional stream to the upstream * producer. diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index 310441adf..558a22083 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -17,10 +17,12 @@ package com.hedera.block.server; import static com.hedera.block.protos.BlockStreamService.*; +import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.EndOfStream; import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.ItemAcknowledgement; +import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.PublishStreamResponseCode; import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.Mockito.*; import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.data.ObjectEvent; @@ -41,12 +43,11 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; import java.security.NoSuchAlgorithmException; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Consumer; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -60,7 +61,7 @@ public class BlockStreamServiceIT { private final System.Logger LOGGER = System.getLogger(getClass().getName()); private final Object lock = new Object(); - @Mock private StreamMediator, BlockItem> streamMediator; + @Mock private StreamMediator> streamMediator; @Mock private StreamObserver publishStreamResponseObserver; @@ -76,7 +77,6 @@ public class BlockStreamServiceIT { @Mock private BlockReader blockReader; @Mock private BlockWriter blockWriter; - @Mock private Consumer, BlockItem>> testCallback; private static final String TEMP_DIR = "block-node-unit-test-dir"; private static final String JUNIT = "my-junit-test"; @@ -100,16 +100,18 @@ public void tearDown() { } @Test - public void testPublishBlockStreamRegistrationAndExec() + public void testPublishBlockStreamRegistrationAndExecution() throws InterruptedException, IOException, NoSuchAlgorithmException { final BlockStreamService blockStreamService = new BlockStreamService(50L, new ItemAckBuilder(), streamMediator); + when(streamMediator.isPublishing()).thenReturn(true); + final StreamObserver streamObserver = blockStreamService.publishBlockStream(publishStreamResponseObserver); - final BlockItem blockItem = generateBlockItems(1).get(0); + final BlockItem blockItem = generateBlockItems(1).getFirst(); final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder().setBlockItem(blockItem).build(); @@ -147,8 +149,7 @@ public void testSubscribeBlockStream() throws InterruptedException { final var streamMediator = new LiveStreamMediatorImpl( new HashMap<>(), - new FileSystemPersistenceHandler(blockReader, blockWriter), - testCallback); + new FileSystemPersistenceHandler(blockReader, blockWriter)); // Build the BlockStreamService final BlockStreamService blockStreamService = @@ -166,7 +167,7 @@ public void testSubscribeBlockStream() throws InterruptedException { // Build the BlockItem final List blockItems = generateBlockItems(1); final PublishStreamRequest publishStreamRequest = - PublishStreamRequest.newBuilder().setBlockItem(blockItems.get(0)).build(); + PublishStreamRequest.newBuilder().setBlockItem(blockItems.getFirst()).build(); // Calling onNext() with a BlockItem streamObserver.onNext(publishStreamRequest); @@ -176,7 +177,7 @@ public void testSubscribeBlockStream() throws InterruptedException { } final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.get(0)).build(); + SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build(); verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse); verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse); @@ -296,8 +297,8 @@ public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedExcep int numberOfBlocks = 100; final LinkedHashMap< - BlockItemEventHandler>, - BatchEventProcessor>> + BlockItemEventHandler>, + BatchEventProcessor>> subscribers = new LinkedHashMap<>(); final var streamMediator = buildStreamMediator(subscribers); final var blockStreamService = buildBlockStreamService(streamMediator); @@ -378,6 +379,78 @@ public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedExcep streamObserver.onCompleted(); } + @Test + public void testMediatorExceptionHandlingWhenPersistenceFailure() + throws IOException, InterruptedException { + final Map< + BlockItemEventHandler>, + BatchEventProcessor>> + subscribers = new ConcurrentHashMap<>(); + final var streamMediator = buildStreamMediator(subscribers); + final var blockStreamService = buildBlockStreamService(streamMediator); + + // Subscribe the consumers + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); + + // Initialize the producer + final StreamObserver streamObserver = + blockStreamService.publishBlockStream(publishStreamResponseObserver); + + // Change the permissions on the file system to trigger an + // IOException when the BlockPersistenceHandler tries to write + // the first BlockItem to the file system. + removeRootPathWritePerms(testConfig); + + // Transmit a BlockItem + final List blockItems = generateBlockItems(1); + final PublishStreamRequest publishStreamRequest = + PublishStreamRequest.newBuilder().setBlockItem(blockItems.getFirst()).build(); + + streamObserver.onNext(publishStreamRequest); + + synchronized (lock) { + lock.wait(50); + } + + // The BlockItem passed through since it was published + // before the IOException was thrown. + final SubscribeStreamResponse subscribeStreamResponse = + SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build(); + verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse); + verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse); + verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse); + + // Verify all the consumers received the end of stream response + final SubscribeStreamResponse endStreamResponse = + SubscribeStreamResponse.newBuilder().setStatus(2).build(); + verify(subscribeStreamObserver1, times(1)).onNext(endStreamResponse); + verify(subscribeStreamObserver2, times(1)).onNext(endStreamResponse); + verify(subscribeStreamObserver3, times(1)).onNext(endStreamResponse); + + // Verify all the consumers were unsubscribed + for (final var s : subscribers.keySet()) { + assertFalse(streamMediator.isSubscribed(s)); + } + + final EndOfStream endOfStream = + EndOfStream.newBuilder() + .setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) + .build(); + final var endOfStreamResponse = + PublishStreamResponse.newBuilder().setStatus(endOfStream).build(); + verify(publishStreamResponseObserver, times(1)).onNext(endOfStreamResponse); + } + + private static final String NO_WRITE = "r-xr-xr-x"; + + private void removeRootPathWritePerms(final Config config) throws IOException { + final Path blockNodeRootPath = Path.of(config.get(JUNIT).asString().get()); + final Set perms = PosixFilePermissions.fromString(NO_WRITE); + Files.setPosixFilePermissions(blockNodeRootPath, perms); + } + private static void verifySubscribeStreamResponse( int numberOfBlocks, int blockItemsToWait, @@ -415,10 +488,15 @@ private static SubscribeStreamResponse buildSubscribeStreamResponse(BlockItem bl return SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); } - private StreamMediator, BlockItem> buildStreamMediator( + private StreamMediator> buildStreamMediator() + throws IOException { + return buildStreamMediator(new ConcurrentHashMap<>()); + } + + private StreamMediator> buildStreamMediator( final Map< - BlockItemEventHandler>, - BatchEventProcessor>> + BlockItemEventHandler>, + BatchEventProcessor>> subscribers) throws IOException { @@ -426,12 +504,7 @@ private StreamMediator, BlockItem> buildStreamMediator( final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); final BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); return new LiveStreamMediatorImpl( - subscribers, new FileSystemPersistenceHandler(blockReader, blockWriter), testCallback); - } - - private StreamMediator, BlockItem> buildStreamMediator() - throws IOException { - return buildStreamMediator(new HashMap<>()); + subscribers, new FileSystemPersistenceHandler(blockReader, blockWriter)); } private BlockStreamService buildBlockStreamService() throws IOException { @@ -442,7 +515,7 @@ private BlockStreamService buildBlockStreamService() throws IOException { } private BlockStreamService buildBlockStreamService( - StreamMediator, BlockItem> streamMediator) { + StreamMediator> streamMediator) { return new BlockStreamService(2000, new ItemAckBuilder(), streamMediator); } } diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java index 87c8b4abc..a37a0a3a3 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java @@ -17,6 +17,7 @@ package com.hedera.block.server; import static com.hedera.block.protos.BlockStreamService.BlockItem; +import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; @@ -38,11 +39,7 @@ public class BlockStreamServiceTest { @Mock private ItemAckBuilder itemAckBuilder; - @Mock - private StreamMediator< - ObjectEvent, - com.hedera.block.protos.BlockStreamService.BlockItem> - streamMediator; + @Mock private StreamMediator> streamMediator; @Test public void testServiceName() throws IOException, NoSuchAlgorithmException { diff --git a/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java index f2f71ce1c..937ed265b 100644 --- a/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java @@ -36,11 +36,11 @@ public class ConsumerBlockItemObserverTest { private final long TIMEOUT_THRESHOLD_MILLIS = 50L; private final long TEST_TIME = 1_719_427_664_950L; - private Object lock = new Object(); + private final Object lock = new Object(); - @Mock private StreamMediator, BlockItem> streamMediator; + @Mock private StreamMediator> streamMediator; @Mock private StreamObserver responseStreamObserver; - @Mock private ObjectEvent objectEvent; + @Mock private ObjectEvent objectEvent; @Test public void testProducerTimeoutWithinWindow() { @@ -54,11 +54,11 @@ public void testProducerTimeoutWithinWindow() { final BlockStreamService.BlockHeader blockHeader = BlockStreamService.BlockHeader.newBuilder().setBlockNumber(1).build(); final BlockItem blockItem = BlockItem.newBuilder().setHeader(blockHeader).build(); - when(objectEvent.get()).thenReturn(blockItem); - final SubscribeStreamResponse subscribeStreamResponse = SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); + when(objectEvent.get()).thenReturn(subscribeStreamResponse); + consumerBlockItemObserver.onEvent(objectEvent, 0, true); // verify the observer is called with the next BlockItem @@ -100,12 +100,16 @@ public void testConsumerNotToSendBeforeBlockHeader() throws InterruptedException EventMetadata.newBuilder().setCreatorId(i).build(); final BlockItem blockItem = BlockItem.newBuilder().setStartEvent(eventMetadata).build(); - when(objectEvent.get()).thenReturn(blockItem); + final SubscribeStreamResponse subscribeStreamResponse = + SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); + when(objectEvent.get()).thenReturn(subscribeStreamResponse); } else { final BlockProof blockProof = BlockProof.newBuilder().setBlock(i).build(); final BlockItem blockItem = BlockItem.newBuilder().setStateProof(blockProof).build(); - when(objectEvent.get()).thenReturn(blockItem); + final SubscribeStreamResponse subscribeStreamResponse = + SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); + when(objectEvent.get()).thenReturn(subscribeStreamResponse); } consumerBlockItemObserver.onEvent(objectEvent, 0, true); diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index a042dfc1b..7ceb8993f 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -20,18 +20,17 @@ import static com.hedera.block.server.util.TestClock.buildClockInsideWindow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.consumer.ConsumerBlockItemObserver; import com.hedera.block.server.data.ObjectEvent; -import com.hedera.block.server.persistence.BlockPersistenceHandler; import com.hedera.block.server.persistence.FileSystemPersistenceHandler; import com.hedera.block.server.persistence.storage.BlockReader; import com.hedera.block.server.persistence.storage.BlockWriter; import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.util.function.Consumer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -42,9 +41,9 @@ public class LiveStreamMediatorImplTest { private final Object lock = new Object(); - @Mock private BlockItemEventHandler> observer1; - @Mock private BlockItemEventHandler> observer2; - @Mock private BlockItemEventHandler> observer3; + @Mock private BlockItemEventHandler> observer1; + @Mock private BlockItemEventHandler> observer2; + @Mock private BlockItemEventHandler> observer3; @Mock private BlockReader blockReader; @Mock private BlockWriter blockWriter; @@ -53,15 +52,14 @@ public class LiveStreamMediatorImplTest { @Mock private StreamObserver streamObserver2; @Mock private StreamObserver streamObserver3; - @Mock private Consumer, BlockItem>> testCallback; - @Mock private BlockPersistenceHandler blockPersistenceHandler; + // @Mock private BlockPersistenceHandler blockPersistenceHandler; @Test public void testUnsubscribeEach() { final var streamMediator = new LiveStreamMediatorImpl( - new FileSystemPersistenceHandler(blockReader, blockWriter), testCallback); + new FileSystemPersistenceHandler(blockReader, blockWriter)); // Set up the subscribers streamMediator.subscribe(observer1); @@ -99,7 +97,7 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException { final var streamMediator = new LiveStreamMediatorImpl( - new FileSystemPersistenceHandler(blockReader, blockWriter), testCallback); + new FileSystemPersistenceHandler(blockReader, blockWriter)); final BlockItem blockItem = BlockItem.newBuilder().build(); @@ -118,7 +116,7 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup final long TEST_TIME = 1_719_427_664_950L; final var streamMediator = new LiveStreamMediatorImpl( - new FileSystemPersistenceHandler(blockReader, blockWriter), testCallback); + new FileSystemPersistenceHandler(blockReader, blockWriter)); final var concreteObserver1 = new ConsumerBlockItemObserver( @@ -177,22 +175,23 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup verify(blockWriter).write(blockItem); } - @Test - public void testPublishEventExceptionHandling() throws IOException, InterruptedException { - final var streamMediator = - new LiveStreamMediatorImpl(blockPersistenceHandler, testCallback); - - final BlockItem blockItem = BlockItem.newBuilder().build(); - doThrow(new IOException("Test exception")).when(blockPersistenceHandler).persist(blockItem); - - streamMediator.publishEvent(blockItem); - - synchronized (lock) { - lock.wait(50); - } - - verify(testCallback, times(1)).accept(streamMediator); - } + // @Test + // public void testPublishEventExceptionHandling() throws IOException, InterruptedException { + // final var streamMediator = + // new LiveStreamMediatorImpl(blockPersistenceHandler); + // + // final BlockHeader blockHeader = BlockHeader.newBuilder().setBlockNumber(1).build(); + // final BlockItem blockItem = BlockItem.newBuilder().setHeader(blockHeader).build(); + // doThrow(new IOException("Read Permission + // Exception")).when(blockPersistenceHandler).persist(blockItem); + // when(blockPersistenceHandler + // + // streamMediator.publishEvent(blockItem); + // + // synchronized (lock) { + // lock.wait(50); + // } + // } @Test public void testSubAndUnsubHandling() { @@ -200,7 +199,7 @@ public void testSubAndUnsubHandling() { final long TEST_TIME = 1_719_427_664_950L; final var streamMediator = new LiveStreamMediatorImpl( - new FileSystemPersistenceHandler(blockReader, blockWriter), testCallback); + new FileSystemPersistenceHandler(blockReader, blockWriter)); final var concreteObserver1 = new ConsumerBlockItemObserver( diff --git a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java index b425a16b8..c100b0b0d 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java @@ -80,7 +80,8 @@ public void testReadRangeWhenBlocksLessThanWindow() throws IOException { private static void verifyReadRange( int window, int numOfWindows, - BlockPersistenceHandler blockPersistenceHandler) throws IOException { + BlockPersistenceHandler blockPersistenceHandler) + throws IOException { for (int j = 0; j < numOfWindows; ++j) { diff --git a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java index ab4342d86..6eece47a5 100644 --- a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.util.List; -import java.util.function.Consumer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -49,7 +48,7 @@ public class ProducerBlockItemObserverTest { private final Object lock = new Object(); @Mock private ItemAckBuilder itemAckBuilder; - @Mock private StreamMediator, BlockItem> streamMediator; + @Mock private StreamMediator> streamMediator; @Mock private StreamObserver publishStreamResponseObserver; @Mock private BlockReader blockReader; @@ -59,8 +58,6 @@ public class ProducerBlockItemObserverTest { @Mock private StreamObserver streamObserver2; @Mock private StreamObserver streamObserver3; - @Mock private Consumer, BlockItem>> testCallback; - @Test public void testProducerOnNext() throws InterruptedException, IOException, NoSuchAlgorithmException { @@ -70,7 +67,9 @@ public void testProducerOnNext() new ProducerBlockItemObserver( streamMediator, publishStreamResponseObserver, new ItemAckBuilder()); - BlockItem blockHeader = blockItems.get(0); + when(streamMediator.isPublishing()).thenReturn(true); + + BlockItem blockHeader = blockItems.getFirst(); PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder().setBlockItem(blockHeader).build(); producerBlockItemObserver.onNext(publishStreamRequest); @@ -103,7 +102,7 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti final long TEST_TIME = 1_719_427_664_950L; final var streamMediator = new LiveStreamMediatorImpl( - new FileSystemPersistenceHandler(blockReader, blockWriter), testCallback); + new FileSystemPersistenceHandler(blockReader, blockWriter)); final var concreteObserver1 = new ConsumerBlockItemObserver( @@ -187,11 +186,12 @@ public void testItemAckBuilderExceptionTest() new ProducerBlockItemObserver( streamMediator, publishStreamResponseObserver, itemAckBuilder); + when(streamMediator.isPublishing()).thenReturn(true); when(itemAckBuilder.buildAck(any())) .thenThrow(new NoSuchAlgorithmException("Test exception")); List blockItems = generateBlockItems(1); - BlockItem blockHeader = blockItems.get(0); + BlockItem blockHeader = blockItems.getFirst(); PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder().setBlockItem(blockHeader).build(); producerBlockItemObserver.onNext(publishStreamRequest);