diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index c292c9f3c..77fb3b3e0 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -107,9 +107,9 @@ public String serviceName() { } /** - * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming method - * for publishBlockStream, server streaming method for subscribeBlockStream and a unary method - * for singleBlock. + * Updates the routing definitions for the BlockStreamService. It establishes the bidirectional + * streaming method for publishBlockStream, server streaming method for subscribeBlockStream and + * a unary method for singleBlock. * * @param routing the routing for the BlockStreamService */ @@ -272,7 +272,7 @@ void singleBlock( @NonNull final MetricsService metricsService = blockNodeContext.metricsService(); - metricsService.singleBlockRetrievedCounter.increment(); + metricsService.singleBlocksRetrieved.increment(); } else { LOGGER.log( System.Logger.Level.DEBUG, "Block number {0} not found", blockNumber); diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 85768d173..0c392c595 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -74,8 +74,8 @@ public static void main(final String[] args) { .build(); @NonNull final StreamMediator> streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext) - .serviceStatus(serviceStatus) + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, serviceStatus) .build(); @NonNull diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorBuilder.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorBuilder.java index 43b23adde..661c8f51a 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorBuilder.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorBuilder.java @@ -20,7 +20,6 @@ import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse; import com.hedera.block.server.ServiceStatus; -import com.hedera.block.server.ServiceStatusImpl; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.persistence.storage.write.BlockWriter; @@ -41,49 +40,43 @@ public class LiveStreamMediatorBuilder { private final BlockWriter blockWriter; - private ServiceStatus serviceStatus; + private final BlockNodeContext blockNodeContext; + private final ServiceStatus serviceStatus; + private Map< EventHandler>, BatchEventProcessor>> subscribers; - private final BlockNodeContext blockNodeContext; + /** The initial capacity of the subscriber map. */ private static final int SUBSCRIBER_INIT_CAPACITY = 32; private LiveStreamMediatorBuilder( @NonNull final BlockWriter blockWriter, - @NonNull final BlockNodeContext blockNodeContext) { + @NonNull final BlockNodeContext blockNodeContext, + @NonNull final ServiceStatus serviceStatus) { this.subscribers = new ConcurrentHashMap<>(SUBSCRIBER_INIT_CAPACITY); - this.serviceStatus = new ServiceStatusImpl(); this.blockWriter = blockWriter; this.blockNodeContext = blockNodeContext; + this.serviceStatus = serviceStatus; } /** * Create a new instance of the builder using the minimum required parameters. * - * @param blockWriter is required for the stream mediator to persist block items to storage + * @param blockWriter is required for the stream mediator to persist block items to storage. * @param blockNodeContext is required to provide metrics reporting mechanisms to the stream - * mediator - * @return a new instance of the {@link LiveStreamMediatorBuilder} + * mediator. + * @param serviceStatus is required to provide the stream mediator with access to check the + * status of the server and to stop the web server if necessary. + * @return a new stream mediator builder configured with required parameters. */ @NonNull public static LiveStreamMediatorBuilder newBuilder( @NonNull final BlockWriter blockWriter, - @NonNull final BlockNodeContext blockNodeContext) { - return new LiveStreamMediatorBuilder(blockWriter, blockNodeContext); - } - - /** - * Provide a non-default service status to the stream mediator. - * - * @param serviceStatus is the service status to set - * @return the builder - */ - @NonNull - public LiveStreamMediatorBuilder serviceStatus(@NonNull final ServiceStatus serviceStatus) { - this.serviceStatus = serviceStatus; - return this; + @NonNull final BlockNodeContext blockNodeContext, + @NonNull final ServiceStatus serviceStatus) { + return new LiveStreamMediatorBuilder(blockWriter, blockNodeContext, serviceStatus); } /** @@ -110,7 +103,7 @@ public LiveStreamMediatorBuilder subscribers( * Use the build method to construct a stream mediator to handle live stream events from a * producer to N consumers. * - * @return the stream mediator + * @return the stream mediator to handle live stream events between a producer and N consumers. */ @NonNull public StreamMediator> build() { diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 3fe4cdf9d..09645e440 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -117,7 +117,7 @@ public void publish(@NonNull final BlockItem blockItem) throws IOException { // Increment the block item counter @NonNull final MetricsService metricsService = blockNodeContext.metricsService(); - metricsService.liveBlockItemCounter.increment(); + metricsService.liveBlockItems.increment(); try { // Persist the BlockItem @@ -208,7 +208,7 @@ private static SubscribeStreamResponse buildEndStreamResponse() { private void updateSubscriberMetrics() { @NonNull final MetricsService metricsService = blockNodeContext.metricsService(); - @NonNull final LongGauge longGauge = metricsService.subscriberGauge; + @NonNull final LongGauge longGauge = metricsService.subscribers; longGauge.set(subscribers.size()); } } diff --git a/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java b/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java index 0ce4976d4..4a5d90c9b 100644 --- a/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java +++ b/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java @@ -21,6 +21,12 @@ import com.swirlds.metrics.api.Metrics; import edu.umd.cs.findbugs.annotations.NonNull; +/** + * Use member variables of this class to update metric data for the Hedera Block Node. + * + *

Metrics are updated by calling the appropriate method on the metric object instance. For + * example, to increment a counter, call {@link Counter#increment()}. + */ public class MetricsService { private static final String CATEGORY = "hedera_block_node"; @@ -54,13 +60,13 @@ public class MetricsService { /** An example counter. */ public final Counter exampleCounter; - public final Counter liveBlockItemCounter; - public final Counter blockPersistenceCounter; - public final Counter singleBlockRetrievedCounter; - public final LongGauge subscriberGauge; + public final Counter liveBlockItems; + public final Counter blocksPersisted; + public final Counter singleBlocksRetrieved; + public final LongGauge subscribers; /** - * Creates a new instance of {@link MetricsService}. + * Create singleton instance of metrics service to be used throughout the application. * * @param metrics the metrics instance */ @@ -68,9 +74,9 @@ public MetricsService(@NonNull final Metrics metrics) { this.exampleGauge = metrics.getOrCreate(EXAMPLE_GAUGE); this.exampleCounter = metrics.getOrCreate(EXAMPLE_COUNTER); - this.liveBlockItemCounter = metrics.getOrCreate(LIVE_BLOCK_ITEM_COUNTER); - this.blockPersistenceCounter = metrics.getOrCreate(BLOCK_PERSISTENCE_COUNTER); - this.singleBlockRetrievedCounter = metrics.getOrCreate(SINGLE_BLOCK_RETRIEVED_COUNTER); - this.subscriberGauge = metrics.getOrCreate(SUBSCRIBER_GAUGE); + this.liveBlockItems = metrics.getOrCreate(LIVE_BLOCK_ITEM_COUNTER); + this.blocksPersisted = metrics.getOrCreate(BLOCK_PERSISTENCE_COUNTER); + this.singleBlocksRetrieved = metrics.getOrCreate(SINGLE_BLOCK_RETRIEVED_COUNTER); + this.subscribers = metrics.getOrCreate(SUBSCRIBER_GAUGE); } } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/Util.java b/server/src/main/java/com/hedera/block/server/persistence/storage/Util.java index e15174535..5dec622c0 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/Util.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/Util.java @@ -22,11 +22,15 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.Set; -/** Utility class for storage. */ +/** Util methods provide common functionality for the storage package. */ public final class Util { private Util() {} - /** Default file permissions. Default permissions: rwxr-xr-x */ + /** + * Default file permissions defines the file and directory for the storage package. + * + *

Default permissions are set to: rwxr-xr-x + */ @NonNull public static final FileAttribute> defaultPerms = PosixFilePermissions.asFileAttribute( diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/read/BlockAsDirReaderBuilder.java b/server/src/main/java/com/hedera/block/server/persistence/storage/read/BlockAsDirReaderBuilder.java index 3be4e7d54..c0a3f9d54 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/read/BlockAsDirReaderBuilder.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/read/BlockAsDirReaderBuilder.java @@ -25,6 +25,11 @@ import java.nio.file.attribute.PosixFilePermission; import java.util.Set; +/** + * Use builder methods to create a {@link BlockReader} to read blocks from storage. + * + *

When a block reader is created, it will provide access to read blocks from storage. + */ public class BlockAsDirReaderBuilder { private final String key; @@ -36,12 +41,31 @@ private BlockAsDirReaderBuilder(@NonNull final String key, @NonNull final Config this.config = config; } + /** + * Creates a new block reader builder using the minimum required parameters. + * + * @param key is required to read pertinent configuration info. + * @param config is required to supply pertinent configuration info for the block reader to + * access storage. + * @return a block reader builder configured with required parameters. + */ @NonNull public static BlockAsDirReaderBuilder newBuilder( @NonNull final String key, @NonNull final Config config) { return new BlockAsDirReaderBuilder(key, config); } + /** + * Optionally, provide file permissions for the block reader to use when managing block files + * and directories. + * + *

By default, the block reader will use the permissions defined in {@link + * Util#defaultPerms}. This method is primarily used for testing purposes. Default values should + * be sufficient for production use. + * + * @param filePerms the file permissions to use when managing block files and directories. + * @return a block reader builder configured with required parameters. + */ @NonNull public BlockAsDirReaderBuilder filePerms( @NonNull final FileAttribute> filePerms) { @@ -49,6 +73,11 @@ public BlockAsDirReaderBuilder filePerms( return this; } + /** + * Use the build method to construct a block reader to read blocks from storage. + * + * @return a new block reader configured with the parameters provided to the builder. + */ @NonNull public BlockReader build() { return new BlockAsDirReader(key, config, filePerms); diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsDirWriter.java b/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsDirWriter.java index 4d32bed8d..c04965454 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsDirWriter.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsDirWriter.java @@ -59,7 +59,7 @@ class BlockAsDirWriter implements BlockWriter { * * @param key the key to use to retrieve the block node root path from the config * @param config the config to use to retrieve the block node root path - * @param blockRemover the block remover to use to remove blocks if there's an exception while + * @param blockRemover the block remover to use to remove blocks if there is an exception while * writing a partial block * @param filePerms the file permissions to set on the block node root path * @throws IOException if an error occurs while initializing the BlockAsDirWriter @@ -179,7 +179,7 @@ private void resetState(@NonNull final BlockItem blockItem) throws IOException { // Increment the block counter @NonNull final MetricsService metricsService = blockNodeContext.metricsService(); - metricsService.blockPersistenceCounter.increment(); + metricsService.blocksPersisted.increment(); } private void repairPermissions(@NonNull final Path path) throws IOException { diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsDirWriterBuilder.java b/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsDirWriterBuilder.java index f9c36a1f2..579e8d5e5 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsDirWriterBuilder.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsDirWriterBuilder.java @@ -30,6 +30,11 @@ import java.nio.file.attribute.PosixFilePermission; import java.util.Set; +/** + * Use builder methods to create a {@link BlockWriter} to write blocks to storage. + * + *

When a block writer is created, it will provide access to write blocks to storage. + */ public class BlockAsDirWriterBuilder { private final String key; @@ -49,6 +54,15 @@ private BlockAsDirWriterBuilder( new BlockAsDirRemover(Path.of(config.get(key).asString().get()), Util.defaultPerms); } + /** + * Creates a new block writer builder using the minimum required parameters. + * + * @param key is required to read pertinent configuration info. + * @param config is required to supply pertinent configuration info for the block writer to + * access storage. + * @param blockNodeContext is required to provide metrics reporting mechanisms . + * @return a block writer builder configured with required parameters. + */ @NonNull public static BlockAsDirWriterBuilder newBuilder( @NonNull final String key, @@ -58,6 +72,17 @@ public static BlockAsDirWriterBuilder newBuilder( return new BlockAsDirWriterBuilder(key, config, blockNodeContext); } + /** + * Optionally, provide file permissions for the block writer to use when managing block files + * and directories. + * + *

By default, the block writer will use the permissions defined in {@link + * Util#defaultPerms}. This method is primarily used for testing purposes. Default values should + * be sufficient for production use. + * + * @param filePerms the file permissions to use when managing block files and directories. + * @return a block writer builder configured with required parameters. + */ @NonNull public BlockAsDirWriterBuilder filePerms( @NonNull FileAttribute> filePerms) { @@ -65,12 +90,28 @@ public BlockAsDirWriterBuilder filePerms( return this; } + /** + * Optionally, provide a block remover to remove blocks from storage. + * + *

By default, the block writer will use the block remover defined in {@link + * BlockAsDirRemover}. This method is primarily used for testing purposes. Default values should + * be sufficient for production use. + * + * @param blockRemover the block remover to use when removing blocks from storage. + * @return a block writer builder configured with required parameters. + */ @NonNull public BlockAsDirWriterBuilder blockRemover(@NonNull BlockRemover blockRemover) { this.blockRemover = blockRemover; return this; } + /** + * Use the build method to construct a block writer to write blocks to storage. + * + * @return a new block writer configured with the parameters provided to the builder. + * @throws IOException when an error occurs while persisting block items to storage. + */ @NonNull public BlockWriter build() throws IOException { return new BlockAsDirWriter(key, config, blockRemover, filePerms, blockNodeContext); diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockWriter.java b/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockWriter.java index d9bd477f0..84baa9e18 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockWriter.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockWriter.java @@ -20,17 +20,17 @@ import java.io.IOException; /** - * The BlockWriter interface defines the contract for writing a block to the storage. + * BlockWriter defines the contract for writing block items to storage. * * @param the type of the block item to write */ public interface BlockWriter { /** - * Writes the block item to the storage. + * Write the block item to storage. * - * @param blockItem the block item to write - * @throws IOException if an I/O error occurs writing the block item + * @param blockItem the block item to write to storage. + * @throws IOException when failing to write the block item to storage. */ void write(@NonNull final V blockItem) throws IOException; } diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index 6a5250a17..e867c5f92 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -161,8 +161,7 @@ public void testSubscribeBlockStream() throws IOException { final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext) - .serviceStatus(serviceStatus) + LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus) .build(); // Build the BlockStreamService @@ -193,7 +192,7 @@ public void testSubscribeBlockStream() throws IOException { streamObserver.onNext(publishStreamRequest); // Verify the counter was incremented - assertEquals(1, blockNodeContext.metricsService().liveBlockItemCounter.get()); + assertEquals(1, blockNodeContext.metricsService().liveBlockItems.get()); verify(blockWriter, timeout(testTimeout).times(1)).write(blockItems.getFirst()); @@ -597,8 +596,7 @@ private StreamMediator> buildStr final ServiceStatus serviceStatus = new ServiceStatusImpl(); serviceStatus.setWebServer(webServer); - return LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext) - .serviceStatus(serviceStatus) + return LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus) .subscribers(subscribers) .build(); } diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index 8bd4c3ada..0227d6b4e 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; +import com.hedera.block.server.ServiceStatusImpl; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.config.BlockNodeContextFactory; import com.hedera.block.server.consumer.ConsumerStreamResponseObserver; @@ -64,7 +65,8 @@ public void testUnsubscribeEach() throws InterruptedException, IOException { final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediatorBuilder = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext); + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, new ServiceStatusImpl()); final var streamMediator = streamMediatorBuilder.build(); // Set up the subscribers @@ -100,7 +102,7 @@ public void testUnsubscribeEach() throws InterruptedException, IOException { "Expected the mediator to have unsubscribed observer3"); // Confirm the counter was never incremented - assertEquals(0, blockNodeContext.metricsService().liveBlockItemCounter.get()); + assertEquals(0, blockNodeContext.metricsService().liveBlockItems.get()); } @Test @@ -108,14 +110,16 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException { final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext).build(); + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, new ServiceStatusImpl()) + .build(); final BlockItem blockItem = BlockItem.newBuilder().build(); // Acting as a producer, notify the mediator of a new block streamMediator.publish(blockItem); // Verify the counter was incremented - assertEquals(1, blockNodeContext.metricsService().liveBlockItemCounter.get()); + assertEquals(1, blockNodeContext.metricsService().liveBlockItems.get()); // Confirm the BlockStorage write method was // called despite the absence of subscribers @@ -127,7 +131,9 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext).build(); + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, new ServiceStatusImpl()) + .build(); when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); @@ -166,7 +172,7 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup // Acting as a producer, notify the mediator of a new block streamMediator.publish(blockItem); - assertEquals(1, blockNodeContext.metricsService().liveBlockItemCounter.get()); + assertEquals(1, blockNodeContext.metricsService().liveBlockItems.get()); // Confirm each subscriber was notified of the new block verify(streamObserver1, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); @@ -182,7 +188,9 @@ public void testSubAndUnsubHandling() throws IOException { final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext).build(); + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, new ServiceStatusImpl()) + .build(); when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); @@ -208,7 +216,7 @@ public void testSubAndUnsubHandling() throws IOException { streamMediator.unsubscribe(concreteObserver3); // Confirm the counter was never incremented - assertEquals(0, blockNodeContext.metricsService().liveBlockItemCounter.get()); + assertEquals(0, blockNodeContext.metricsService().liveBlockItems.get()); } @Test @@ -216,7 +224,9 @@ public void testOnCancelSubscriptionHandling() throws IOException { final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext).build(); + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, new ServiceStatusImpl()) + .build(); when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); @@ -238,7 +248,7 @@ public void testOnCancelSubscriptionHandling() throws IOException { testConsumerBlockItemObserver.getOnCancel().run(); // Verify the block item incremented the counter - assertEquals(1, blockNodeContext.metricsService().liveBlockItemCounter.get()); + assertEquals(1, blockNodeContext.metricsService().liveBlockItems.get()); // Verify the event made it to the consumer verify(serverCallStreamObserver, timeout(testTimeout).times(1)).setOnCancelHandler(any()); @@ -252,7 +262,9 @@ public void testOnCloseSubscriptionHandling() throws IOException { final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext).build(); + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, new ServiceStatusImpl()) + .build(); // testClock configured to be outside the timeout window when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1); @@ -275,7 +287,7 @@ public void testOnCloseSubscriptionHandling() throws IOException { testConsumerBlockItemObserver.getOnClose().run(); // Verify the block item incremented the counter - assertEquals(1, blockNodeContext.metricsService().liveBlockItemCounter.get()); + assertEquals(1, blockNodeContext.metricsService().liveBlockItems.get()); // Verify the event made it to the consumer verify(serverCallStreamObserver, timeout(testTimeout).times(1)).setOnCancelHandler(any()); @@ -288,7 +300,9 @@ public void testOnCloseSubscriptionHandling() throws IOException { public void testMediatorBlocksPublishAfterException() throws IOException, InterruptedException { final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext).build(); + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, new ServiceStatusImpl()) + .build(); final List blockItems = generateBlockItems(1); final BlockItem firstBlockItem = blockItems.getFirst(); @@ -308,7 +322,7 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr streamMediator.publish(secondBlockItem); // Confirm the counter was incremented only once - assertEquals(1, blockNodeContext.metricsService().liveBlockItemCounter.get()); + assertEquals(1, blockNodeContext.metricsService().liveBlockItems.get()); // Confirm the BlockPersistenceHandler write method was only called // once despite the second block being published. @@ -321,7 +335,9 @@ public void testUnsubscribeWhenNotSubscribed() throws IOException { final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext).build(); + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, new ServiceStatusImpl()) + .build(); final var testConsumerBlockItemObserver = new TestConsumerStreamResponseObserver( TIMEOUT_THRESHOLD_MILLIS, diff --git a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java index 40a229c0d..d21f9393c 100644 --- a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java @@ -27,6 +27,7 @@ import com.google.protobuf.ByteString; import com.hedera.block.protos.BlockStreamService; import com.hedera.block.server.ServiceStatus; +import com.hedera.block.server.ServiceStatusImpl; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.config.BlockNodeContextFactory; import com.hedera.block.server.consumer.ConsumerStreamResponseObserver; @@ -101,7 +102,9 @@ public void testProducerWithManyConsumers() throws IOException { final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final var streamMediator = - LiveStreamMediatorBuilder.newBuilder(blockWriter, blockNodeContext).build(); + LiveStreamMediatorBuilder.newBuilder( + blockWriter, blockNodeContext, new ServiceStatusImpl()) + .build(); // Mock a clock with 2 different return values in response to anticipated // millis() calls. Here the second call will always be inside the timeout window. @@ -155,7 +158,7 @@ public void testProducerWithManyConsumers() throws IOException { producerBlockItemObserver.onNext(publishStreamRequest); // Confirm the block item counter was incremented - assertEquals(1, blockNodeContext.metricsService().liveBlockItemCounter.get()); + assertEquals(1, blockNodeContext.metricsService().liveBlockItems.get()); // Confirm each subscriber was notified of the new block verify(streamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);