diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 4f00126b6..c296556fe 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -28,9 +28,9 @@ import com.hedera.block.server.events.ObjectEvent; import com.hedera.block.server.metrics.MetricsService; import com.hedera.block.server.service.ServiceStatus; +import com.hedera.hapi.block.BlockItemSet; import com.hedera.hapi.block.SubscribeStreamResponse; import com.hedera.hapi.block.SubscribeStreamResponseCode; -import com.hedera.hapi.block.SubscribeStreamResponseSet; import com.hedera.hapi.block.stream.BlockItem; import com.lmax.disruptor.BatchEventProcessor; import edu.umd.cs.findbugs.annotations.NonNull; @@ -45,8 +45,7 @@ * subscribers as they arrive via a RingBuffer maintained in the base class and persists the block * items to a store. */ -class LiveStreamMediatorImpl extends SubscriptionHandlerBase - implements LiveStreamMediator { +class LiveStreamMediatorImpl extends SubscriptionHandlerBase implements LiveStreamMediator { private final Logger LOGGER = System.getLogger(getClass().getName()); @@ -102,10 +101,11 @@ public void publish(@NonNull final List blockItems) { // Publish the block for all subscribers to receive LOGGER.log(DEBUG, "Publishing BlockItem"); - final SubscribeStreamResponseSet blockItemsSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItems).build(); - final var subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(blockItemsSet).build(); + final BlockItemSet blockItemsSet = + BlockItemSet.newBuilder().blockItems(blockItems).build(); + final var subscribeStreamResponse = SubscribeStreamResponse.newBuilder() + .blockItems(blockItemsSet) + .build(); ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse)); // Increment the block item counter by all block items published diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index d54616ff4..d651bf162 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -59,8 +59,7 @@ public class ProducerBlockItemObserver private final Logger LOGGER = System.getLogger(getClass().getName()); - private final StreamObserver - publishStreamResponseObserver; + private final StreamObserver publishStreamResponseObserver; private final SubscriptionHandler subscriptionHandler; private final Publisher> publisher; private final ServiceStatus serviceStatus; @@ -108,13 +107,12 @@ public ProducerBlockItemObserver( @NonNull final BlockNodeContext blockNodeContext, @NonNull final ServiceStatus serviceStatus) { - this.livenessCalculator = - new LivenessCalculator( - producerLivenessClock, - blockNodeContext - .configuration() - .getConfigData(ConsumerConfig.class) - .timeoutThresholdMillis()); + this.livenessCalculator = new LivenessCalculator( + producerLivenessClock, + blockNodeContext + .configuration() + .getConfigData(ConsumerConfig.class) + .timeoutThresholdMillis()); this.publisher = publisher; this.publishStreamResponseObserver = publishStreamResponseObserver; @@ -127,24 +125,22 @@ public ProducerBlockItemObserver( ServerCallStreamObserver serverCallStreamObserver) { - onCancel = - () -> { - // The producer has cancelled the stream. - // Do not allow additional responses to be sent. - isResponsePermitted.set(false); - subscriptionHandler.unsubscribe(this); - LOGGER.log(DEBUG, "Producer cancelled the stream. Observer unsubscribed."); - }; + onCancel = () -> { + // The producer has cancelled the stream. + // Do not allow additional responses to be sent. + isResponsePermitted.set(false); + subscriptionHandler.unsubscribe(this); + LOGGER.log(DEBUG, "Producer cancelled the stream. Observer unsubscribed."); + }; serverCallStreamObserver.setOnCancelHandler(onCancel); - onClose = - () -> { - // The producer has closed the stream. - // Do not allow additional responses to be sent. - isResponsePermitted.set(false); - subscriptionHandler.unsubscribe(this); - LOGGER.log(DEBUG, "Producer completed the stream. Observer unsubscribed."); - }; + onClose = () -> { + // The producer has closed the stream. + // Do not allow additional responses to be sent. + isResponsePermitted.set(false); + subscriptionHandler.unsubscribe(this); + LOGGER.log(DEBUG, "Producer completed the stream. Observer unsubscribed."); + }; serverCallStreamObserver.setOnCloseHandler(onClose); } } @@ -157,26 +153,21 @@ public ProducerBlockItemObserver( * @param publishStreamRequest the PublishStreamRequest received from the upstream producer */ @Override - public void onNext( - @NonNull final com.hedera.hapi.block.protoc.PublishStreamRequest publishStreamRequest) { + public void onNext(@NonNull final com.hedera.hapi.block.protoc.PublishStreamRequest publishStreamRequest) { LOGGER.log(DEBUG, "Received PublishStreamRequest from producer"); final List blockItemsPbj = new ArrayList<>(); final Counter liveBlockItemsReceived = metricsService.get(LiveBlockItemsReceived); for (final com.hedera.hapi.block.stream.protoc.BlockItem blockItemProtoc : - publishStreamRequest.getBlockItemsList()) { + publishStreamRequest.getBlockItems().getBlockItemsList()) { try { - final BlockItem blockItem = - toPbj(BlockItem.PROTOBUF, blockItemProtoc.toByteArray()); + final BlockItem blockItem = toPbj(BlockItem.PROTOBUF, blockItemProtoc.toByteArray()); blockItemsPbj.add(blockItem); } catch (ParseException e) { final var errorResponse = buildErrorStreamResponse(); publishStreamResponseObserver.onNext(errorResponse); - LOGGER.log( - ERROR, - "Error parsing inbound block item from a producer: " + blockItemProtoc, - e); + LOGGER.log(ERROR, "Error parsing inbound block item from a producer: " + blockItemProtoc, e); // Stop the server serviceStatus.stopWebServer(getClass().getName()); @@ -206,15 +197,12 @@ public void onNext( } @Override - public void onEvent( - ObjectEvent event, long sequence, boolean endOfBatch) { + public void onEvent(ObjectEvent event, long sequence, boolean endOfBatch) { if (isResponsePermitted.get()) { if (isTimeoutExpired()) { subscriptionHandler.unsubscribe(this); - LOGGER.log( - DEBUG, - "Producer liveness timeout. Unsubscribed ProducerBlockItemObserver."); + LOGGER.log(DEBUG, "Producer liveness timeout. Unsubscribed ProducerBlockItemObserver."); } else { LOGGER.log(DEBUG, "Publishing response to upstream producer: " + this); publishStreamResponseObserver.onNext(fromPbj(event.get())); @@ -226,10 +214,9 @@ public void onEvent( @NonNull private static com.hedera.hapi.block.protoc.PublishStreamResponse buildErrorStreamResponse() { // TODO: Replace this with a real error enum. - final EndOfStream endOfStream = - EndOfStream.newBuilder() - .status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) - .build(); + final EndOfStream endOfStream = EndOfStream.newBuilder() + .status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) + .build(); return fromPbj(PublishStreamResponse.newBuilder().status(endOfStream).build()); } diff --git a/server/src/test/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserverTest.java b/server/src/test/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserverTest.java index 3ef939b07..184903c60 100644 --- a/server/src/test/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserverTest.java @@ -29,8 +29,8 @@ import com.hedera.block.server.events.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; import com.hedera.block.server.util.TestConfigUtil; +import com.hedera.hapi.block.BlockItemSet; import com.hedera.hapi.block.SubscribeStreamResponse; -import com.hedera.hapi.block.SubscribeStreamResponseSet; import com.hedera.hapi.block.stream.BlockItem; import com.hedera.hapi.block.stream.BlockProof; import com.hedera.hapi.block.stream.input.EventHeader; @@ -56,28 +56,26 @@ public class ConsumerStreamResponseObserverTest { private static final int testTimeout = 1000; - @Mock private StreamMediator streamMediator; + @Mock + private StreamMediator streamMediator; @Mock - private StreamObserver - responseStreamObserver; + private StreamObserver responseStreamObserver; - @Mock private ObjectEvent objectEvent; + @Mock + private ObjectEvent objectEvent; @Mock - private ServerCallStreamObserver - serverCallStreamObserver; + private ServerCallStreamObserver serverCallStreamObserver; - @Mock private InstantSource testClock; + @Mock + private InstantSource testClock; final BlockNodeContext testContext; public ConsumerStreamResponseObserverTest() throws IOException { - this.testContext = - TestConfigUtil.getTestBlockNodeContext( - Map.of( - TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, - String.valueOf(TIMEOUT_THRESHOLD_MILLIS))); + this.testContext = TestConfigUtil.getTestBlockNodeContext( + Map.of(TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, String.valueOf(TIMEOUT_THRESHOLD_MILLIS))); } @Test @@ -86,15 +84,15 @@ public void testProducerTimeoutWithinWindow() { when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); final var consumerBlockItemObserver = - new ConsumerStreamResponseObserver( - testClock, streamMediator, responseStreamObserver, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, responseStreamObserver, testContext); final BlockHeader blockHeader = BlockHeader.newBuilder().number(1).build(); - final BlockItem blockItem = BlockItem.newBuilder().blockHeader(blockHeader).build(); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItem).build(); + final BlockItem blockItem = + BlockItem.newBuilder().blockHeader(blockHeader).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItem).build(); final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); when(objectEvent.get()).thenReturn(subscribeStreamResponse); @@ -104,8 +102,7 @@ public void testProducerTimeoutWithinWindow() { verify(responseStreamObserver).onNext(fromPbj(subscribeStreamResponse)); // verify the mediator is NOT called to unsubscribe the observer - verify(streamMediator, timeout(testTimeout).times(0)) - .unsubscribe(consumerBlockItemObserver); + verify(streamMediator, timeout(testTimeout).times(0)).unsubscribe(consumerBlockItemObserver); } @Test @@ -116,8 +113,7 @@ public void testProducerTimeoutOutsideWindow() throws InterruptedException { when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1); final var consumerBlockItemObserver = - new ConsumerStreamResponseObserver( - testClock, streamMediator, responseStreamObserver, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, responseStreamObserver, testContext); consumerBlockItemObserver.onEvent(objectEvent, 0, true); verify(streamMediator).unsubscribe(consumerBlockItemObserver); @@ -130,8 +126,7 @@ public void testHandlersSetOnObserver() throws InterruptedException { // millis() calls. Here the second call will always be inside the timeout window. when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); - new ConsumerStreamResponseObserver( - testClock, streamMediator, serverCallStreamObserver, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, serverCallStreamObserver, testContext); verify(serverCallStreamObserver, timeout(testTimeout).times(1)).setOnCloseHandler(any()); verify(serverCallStreamObserver, timeout(testTimeout).times(1)).setOnCancelHandler(any()); @@ -145,10 +140,10 @@ public void testResponseNotPermittedAfterCancel() { testClock, streamMediator, serverCallStreamObserver, testContext); final List blockItems = generateBlockItems(1); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItems).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItems).build(); final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); when(objectEvent.get()).thenReturn(subscribeStreamResponse); // Confirm that the observer is called with the first BlockItem @@ -161,8 +156,7 @@ public void testResponseNotPermittedAfterCancel() { consumerStreamResponseObserver.onEvent(objectEvent, 0, true); // Confirm that canceling the observer allowed only 1 response to be sent. - verify(serverCallStreamObserver, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); + verify(serverCallStreamObserver, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); } @Test @@ -173,10 +167,10 @@ public void testResponseNotPermittedAfterClose() { testClock, streamMediator, serverCallStreamObserver, testContext); final List blockItems = generateBlockItems(1); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItems).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItems).build(); final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); when(objectEvent.get()).thenReturn(subscribeStreamResponse); // Confirm that the observer is called with the first BlockItem @@ -189,8 +183,7 @@ public void testResponseNotPermittedAfterClose() { consumerStreamResponseObserver.onEvent(objectEvent, 0, true); // Confirm that closing the observer allowed only 1 response to be sent. - verify(serverCallStreamObserver, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); + verify(serverCallStreamObserver, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); } @Test @@ -201,32 +194,32 @@ public void testConsumerNotToSendBeforeBlockHeader() { when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); final var consumerBlockItemObserver = - new ConsumerStreamResponseObserver( - testClock, streamMediator, responseStreamObserver, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, responseStreamObserver, testContext); // Send non-header BlockItems to validate that the observer does not send them for (int i = 1; i <= 10; i++) { if (i % 2 == 0) { - final EventHeader eventHeader = - EventHeader.newBuilder().eventCore(EventCore.newBuilder().build()).build(); - final BlockItem blockItem = BlockItem.newBuilder().eventHeader(eventHeader).build(); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItem).build(); - final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder() - .blockItems(subscribeStreamResponseSet) - .build(); + final EventHeader eventHeader = EventHeader.newBuilder() + .eventCore(EventCore.newBuilder().build()) + .build(); + final BlockItem blockItem = + BlockItem.newBuilder().eventHeader(eventHeader).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItem).build(); + final SubscribeStreamResponse subscribeStreamResponse = SubscribeStreamResponse.newBuilder() + .blockItems(blockItemSet) + .build(); when(objectEvent.get()).thenReturn(subscribeStreamResponse); } else { final BlockProof blockProof = BlockProof.newBuilder().block(i).build(); - final BlockItem blockItem = BlockItem.newBuilder().blockProof(blockProof).build(); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItem).build(); - final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder() - .blockItems(subscribeStreamResponseSet) - .build(); + final BlockItem blockItem = + BlockItem.newBuilder().blockProof(blockProof).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItem).build(); + final SubscribeStreamResponse subscribeStreamResponse = SubscribeStreamResponse.newBuilder() + .blockItems(blockItemSet) + .build(); when(objectEvent.get()).thenReturn(subscribeStreamResponse); } @@ -234,15 +227,14 @@ public void testConsumerNotToSendBeforeBlockHeader() { } final BlockItem blockItem = BlockItem.newBuilder().build(); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItem).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItem).build(); final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); // Confirm that the observer was called with the next BlockItem // since we never send a BlockItem with a Header to start the stream. - verify(responseStreamObserver, timeout(testTimeout).times(0)) - .onNext(fromPbj(subscribeStreamResponse)); + verify(responseStreamObserver, timeout(testTimeout).times(0)).onNext(fromPbj(subscribeStreamResponse)); } @Test @@ -252,23 +244,17 @@ public void testSubscriberStreamResponseIsBlockItemWhenBlockItemIsNull() { // being created with a null BlockItem. Here, I have to used a spy() to even // manufacture this scenario. This should not happen in production. final BlockItem blockItem = BlockItem.newBuilder().build(); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItem).build(); - final SubscribeStreamResponse subscribeStreamResponse = - spy( - SubscribeStreamResponse.newBuilder() - .blockItems(subscribeStreamResponseSet) - .build()); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItem).build(); + final SubscribeStreamResponse subscribeStreamResponse = spy( + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build()); when(subscribeStreamResponse.blockItems()).thenReturn(null); when(objectEvent.get()).thenReturn(subscribeStreamResponse); final var consumerBlockItemObserver = - new ConsumerStreamResponseObserver( - testClock, streamMediator, responseStreamObserver, testContext); - assertThrows( - IllegalArgumentException.class, - () -> consumerBlockItemObserver.onEvent(objectEvent, 0, true)); + new ConsumerStreamResponseObserver(testClock, streamMediator, responseStreamObserver, testContext); + assertThrows(IllegalArgumentException.class, () -> consumerBlockItemObserver.onEvent(objectEvent, 0, true)); } @Test @@ -279,30 +265,21 @@ public void testSubscribeStreamResponseTypeNotSupported() { when(objectEvent.get()).thenReturn(subscribeStreamResponse); final var consumerBlockItemObserver = - new ConsumerStreamResponseObserver( - testClock, streamMediator, responseStreamObserver, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, responseStreamObserver, testContext); - assertThrows( - IllegalArgumentException.class, - () -> consumerBlockItemObserver.onEvent(objectEvent, 0, true)); + assertThrows(IllegalArgumentException.class, () -> consumerBlockItemObserver.onEvent(objectEvent, 0, true)); } private static class TestConsumerStreamResponseObserver extends ConsumerStreamResponseObserver { public TestConsumerStreamResponseObserver( @NonNull final InstantSource producerLivenessClock, - @NonNull - final StreamMediator - subscriptionHandler, + @NonNull final StreamMediator subscriptionHandler, @NonNull final StreamObserver subscribeStreamResponseObserver, @NonNull final BlockNodeContext blockNodeContext) { - super( - producerLivenessClock, - subscriptionHandler, - subscribeStreamResponseObserver, - blockNodeContext); + super(producerLivenessClock, subscriptionHandler, subscribeStreamResponseObserver, blockNodeContext); } public void cancel() { diff --git a/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceIntegrationTest.java b/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceIntegrationTest.java index a5a6c07be..81622bc53 100644 --- a/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceIntegrationTest.java +++ b/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceIntegrationTest.java @@ -47,6 +47,7 @@ import com.hedera.block.server.util.TestConfigUtil; import com.hedera.block.server.util.TestUtils; import com.hedera.hapi.block.Acknowledgement; +import com.hedera.hapi.block.BlockItemSet; import com.hedera.hapi.block.EndOfStream; import com.hedera.hapi.block.ItemAcknowledgement; import com.hedera.hapi.block.PublishStreamRequest; @@ -57,7 +58,6 @@ import com.hedera.hapi.block.SubscribeStreamRequest; import com.hedera.hapi.block.SubscribeStreamResponse; import com.hedera.hapi.block.SubscribeStreamResponseCode; -import com.hedera.hapi.block.SubscribeStreamResponseSet; import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; import com.hedera.pbj.runtime.io.buffer.Bytes; @@ -87,54 +87,50 @@ public class BlockStreamServiceIntegrationTest { private final Logger LOGGER = System.getLogger(getClass().getName()); - @Mock private Notifier notifier; + @Mock + private Notifier notifier; + + @Mock + private StreamObserver publishStreamResponseObserver1; @Mock - private StreamObserver - publishStreamResponseObserver1; + private StreamObserver publishStreamResponseObserver2; @Mock - private StreamObserver - publishStreamResponseObserver2; + private StreamObserver publishStreamResponseObserver3; @Mock - private StreamObserver - publishStreamResponseObserver3; + private StreamObserver singleBlockResponseStreamObserver; @Mock - private StreamObserver - singleBlockResponseStreamObserver; + private com.hedera.hapi.block.protoc.SubscribeStreamRequest subscribeStreamRequest; - @Mock private com.hedera.hapi.block.protoc.SubscribeStreamRequest subscribeStreamRequest; + @Mock + private StreamObserver subscribeStreamObserver1; @Mock - private StreamObserver - subscribeStreamObserver1; + private StreamObserver subscribeStreamObserver2; @Mock - private StreamObserver - subscribeStreamObserver2; + private StreamObserver subscribeStreamObserver3; @Mock - private StreamObserver - subscribeStreamObserver3; + private StreamObserver subscribeStreamObserver4; @Mock - private StreamObserver - subscribeStreamObserver4; + private StreamObserver subscribeStreamObserver5; @Mock - private StreamObserver - subscribeStreamObserver5; + private StreamObserver subscribeStreamObserver6; @Mock - private StreamObserver - subscribeStreamObserver6; + private WebServer webServer; - @Mock private WebServer webServer; + @Mock + private BlockReader blockReader; - @Mock private BlockReader blockReader; - @Mock private BlockWriter> blockWriter; + @Mock + private BlockWriter> blockWriter; private static final String TEMP_DIR = "block-node-unit-test-dir"; @@ -160,52 +156,41 @@ public void tearDown() { } @Test - public void testPublishBlockStreamRegistrationAndExecution() - throws IOException, NoSuchAlgorithmException { + public void testPublishBlockStreamRegistrationAndExecution() throws IOException, NoSuchAlgorithmException { final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus); - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final BlockStreamService blockStreamService = - new BlockStreamService( - streamMediator, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); + final var blockNodeEventHandler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final BlockStreamService blockStreamService = new BlockStreamService( + streamMediator, serviceStatus, blockNodeEventHandler, notifier, blockNodeContext); // Register 3 producers - final StreamObserver - publishStreamObserver = - blockStreamService.protocPublishBlockStream(publishStreamResponseObserver1); + final StreamObserver publishStreamObserver = + blockStreamService.protocPublishBlockStream(publishStreamResponseObserver1); blockStreamService.protocPublishBlockStream(publishStreamResponseObserver2); blockStreamService.protocPublishBlockStream(publishStreamResponseObserver3); // Register 3 consumers - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver1); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver2); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver3); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); List blockItems = generateBlockItems(1); for (int i = 0; i < blockItems.size(); i++) { if (i == 9) { - when(blockWriter.write(List.of(blockItems.get(i)))) - .thenReturn(Optional.of(List.of(blockItems.get(i)))); + when(blockWriter.write(List.of(blockItems.get(i)))).thenReturn(Optional.of(List.of(blockItems.get(i)))); } else { when(blockWriter.write(List.of(blockItems.get(i)))).thenReturn(Optional.empty()); } } for (BlockItem blockItem : blockItems) { - final PublishStreamRequest publishStreamRequest = - PublishStreamRequest.newBuilder().blockItems(blockItem).build(); + final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder() + .blockItems(new BlockItemSet(List.of(blockItem))) + .build(); // Calling onNext() as Helidon does with each block item for // the first producer. @@ -240,14 +225,11 @@ public void testPublishBlockStreamRegistrationAndExecution() PublishStreamResponse.newBuilder().acknowledgement(itemAck).build(); // Verify all 3 producers received the response - verify(publishStreamResponseObserver1, timeout(testTimeout).times(1)) - .onNext(fromPbj(publishStreamResponse)); + verify(publishStreamResponseObserver1, timeout(testTimeout).times(1)).onNext(fromPbj(publishStreamResponse)); - verify(publishStreamResponseObserver2, timeout(testTimeout).times(1)) - .onNext(fromPbj(publishStreamResponse)); + verify(publishStreamResponseObserver2, timeout(testTimeout).times(1)).onNext(fromPbj(publishStreamResponse)); - verify(publishStreamResponseObserver3, timeout(testTimeout).times(1)) - .onNext(fromPbj(publishStreamResponse)); + verify(publishStreamResponseObserver3, timeout(testTimeout).times(1)).onNext(fromPbj(publishStreamResponse)); // Close the stream as Helidon does publishStreamObserver.onCompleted(); @@ -264,28 +246,19 @@ public void testSubscribeBlockStream() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); // Build the BlockStreamService - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final BlockStreamService blockStreamService = - new BlockStreamService( - streamMediator, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); + final var blockNodeEventHandler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final BlockStreamService blockStreamService = new BlockStreamService( + streamMediator, serviceStatus, blockNodeEventHandler, notifier, blockNodeContext); // Subscribe the consumers - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver1); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver2); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver3); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); // Subscribe the producer final StreamObserver streamObserver = @@ -293,10 +266,9 @@ public void testSubscribeBlockStream() throws IOException { // Build the BlockItem final List blockItems = generateBlockItems(1); - final PublishStreamRequest publishStreamRequest = - PublishStreamRequest.newBuilder() - .blockItems(List.of(blockItems.getFirst())) - .build(); + final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder() + .blockItems(new BlockItemSet(List.of(blockItems.getFirst()))) + .build(); // Calling onNext() with a BlockItem streamObserver.onNext(fromPbj(publishStreamRequest)); @@ -306,19 +278,15 @@ public void testSubscribeBlockStream() throws IOException { verify(blockWriter, timeout(testTimeout).times(1)).write(List.of(blockItems.getFirst())); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder() - .blockItems(List.of(blockItems.getFirst())) - .build(); + final BlockItemSet blockItemSet = BlockItemSet.newBuilder() + .blockItems(List.of(blockItems.getFirst())) + .build(); final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); - verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); - verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); - verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); + verify(subscribeStreamObserver1, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); + verify(subscribeStreamObserver2, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); + verify(subscribeStreamObserver3, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); } @Test @@ -333,26 +301,21 @@ public void testFullHappyPath() throws IOException { final StreamObserver streamObserver = blockStreamService.protocPublishBlockStream(publishStreamResponseObserver1); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver1); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver2); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver3); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); final List blockItems = generateBlockItems(numberOfBlocks); for (BlockItem blockItem : blockItems) { - final PublishStreamRequest publishStreamRequest = - PublishStreamRequest.newBuilder().blockItems(blockItem).build(); + final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder() + .blockItems(new BlockItemSet(List.of(blockItem))) + .build(); streamObserver.onNext(fromPbj(publishStreamRequest)); } - verifySubscribeStreamResponse( - numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems); - verifySubscribeStreamResponse( - numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems); - verifySubscribeStreamResponse( - numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems); streamObserver.onCompleted(); @@ -373,21 +336,18 @@ public void testFullWithSubscribersAddedDynamically() { final List blockItems = generateBlockItems(numberOfBlocks); // Subscribe the initial consumers - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver1); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver2); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver3); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); for (int i = 0; i < blockItems.size(); i++) { - final PublishStreamRequest publishStreamRequest = - PublishStreamRequest.newBuilder().blockItems(blockItems.get(i)).build(); + final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder() + .blockItems(new BlockItemSet(List.of(blockItems.get(i)))) + .build(); // Add a new subscriber if (i == 51) { - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver4); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver4); } // Transmit the BlockItem @@ -395,36 +355,28 @@ public void testFullWithSubscribersAddedDynamically() { // Add a new subscriber if (i == 76) { - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver5); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver5); } // Add a new subscriber if (i == 88) { - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver6); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver6); } } // Verify subscribers who were listening before the stream started - verifySubscribeStreamResponse( - numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems); - verifySubscribeStreamResponse( - numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems); - verifySubscribeStreamResponse( - numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems); // Verify subscribers added while the stream was in progress. // The Helidon-provided StreamObserver onNext() method will only // be called once a Header BlockItem is reached. So, pass in // the number of BlockItems to wait to verify that the method // was called. - verifySubscribeStreamResponse( - numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems); - verifySubscribeStreamResponse( - numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems); - verifySubscribeStreamResponse( - numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems); streamObserver.onCompleted(); } @@ -441,16 +393,10 @@ public void testSubAndUnsubWhileStreaming() throws InterruptedException { final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); final var streamMediator = buildStreamMediator(consumers, serviceStatus); - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final var blockStreamService = - new BlockStreamService( - streamMediator, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); + final var blockNodeEventHandler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final var blockStreamService = new BlockStreamService( + streamMediator, serviceStatus, blockNodeEventHandler, notifier, blockNodeContext); // Pass a StreamObserver to the producer as Helidon does final StreamObserver streamObserver = @@ -458,21 +404,16 @@ public void testSubAndUnsubWhileStreaming() throws InterruptedException { final List blockItems = generateBlockItems(numberOfBlocks); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver1); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver2); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver3); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); for (int i = 0; i < blockItems.size(); i++) { // Transmit the BlockItem - streamObserver.onNext( - fromPbj( - PublishStreamRequest.newBuilder() - .blockItems(blockItems.get(i)) - .build())); + streamObserver.onNext(fromPbj(PublishStreamRequest.newBuilder() + .blockItems(new BlockItemSet(List.of(blockItems.get(i)))) + .build())); // Remove 1st subscriber if (i == 10) { @@ -494,8 +435,7 @@ public void testSubAndUnsubWhileStreaming() throws InterruptedException { // Add a new subscriber if (i == 51) { - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver4); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver4); } // Remove 3rd subscriber @@ -509,14 +449,12 @@ public void testSubAndUnsubWhileStreaming() throws InterruptedException { // Add a new subscriber if (i == 76) { - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver5); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver5); } // Add a new subscriber if (i == 88) { - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver6); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver6); } } @@ -530,12 +468,9 @@ public void testSubAndUnsubWhileStreaming() throws InterruptedException { // be called once a Header BlockItem is reached. So, pass in // the number of BlockItems to wait to verify that the method // was called. - verifySubscribeStreamResponse( - numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems); - verifySubscribeStreamResponse( - numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems); - verifySubscribeStreamResponse( - numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems); streamObserver.onCompleted(); } @@ -563,36 +498,27 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep final var streamMediator = buildStreamMediator(consumers, serviceStatus); final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus); - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final var blockStreamService = - new BlockStreamService( - streamMediator, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); + final var blockNodeEventHandler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final var blockStreamService = new BlockStreamService( + streamMediator, serviceStatus, blockNodeEventHandler, notifier, blockNodeContext); final BlockAccessService blockAccessService = - new BlockAccessService( - serviceStatus, blockReader, blockNodeContext.metricsService()); + new BlockAccessService(serviceStatus, blockReader, blockNodeContext.metricsService()); // Subscribe the consumers - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver1); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver2); - blockStreamService.protocSubscribeBlockStream( - subscribeStreamRequest, subscribeStreamObserver3); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); + blockStreamService.protocSubscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); // Initialize the producer final StreamObserver streamObserver = blockStreamService.protocPublishBlockStream(publishStreamResponseObserver1); // Transmit a BlockItem - final PublishStreamRequest publishStreamRequest = - PublishStreamRequest.newBuilder().blockItems(blockItems).build(); + final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder() + .blockItems(new BlockItemSet(blockItems)) + .build(); streamObserver.onNext(fromPbj(publishStreamRequest)); // Use verify to make sure the serviceStatus.stopRunning() method is called @@ -601,9 +527,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep // Simulate another producer attempting to connect to the Block Node after the exception. // Later, verify they received a response indicating the stream is closed. - final StreamObserver - expectedNoOpStreamObserver = - blockStreamService.protocPublishBlockStream(publishStreamResponseObserver2); + final StreamObserver expectedNoOpStreamObserver = + blockStreamService.protocPublishBlockStream(publishStreamResponseObserver2); expectedNoOpStreamObserver.onNext(fromPbj(publishStreamRequest)); verify(publishStreamResponseObserver2, timeout(testTimeout).times(1)) @@ -622,35 +547,27 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep final SubscribeStreamRequest subscribeStreamRequest = SubscribeStreamRequest.newBuilder().startBlockNumber(1).build(); // Simulate a consumer attempting to connect to the Block Node after the exception. - blockStreamService.protocSubscribeBlockStream( - fromPbj(subscribeStreamRequest), subscribeStreamObserver4); + blockStreamService.protocSubscribeBlockStream(fromPbj(subscribeStreamRequest), subscribeStreamObserver4); // The BlockItem expected to pass through since it was published // before the IOException was thrown. - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItems).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItems).build(); final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); - verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); - verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); - verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); + verify(subscribeStreamObserver1, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); + verify(subscribeStreamObserver2, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); + verify(subscribeStreamObserver3, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); // Verify all the consumers received the end of stream response // TODO: Fix the response code when it's available - final SubscribeStreamResponse endStreamResponse = - SubscribeStreamResponse.newBuilder() - .status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS) - .build(); - verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) - .onNext(fromPbj(endStreamResponse)); - verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) - .onNext(fromPbj(endStreamResponse)); - verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) - .onNext(fromPbj(endStreamResponse)); + final SubscribeStreamResponse endStreamResponse = SubscribeStreamResponse.newBuilder() + .status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS) + .build(); + verify(subscribeStreamObserver1, timeout(testTimeout).times(1)).onNext(fromPbj(endStreamResponse)); + verify(subscribeStreamObserver2, timeout(testTimeout).times(1)).onNext(fromPbj(endStreamResponse)); + verify(subscribeStreamObserver3, timeout(testTimeout).times(1)).onNext(fromPbj(endStreamResponse)); assertEquals(3, consumers.size()); @@ -665,19 +582,17 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep // Verify the singleBlock service returned the expected // error code indicating the service is not available. - final SingleBlockResponse expectedSingleBlockNotAvailable = - SingleBlockResponse.newBuilder() - .status(SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE) - .build(); + final SingleBlockResponse expectedSingleBlockNotAvailable = SingleBlockResponse.newBuilder() + .status(SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE) + .build(); verify(singleBlockResponseStreamObserver, timeout(testTimeout).times(1)) .onNext(fromPbj(expectedSingleBlockNotAvailable)); // TODO: Fix the response code when it's available - final SubscribeStreamResponse expectedSubscriberStreamNotAvailable = - SubscribeStreamResponse.newBuilder() - .status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS) - .build(); + final SubscribeStreamResponse expectedSubscriberStreamNotAvailable = SubscribeStreamResponse.newBuilder() + .status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS) + .build(); verify(subscribeStreamObserver4, timeout(testTimeout).times(1)) .onNext(fromPbj(expectedSubscriberStreamNotAvailable)); } @@ -698,52 +613,44 @@ private static void verifySubscribeStreamResponse( } final BlockItem headerBlockItem = blockItems.get(block); - final SubscribeStreamResponse headerSubStreamResponse = - buildSubscribeStreamResponse(headerBlockItem); + final SubscribeStreamResponse headerSubStreamResponse = buildSubscribeStreamResponse(headerBlockItem); final BlockItem bodyBlockItem = blockItems.get(block + 1); - final SubscribeStreamResponse bodySubStreamResponse = - buildSubscribeStreamResponse(bodyBlockItem); + final SubscribeStreamResponse bodySubStreamResponse = buildSubscribeStreamResponse(bodyBlockItem); final BlockItem stateProofBlockItem = blockItems.get(block + 9); - final SubscribeStreamResponse stateProofStreamResponse = - buildSubscribeStreamResponse(stateProofBlockItem); - - verify(streamObserver, timeout(testTimeout).times(1)) - .onNext(fromPbj(headerSubStreamResponse)); - verify(streamObserver, timeout(testTimeout).times(8)) - .onNext(fromPbj(bodySubStreamResponse)); - verify(streamObserver, timeout(testTimeout).times(1)) - .onNext(fromPbj(stateProofStreamResponse)); + final SubscribeStreamResponse stateProofStreamResponse = buildSubscribeStreamResponse(stateProofBlockItem); + + verify(streamObserver, timeout(testTimeout).times(1)).onNext(fromPbj(headerSubStreamResponse)); + verify(streamObserver, timeout(testTimeout).times(8)).onNext(fromPbj(bodySubStreamResponse)); + verify(streamObserver, timeout(testTimeout).times(1)).onNext(fromPbj(stateProofStreamResponse)); } } private static SubscribeStreamResponse buildSubscribeStreamResponse(BlockItem blockItem) { - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItem).build(); - return SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); + final BlockItemSet subscribeStreamResponseSet = + BlockItemSet.newBuilder().blockItems(blockItem).build(); + return SubscribeStreamResponse.newBuilder() + .blockItems(subscribeStreamResponseSet) + .build(); } private static PublishStreamResponse buildEndOfStreamResponse() { - final EndOfStream endOfStream = - EndOfStream.newBuilder() - .status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) - .build(); + final EndOfStream endOfStream = EndOfStream.newBuilder() + .status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) + .build(); return PublishStreamResponse.newBuilder().status(endOfStream).build(); } - private BlockStreamService buildBlockStreamService( - final BlockWriter> blockWriter) { + private BlockStreamService buildBlockStreamService(final BlockWriter> blockWriter) { final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); final var streamMediator = buildStreamMediator(new ConcurrentHashMap<>(32), serviceStatus); final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus); - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final var blockNodeEventHandler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - return new BlockStreamService( - streamMediator, serviceStatus, blockNodeEventHandler, notifier, blockNodeContext); + return new BlockStreamService(streamMediator, serviceStatus, blockNodeEventHandler, notifier, blockNodeContext); } private LiveStreamMediator buildStreamMediator( @@ -760,12 +667,10 @@ private LiveStreamMediator buildStreamMediator( .build(); } - public static Acknowledgement buildAck(@NonNull final List blockItems) - throws NoSuchAlgorithmException { - ItemAcknowledgement itemAck = - ItemAcknowledgement.newBuilder() - .itemsHash(Bytes.wrap(getFakeHash(blockItems))) - .build(); + public static Acknowledgement buildAck(@NonNull final List blockItems) throws NoSuchAlgorithmException { + ItemAcknowledgement itemAck = ItemAcknowledgement.newBuilder() + .itemsHash(Bytes.wrap(getFakeHash(blockItems))) + .build(); return Acknowledgement.newBuilder().itemAck(itemAck).build(); } diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index 1b06334bf..f079bd5a1 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -41,9 +41,9 @@ import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.service.ServiceStatusImpl; import com.hedera.block.server.util.TestConfigUtil; +import com.hedera.hapi.block.BlockItemSet; import com.hedera.hapi.block.SubscribeStreamResponse; import com.hedera.hapi.block.SubscribeStreamResponseCode; -import com.hedera.hapi.block.SubscribeStreamResponseSet; import com.hedera.hapi.block.stream.BlockItem; import com.hedera.hapi.block.stream.output.BlockHeader; import com.swirlds.metrics.api.LongGauge; @@ -64,12 +64,20 @@ @ExtendWith(MockitoExtension.class) public class LiveStreamMediatorImplTest { - @Mock private BlockNodeEventHandler> observer1; - @Mock private BlockNodeEventHandler> observer2; - @Mock private BlockNodeEventHandler> observer3; + @Mock + private BlockNodeEventHandler> observer1; + + @Mock + private BlockNodeEventHandler> observer2; - @Mock private BlockWriter> blockWriter; - @Mock private Notifier notifier; + @Mock + private BlockNodeEventHandler> observer3; + + @Mock + private BlockWriter> blockWriter; + + @Mock + private Notifier notifier; @Mock private StreamObserver streamObserver1; @@ -81,10 +89,10 @@ public class LiveStreamMediatorImplTest { private StreamObserver streamObserver3; @Mock - private ServerCallStreamObserver - serverCallStreamObserver; + private ServerCallStreamObserver serverCallStreamObserver; - @Mock private InstantSource testClock; + @Mock + private InstantSource testClock; private final long TIMEOUT_THRESHOLD_MILLIS = 100L; private final long TEST_TIME = 1_719_427_664_950L; @@ -95,9 +103,7 @@ public class LiveStreamMediatorImplTest { public LiveStreamMediatorImplTest() throws IOException { Map properties = new HashMap<>(); - properties.put( - TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, - String.valueOf(TIMEOUT_THRESHOLD_MILLIS)); + properties.put(TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, String.valueOf(TIMEOUT_THRESHOLD_MILLIS)); properties.put(TestConfigUtil.MEDIATOR_RING_BUFFER_SIZE_KEY, String.valueOf(1024)); this.testContext = TestConfigUtil.getTestBlockNodeContext(properties); @@ -108,8 +114,7 @@ public void testUnsubscribeEach() throws InterruptedException, IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final var streamMediatorBuilder = - LiveStreamMediatorBuilder.newBuilder( - blockNodeContext, new ServiceStatusImpl(blockNodeContext)); + LiveStreamMediatorBuilder.newBuilder(blockNodeContext, new ServiceStatusImpl(blockNodeContext)); final var streamMediator = streamMediatorBuilder.build(); // Set up the subscribers @@ -117,32 +122,20 @@ public void testUnsubscribeEach() throws InterruptedException, IOException { streamMediator.subscribe(observer2); streamMediator.subscribe(observer3); - assertTrue( - streamMediator.isSubscribed(observer1), - "Expected the mediator to have observer1 subscribed"); - assertTrue( - streamMediator.isSubscribed(observer2), - "Expected the mediator to have observer2 subscribed"); - assertTrue( - streamMediator.isSubscribed(observer3), - "Expected the mediator to have observer3 subscribed"); + assertTrue(streamMediator.isSubscribed(observer1), "Expected the mediator to have observer1 subscribed"); + assertTrue(streamMediator.isSubscribed(observer2), "Expected the mediator to have observer2 subscribed"); + assertTrue(streamMediator.isSubscribed(observer3), "Expected the mediator to have observer3 subscribed"); Thread.sleep(100); streamMediator.unsubscribe(observer1); - assertFalse( - streamMediator.isSubscribed(observer1), - "Expected the mediator to have unsubscribed observer1"); + assertFalse(streamMediator.isSubscribed(observer1), "Expected the mediator to have unsubscribed observer1"); streamMediator.unsubscribe(observer2); - assertFalse( - streamMediator.isSubscribed(observer2), - "Expected the mediator to have unsubscribed observer2"); + assertFalse(streamMediator.isSubscribed(observer2), "Expected the mediator to have unsubscribed observer2"); streamMediator.unsubscribe(observer3); - assertFalse( - streamMediator.isSubscribed(observer3), - "Expected the mediator to have unsubscribed observer3"); + assertFalse(streamMediator.isSubscribed(observer3), "Expected the mediator to have unsubscribed observer3"); // Confirm the counter was never incremented assertEquals(0, blockNodeContext.metricsService().get(LiveBlockItems).get()); @@ -153,15 +146,14 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); final BlockItem blockItem = BlockItem.newBuilder().build(); // register the stream validator when(blockWriter.write(List.of(blockItem))).thenReturn(Optional.empty()); - final var streamValidator = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final var streamValidator = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); streamMediator.subscribe(streamValidator); // Acting as a producer, notify the mediator of a new block @@ -180,22 +172,19 @@ public void testMediatorPublishEventToSubscribers() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); final var concreteObserver1 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver1, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver1, testContext); final var concreteObserver2 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver2, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver2, testContext); final var concreteObserver3 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver3, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver3, testContext); // Set up the subscribers streamMediator.subscribe(concreteObserver1); @@ -203,27 +192,24 @@ public void testMediatorPublishEventToSubscribers() throws IOException { streamMediator.subscribe(concreteObserver3); assertTrue( - streamMediator.isSubscribed(concreteObserver1), - "Expected the mediator to have observer1 subscribed"); + streamMediator.isSubscribed(concreteObserver1), "Expected the mediator to have observer1 subscribed"); assertTrue( - streamMediator.isSubscribed(concreteObserver2), - "Expected the mediator to have observer2 subscribed"); + streamMediator.isSubscribed(concreteObserver2), "Expected the mediator to have observer2 subscribed"); assertTrue( - streamMediator.isSubscribed(concreteObserver3), - "Expected the mediator to have observer3 subscribed"); + streamMediator.isSubscribed(concreteObserver3), "Expected the mediator to have observer3 subscribed"); final BlockHeader blockHeader = BlockHeader.newBuilder().number(1).build(); - final BlockItem blockItem = BlockItem.newBuilder().blockHeader(blockHeader).build(); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItem).build(); + final BlockItem blockItem = + BlockItem.newBuilder().blockHeader(blockHeader).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItem).build(); final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); // register the stream validator when(blockWriter.write(List.of(blockItem))).thenReturn(Optional.empty()); - final var streamValidator = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final var streamValidator = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); streamMediator.subscribe(streamValidator); // Acting as a producer, notify the mediator of a new block @@ -232,12 +218,9 @@ public void testMediatorPublishEventToSubscribers() throws IOException { assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); // Confirm each subscriber was notified of the new block - verify(streamObserver1, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); - verify(streamObserver2, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); - verify(streamObserver3, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); + verify(streamObserver1, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); + verify(streamObserver2, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); + verify(streamObserver3, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); // Confirm the BlockStorage write method was called verify(blockWriter, timeout(testTimeout).times(1)).write(List.of(blockItem)); @@ -248,22 +231,19 @@ public void testSubAndUnsubHandling() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); final var concreteObserver1 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver1, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver1, testContext); final var concreteObserver2 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver2, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver2, testContext); final var concreteObserver3 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver3, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver3, testContext); // Set up the subscribers streamMediator.subscribe(concreteObserver1); @@ -281,15 +261,13 @@ public void testSubAndUnsubHandling() throws IOException { @Test public void testSubscribeWhenHandlerAlreadySubscribed() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - final LongGauge consumersGauge = - blockNodeContext.metricsService().get(BlockNodeMetricTypes.Gauge.Consumers); + final LongGauge consumersGauge = blockNodeContext.metricsService().get(BlockNodeMetricTypes.Gauge.Consumers); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); final var concreteObserver1 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver1, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver1, testContext); streamMediator.subscribe(concreteObserver1); assertTrue(streamMediator.isSubscribed(concreteObserver1)); @@ -313,8 +291,8 @@ public void testOnCancelSubscriptionHandling() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); @@ -322,15 +300,13 @@ public void testOnCancelSubscriptionHandling() throws IOException { // register the stream validator when(blockWriter.write(List.of(blockItems.getFirst()))).thenReturn(Optional.empty()); - final var streamValidator = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final var streamValidator = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); streamMediator.subscribe(streamValidator); // register the test observer - final var testConsumerBlockItemObserver = - new TestConsumerStreamResponseObserver( - testClock, streamMediator, serverCallStreamObserver, testContext); + final var testConsumerBlockItemObserver = new TestConsumerStreamResponseObserver( + testClock, streamMediator, serverCallStreamObserver, testContext); streamMediator.subscribe(testConsumerBlockItemObserver); assertTrue(streamMediator.isSubscribed(testConsumerBlockItemObserver)); @@ -362,8 +338,8 @@ public void testOnCloseSubscriptionHandling() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); // testClock configured to be outside the timeout window when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1); @@ -372,14 +348,12 @@ public void testOnCloseSubscriptionHandling() throws IOException { // register the stream validator when(blockWriter.write(List.of(blockItems.getFirst()))).thenReturn(Optional.empty()); - final var streamValidator = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final var streamValidator = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); streamMediator.subscribe(streamValidator); - final var testConsumerBlockItemObserver = - new TestConsumerStreamResponseObserver( - testClock, streamMediator, serverCallStreamObserver, testContext); + final var testConsumerBlockItemObserver = new TestConsumerStreamResponseObserver( + testClock, streamMediator, serverCallStreamObserver, testContext); streamMediator.subscribe(testConsumerBlockItemObserver); assertTrue(streamMediator.isSubscribed(testConsumerBlockItemObserver)); @@ -411,20 +385,17 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); final var concreteObserver1 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver1, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver1, testContext); final var concreteObserver2 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver2, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver2, testContext); final var concreteObserver3 = - new ConsumerStreamResponseObserver( - testClock, streamMediator, streamObserver3, testContext); + new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver3, testContext); // Set up the subscribers streamMediator.subscribe(concreteObserver1); @@ -432,9 +403,8 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr streamMediator.subscribe(concreteObserver3); final Notifier notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus); - final var streamValidator = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final var streamValidator = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); // Set up the stream verifier streamMediator.subscribe(streamValidator); @@ -457,26 +427,27 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); // Confirm the error counter was incremented - assertEquals(1, blockNodeContext.metricsService().get(LiveBlockStreamMediatorError).get()); + assertEquals( + 1, + blockNodeContext + .metricsService() + .get(LiveBlockStreamMediatorError) + .get()); // Send another block item after the exception streamMediator.publish(List.of(blockItems.get(1))); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(firstBlockItem).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(firstBlockItem).build(); final var subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); - verify(streamObserver1, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); - verify(streamObserver2, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); - verify(streamObserver3, timeout(testTimeout).times(1)) - .onNext(fromPbj(subscribeStreamResponse)); + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); + verify(streamObserver1, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); + verify(streamObserver2, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); + verify(streamObserver3, timeout(testTimeout).times(1)).onNext(fromPbj(subscribeStreamResponse)); // TODO: Replace READ_STREAM_SUCCESS (2) with a generic error code? - final SubscribeStreamResponse endOfStreamResponse = - SubscribeStreamResponse.newBuilder() - .status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS) - .build(); + final SubscribeStreamResponse endOfStreamResponse = SubscribeStreamResponse.newBuilder() + .status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS) + .build(); verify(streamObserver1, timeout(testTimeout).times(1)).onNext(fromPbj(endOfStreamResponse)); verify(streamObserver2, timeout(testTimeout).times(1)).onNext(fromPbj(endOfStreamResponse)); verify(streamObserver3, timeout(testTimeout).times(1)).onNext(fromPbj(endOfStreamResponse)); @@ -490,18 +461,16 @@ public void testUnsubscribeWhenNotSubscribed() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); // register the stream validator - final var streamValidator = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + final var streamValidator = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); streamMediator.subscribe(streamValidator); - final var testConsumerBlockItemObserver = - new TestConsumerStreamResponseObserver( - testClock, streamMediator, serverCallStreamObserver, testContext); + final var testConsumerBlockItemObserver = new TestConsumerStreamResponseObserver( + testClock, streamMediator, serverCallStreamObserver, testContext); // Confirm the observer is not subscribed assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver)); @@ -519,9 +488,7 @@ public void testUnsubscribeWhenNotSubscribed() throws IOException { private static class TestConsumerStreamResponseObserver extends ConsumerStreamResponseObserver { public TestConsumerStreamResponseObserver( @NonNull final InstantSource producerLivenessClock, - @NonNull - final StreamMediator, SubscribeStreamResponse> - streamMediator, + @NonNull final StreamMediator, SubscribeStreamResponse> streamMediator, @NonNull final StreamObserver responseStreamObserver, diff --git a/server/src/test/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImplTest.java b/server/src/test/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImplTest.java index 45be5407c..97c8cee9b 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImplTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImplTest.java @@ -34,8 +34,8 @@ import com.hedera.block.server.persistence.storage.write.BlockWriter; import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.util.TestConfigUtil; +import com.hedera.hapi.block.BlockItemSet; import com.hedera.hapi.block.SubscribeStreamResponse; -import com.hedera.hapi.block.SubscribeStreamResponseSet; import com.hedera.hapi.block.stream.BlockItem; import com.hedera.pbj.runtime.OneOf; import java.io.IOException; @@ -48,17 +48,23 @@ @ExtendWith(MockitoExtension.class) public class StreamPersistenceHandlerImplTest { - @Mock private SubscriptionHandler subscriptionHandler; + @Mock + private SubscriptionHandler subscriptionHandler; - @Mock private BlockWriter> blockWriter; + @Mock + private BlockWriter> blockWriter; - @Mock private Notifier notifier; + @Mock + private Notifier notifier; - @Mock private BlockNodeContext blockNodeContext; + @Mock + private BlockNodeContext blockNodeContext; - @Mock private ServiceStatus serviceStatus; + @Mock + private ServiceStatus serviceStatus; - @Mock private MetricsService metricsService; + @Mock + private MetricsService metricsService; private static final int testTimeout = 0; @@ -68,19 +74,14 @@ public void testOnEventWhenServiceIsNotRunning() { when(blockNodeContext.metricsService()).thenReturn(metricsService); when(serviceStatus.isRunning()).thenReturn(false); - final var streamPersistenceHandler = - new StreamPersistenceHandlerImpl( - subscriptionHandler, - notifier, - blockWriter, - blockNodeContext, - serviceStatus); + final var streamPersistenceHandler = new StreamPersistenceHandlerImpl( + subscriptionHandler, notifier, blockWriter, blockNodeContext, serviceStatus); final List blockItems = generateBlockItems(1); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItems).build(); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItems).build(); final var subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build(); + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); final ObjectEvent event = new ObjectEvent<>(); event.set(subscribeStreamResponse); @@ -98,22 +99,14 @@ public void testBlockItemIsNull() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); when(serviceStatus.isRunning()).thenReturn(true); - final var streamPersistenceHandler = - new StreamPersistenceHandlerImpl( - subscriptionHandler, - notifier, - blockWriter, - blockNodeContext, - serviceStatus); + final var streamPersistenceHandler = new StreamPersistenceHandlerImpl( + subscriptionHandler, notifier, blockWriter, blockNodeContext, serviceStatus); final List blockItems = generateBlockItems(1); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItems).build(); - final var subscribeStreamResponse = - spy( - SubscribeStreamResponse.newBuilder() - .blockItems(subscribeStreamResponseSet) - .build()); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItems).build(); + final var subscribeStreamResponse = spy( + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build()); // Force the block item to be null when(subscribeStreamResponse.blockItems()).thenReturn(null); @@ -132,22 +125,14 @@ public void testSubscribeStreamResponseTypeUnknown() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); when(serviceStatus.isRunning()).thenReturn(true); - final var streamPersistenceHandler = - new StreamPersistenceHandlerImpl( - subscriptionHandler, - notifier, - blockWriter, - blockNodeContext, - serviceStatus); + final var streamPersistenceHandler = new StreamPersistenceHandlerImpl( + subscriptionHandler, notifier, blockWriter, blockNodeContext, serviceStatus); final List blockItems = generateBlockItems(1); - final SubscribeStreamResponseSet subscribeStreamResponseSet = - SubscribeStreamResponseSet.newBuilder().blockItems(blockItems).build(); - final var subscribeStreamResponse = - spy( - SubscribeStreamResponse.newBuilder() - .blockItems(subscribeStreamResponseSet) - .build()); + final BlockItemSet blockItemSet = + BlockItemSet.newBuilder().blockItems(blockItems).build(); + final var subscribeStreamResponse = spy( + SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build()); // Force the block item to be UNSET final OneOf illegalOneOf = @@ -169,16 +154,11 @@ public void testSubscribeStreamResponseTypeStatus() { when(blockNodeContext.metricsService()).thenReturn(metricsService); when(serviceStatus.isRunning()).thenReturn(true); - final var streamPersistenceHandler = - new StreamPersistenceHandlerImpl( - subscriptionHandler, - notifier, - blockWriter, - blockNodeContext, - serviceStatus); + final var streamPersistenceHandler = new StreamPersistenceHandlerImpl( + subscriptionHandler, notifier, blockWriter, blockNodeContext, serviceStatus); - final SubscribeStreamResponse subscribeStreamResponse = - spy(SubscribeStreamResponse.newBuilder().status(READ_STREAM_SUCCESS).build()); + final SubscribeStreamResponse subscribeStreamResponse = spy( + SubscribeStreamResponse.newBuilder().status(READ_STREAM_SUCCESS).build()); final ObjectEvent event = new ObjectEvent<>(); event.set(subscribeStreamResponse); diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfigTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfigTest.java index bcde8e3ea..0d1ab7b7b 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfigTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfigTest.java @@ -35,28 +35,26 @@ void testPersistenceStorageConfig_happyPath() throws IOException { Path testPath = Files.createTempDirectory(TEMP_DIR); - PersistenceStorageConfig persistenceStorageConfig = - new PersistenceStorageConfig(testPath.toString()); + PersistenceStorageConfig persistenceStorageConfig = new PersistenceStorageConfig(testPath.toString()); assertEquals(testPath.toString(), persistenceStorageConfig.rootPath()); } @Test void testPersistenceStorageConfig_emptyRootPath() throws IOException { final String expectedDefaultRootPath = - Paths.get("").toAbsolutePath().resolve("data").toString(); + Paths.get("").toAbsolutePath().resolve("data_empty").toString(); // delete if exists deleteDirectory(Paths.get(expectedDefaultRootPath)); - PersistenceStorageConfig persistenceStorageConfig = new PersistenceStorageConfig(""); + PersistenceStorageConfig persistenceStorageConfig = + new PersistenceStorageConfig(getAbsoluteFolder("data_empty")); assertEquals(expectedDefaultRootPath, persistenceStorageConfig.rootPath()); } @Test void persistenceStorageConfig_throwsExceptionForRelativePath() { IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> new PersistenceStorageConfig("relative/path")); + assertThrows(IllegalArgumentException.class, () -> new PersistenceStorageConfig("relative/path")); assertEquals("relative/path Root path must be absolute", exception.getMessage()); } @@ -65,9 +63,7 @@ void persistenceStorageConfig_throwsRuntimeExceptionOnIOException() { Path invalidPath = Paths.get("/invalid/path"); RuntimeException exception = - assertThrows( - RuntimeException.class, - () -> new PersistenceStorageConfig(invalidPath.toString())); + assertThrows(RuntimeException.class, () -> new PersistenceStorageConfig(invalidPath.toString())); assertInstanceOf(IOException.class, exception.getCause()); } @@ -76,15 +72,17 @@ public static void deleteDirectory(Path path) throws IOException { return; } try (Stream walk = Files.walk(path)) { - walk.sorted(Comparator.reverseOrder()) - .forEach( - p -> { - try { - Files.delete(p); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + walk.sorted(Comparator.reverseOrder()).forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } } + + private String getAbsoluteFolder(String relativePath) { + return Paths.get(relativePath).toAbsolutePath().toString(); + } } diff --git a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java index 5f0e9bf0c..e12d6e2db 100644 --- a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java @@ -39,6 +39,7 @@ import com.hedera.hapi.block.PublishStreamRequest; import com.hedera.hapi.block.PublishStreamResponse; import com.hedera.hapi.block.PublishStreamResponseCode; +import com.hedera.hapi.block.protoc.BlockItemSet; import com.hedera.hapi.block.stream.BlockItem; import com.hedera.pbj.runtime.io.buffer.Bytes; import edu.umd.cs.findbugs.annotations.NonNull; @@ -58,20 +59,26 @@ @ExtendWith(MockitoExtension.class) public class ProducerBlockItemObserverTest { - @Mock private InstantSource testClock; - @Mock private Publisher> publisher; - @Mock private SubscriptionHandler subscriptionHandler; + @Mock + private InstantSource testClock; + + @Mock + private Publisher> publisher; + + @Mock + private SubscriptionHandler subscriptionHandler; + + @Mock + private StreamObserver publishStreamResponseObserver; @Mock - private StreamObserver - publishStreamResponseObserver; + private ServerCallStreamObserver serverCallStreamObserver; @Mock - private ServerCallStreamObserver - serverCallStreamObserver; + private ServiceStatus serviceStatus; - @Mock private ServiceStatus serviceStatus; - @Mock private ObjectEvent objectEvent; + @Mock + private ObjectEvent objectEvent; private final long TIMEOUT_THRESHOLD_MILLIS = 50L; private static final int testTimeout = 1000; @@ -80,25 +87,21 @@ public class ProducerBlockItemObserverTest { @BeforeEach public void setUp() throws IOException { - this.testContext = - TestConfigUtil.getTestBlockNodeContext( - Map.of( - TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, - String.valueOf(TIMEOUT_THRESHOLD_MILLIS))); + this.testContext = TestConfigUtil.getTestBlockNodeContext( + Map.of(TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, String.valueOf(TIMEOUT_THRESHOLD_MILLIS))); } @Test public void testOnError() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - final ProducerBlockItemObserver producerBlockItemObserver = - new ProducerBlockItemObserver( - testClock, - publisher, - subscriptionHandler, - publishStreamResponseObserver, - blockNodeContext, - serviceStatus); + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, + publisher, + subscriptionHandler, + publishStreamResponseObserver, + blockNodeContext, + serviceStatus); final Throwable t = new Throwable("Test error"); producerBlockItemObserver.onError(t); @@ -109,14 +112,13 @@ public void testOnError() throws IOException { public void testBlockItemThrowsParseException() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - final ProducerBlockItemObserver producerBlockItemObserver = - new ProducerBlockItemObserver( - testClock, - publisher, - subscriptionHandler, - publishStreamResponseObserver, - blockNodeContext, - serviceStatus); + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, + publisher, + subscriptionHandler, + publishStreamResponseObserver, + blockNodeContext, + serviceStatus); // Create a pbj block item final List blockItems = generateBlockItems(1); @@ -135,25 +137,24 @@ public void testBlockItemThrowsParseException() throws IOException { // create the PublishStreamRequest with the spy block item final com.hedera.hapi.block.protoc.PublishStreamRequest protocPublishStreamRequest = com.hedera.hapi.block.protoc.PublishStreamRequest.newBuilder() - .addBlockItems(protocBlockItem) + .setBlockItems(BlockItemSet.newBuilder().addBlockItems(protocBlockItem)) .build(); // call the producerBlockItemObserver producerBlockItemObserver.onNext(protocPublishStreamRequest); // TODO: Replace this with a real error enum. - final EndOfStream endOfStream = - EndOfStream.newBuilder() - .status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) - .build(); + final EndOfStream endOfStream = EndOfStream.newBuilder() + .status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) + .build(); fromPbj(PublishStreamResponse.newBuilder().status(endOfStream).build()); // verify the ProducerBlockItemObserver has sent an error response verify( publishStreamResponseObserver, - timeout(testTimeout) - .atLeast(1)) // TODO: it calls more than 1 usually 2, but why? - .onNext(fromPbj(PublishStreamResponse.newBuilder().status(endOfStream).build())); + timeout(testTimeout).atLeast(1)) // TODO: it calls more than 1 usually 2, but why? + .onNext(fromPbj( + PublishStreamResponse.newBuilder().status(endOfStream).build())); verify(serviceStatus, timeout(testTimeout).times(1)).stopWebServer(any()); } @@ -161,24 +162,16 @@ public void testBlockItemThrowsParseException() throws IOException { @Test public void testResponseNotPermittedAfterCancel() throws NoSuchAlgorithmException { - final TestProducerBlockItemObserver producerStreamResponseObserver = - new TestProducerBlockItemObserver( - testClock, - publisher, - subscriptionHandler, - serverCallStreamObserver, - testContext, - serviceStatus); + final TestProducerBlockItemObserver producerStreamResponseObserver = new TestProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, serverCallStreamObserver, testContext, serviceStatus); final List blockItems = generateBlockItems(1); - final ItemAcknowledgement itemAck = - ItemAcknowledgement.newBuilder() - .itemsHash(Bytes.wrap(getFakeHash(blockItems))) - .build(); - final PublishStreamResponse publishStreamResponse = - PublishStreamResponse.newBuilder() - .acknowledgement(Acknowledgement.newBuilder().itemAck(itemAck).build()) - .build(); + final ItemAcknowledgement itemAck = ItemAcknowledgement.newBuilder() + .itemsHash(Bytes.wrap(getFakeHash(blockItems))) + .build(); + final PublishStreamResponse publishStreamResponse = PublishStreamResponse.newBuilder() + .acknowledgement(Acknowledgement.newBuilder().itemAck(itemAck).build()) + .build(); when(objectEvent.get()).thenReturn(publishStreamResponse); // Confirm that the observer is called with the first BlockItem @@ -191,31 +184,22 @@ public void testResponseNotPermittedAfterCancel() throws NoSuchAlgorithmExceptio producerStreamResponseObserver.onEvent(objectEvent, 0, true); // Confirm that canceling the observer allowed only 1 response to be sent. - verify(serverCallStreamObserver, timeout(testTimeout).times(1)) - .onNext(fromPbj(publishStreamResponse)); + verify(serverCallStreamObserver, timeout(testTimeout).times(1)).onNext(fromPbj(publishStreamResponse)); } @Test public void testResponseNotPermittedAfterClose() throws NoSuchAlgorithmException { - final TestProducerBlockItemObserver producerBlockItemObserver = - new TestProducerBlockItemObserver( - testClock, - publisher, - subscriptionHandler, - serverCallStreamObserver, - testContext, - serviceStatus); + final TestProducerBlockItemObserver producerBlockItemObserver = new TestProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, serverCallStreamObserver, testContext, serviceStatus); final List blockItems = generateBlockItems(1); - final ItemAcknowledgement itemAck = - ItemAcknowledgement.newBuilder() - .itemsHash(Bytes.wrap(getFakeHash(blockItems))) - .build(); - final PublishStreamResponse publishStreamResponse = - PublishStreamResponse.newBuilder() - .acknowledgement(Acknowledgement.newBuilder().itemAck(itemAck).build()) - .build(); + final ItemAcknowledgement itemAck = ItemAcknowledgement.newBuilder() + .itemsHash(Bytes.wrap(getFakeHash(blockItems))) + .build(); + final PublishStreamResponse publishStreamResponse = PublishStreamResponse.newBuilder() + .acknowledgement(Acknowledgement.newBuilder().itemAck(itemAck).build()) + .build(); when(objectEvent.get()).thenReturn(publishStreamResponse); // Confirm that the observer is called with the first BlockItem @@ -228,8 +212,7 @@ public void testResponseNotPermittedAfterClose() throws NoSuchAlgorithmException producerBlockItemObserver.onEvent(objectEvent, 0, true); // Confirm that closing the observer allowed only 1 response to be sent. - verify(serverCallStreamObserver, timeout(testTimeout).times(1)) - .onNext(fromPbj(publishStreamResponse)); + verify(serverCallStreamObserver, timeout(testTimeout).times(1)).onNext(fromPbj(publishStreamResponse)); } @Test @@ -237,18 +220,13 @@ public void testOnlyErrorStreamResponseAllowedAfterStatusChange() { final ServiceStatus serviceStatus = new ServiceStatusImpl(testContext); - final ProducerBlockItemObserver producerBlockItemObserver = - new ProducerBlockItemObserver( - testClock, - publisher, - subscriptionHandler, - serverCallStreamObserver, - testContext, - serviceStatus); + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, serverCallStreamObserver, testContext, serviceStatus); final List blockItems = generateBlockItems(1); - final PublishStreamRequest publishStreamRequest = - PublishStreamRequest.newBuilder().blockItems(blockItems).build(); + final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder() + .blockItems(new com.hedera.hapi.block.BlockItemSet(blockItems)) + .build(); // Confirm that the observer is called with the first BlockItem producerBlockItemObserver.onNext(fromPbj(publishStreamRequest)); diff --git a/settings.gradle.kts b/settings.gradle.kts index 9e2361aa9..fb15cd766 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -80,8 +80,8 @@ dependencyResolutionManagement { version("com.google.protobuf.util", protobufVersion) // PBJ dependencies - plugin("pbj", "com.hedera.pbj.pbj-compiler").version("0.9.2") - version("com.hedera.pbj.runtime", "0.9.2") + plugin("pbj", "com.hedera.pbj.pbj-compiler").version("0.9.8") + version("com.hedera.pbj.runtime", "0.9.8") version("org.antlr.antlr4.runtime", "4.13.1") version("java.annotation", "1.3.2") diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java index 63b11dfcb..b3dd94b36 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java @@ -25,6 +25,7 @@ import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.data.GrpcConfig; import com.hedera.block.simulator.metrics.MetricsService; +import com.hedera.hapi.block.protoc.BlockItemSet; import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc; import com.hedera.hapi.block.protoc.PublishStreamRequest; import com.hedera.hapi.block.stream.Block; @@ -98,7 +99,9 @@ public boolean streamBlockItem(List blockItems) { } requestStreamObserver.onNext(PublishStreamRequest.newBuilder() - .addAllBlockItems(blockItemsProtoc) + .setBlockItems(BlockItemSet.newBuilder() + .addAllBlockItems(blockItemsProtoc) + .build()) .build()); metricsService.get(LiveBlockItemsSent).add(blockItemsProtoc.size()); LOGGER.log( @@ -127,7 +130,9 @@ public boolean streamBlock(Block block) { ChunkUtils.chunkify(blockItemsProtoc, blockStreamConfig.blockItemsBatchSize()); for (List streamingBatch : streamingBatches) { requestStreamObserver.onNext(PublishStreamRequest.newBuilder() - .addAllBlockItems(streamingBatch) + .setBlockItems(BlockItemSet.newBuilder() + .addAllBlockItems(streamingBatch) + .build()) .build()); metricsService.get(LiveBlockItemsSent).add(streamingBatch.size()); LOGGER.log( diff --git a/stream/build.gradle.kts b/stream/build.gradle.kts index eaf519835..64f5f971c 100644 --- a/stream/build.gradle.kts +++ b/stream/build.gradle.kts @@ -33,10 +33,10 @@ tasks.withType().configureEach { tasks.cloneHederaProtobufs { // uncomment below to use a specific tag // tag = "v0.53.0" or a specific commit like "0047255" - // tag = "d5e6988" + tag = "1033f10" // uncomment below to use a specific branch - branch = "block-repeated-fix-tss-message" + // branch = "main" } sourceSets {