diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index 294c69556..1dcc451e6 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -20,9 +20,11 @@ import static com.hedera.block.server.Constants.*; import com.google.protobuf.Descriptors; +import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.consumer.ConsumerStreamResponseObserver; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; +import com.hedera.block.server.metrics.MetricsService; import com.hedera.block.server.persistence.storage.BlockReader; import com.hedera.block.server.producer.ItemAckBuilder; import com.hedera.block.server.producer.ProducerBlockItemObserver; @@ -47,6 +49,7 @@ public class BlockStreamService implements GrpcService { private final StreamMediator> streamMediator; private final ServiceStatus serviceStatus; private final BlockReader blockReader; + private final BlockNodeContext blockNodeContext; /** * Constructor for the BlockStreamService class. It initializes the BlockStreamService with the @@ -69,12 +72,14 @@ public class BlockStreamService implements GrpcService { final StreamMediator> streamMediator, @NonNull final BlockReader blockReader, - @NonNull final ServiceStatus serviceStatus) { + @NonNull final ServiceStatus serviceStatus, + @NonNull final BlockNodeContext blockNodeContext) { this.timeoutThresholdMillis = timeoutThresholdMillis; this.itemAckBuilder = itemAckBuilder; this.streamMediator = streamMediator; this.blockReader = blockReader; this.serviceStatus = serviceStatus; + this.blockNodeContext = blockNodeContext; } /** @@ -170,6 +175,10 @@ void singleBlock( blockNumber); singleBlockResponseStreamObserver.onNext( buildSingleBlockResponse(blockOpt.get())); + + @NonNull + final MetricsService metricsService = blockNodeContext.metricsService(); + metricsService.singleBlockRetrievedCounter.increment(); } else { LOGGER.log( System.Logger.Level.DEBUG, "Block number {0} not found", blockNumber); diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 910edf541..4461822f3 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -98,7 +98,8 @@ public static void main(final String[] args) { config, streamMediator, new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config), - serviceStatus); + serviceStatus, + blockNodeContext); @NonNull final GrpcRouting.Builder grpcRouting = buildGrpcRouting(blockStreamService); // Build the web server @@ -146,7 +147,8 @@ private static BlockStreamService buildBlockStreamService( final StreamMediator> streamMediator, @NonNull final BlockReader blockReader, - @NonNull final ServiceStatus serviceStatus) { + @NonNull final ServiceStatus serviceStatus, + @NonNull final BlockNodeContext blockNodeContext) { // Get Timeout threshold from configuration final long consumerTimeoutThreshold = @@ -157,6 +159,7 @@ private static BlockStreamService buildBlockStreamService( new ItemAckBuilder(), streamMediator, blockReader, - serviceStatus); + serviceStatus, + blockNodeContext); } } diff --git a/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java b/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java index e51c01903..29f9d3dd6 100644 --- a/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java +++ b/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java @@ -49,6 +49,12 @@ public class MetricsService { new LongGauge.Config(SUBSCRIBER_CATEGORY, "subscriberGauge") .withDescription("Subscriber Gauge"); + // Single Block Retrieved Counter + private static final String SINGLE_BLOCK_CATEGORY = "singleBlock"; + private static final Counter.Config SINGLE_BLOCK_RETRIEVED_COUNTER = + new Counter.Config(SINGLE_BLOCK_CATEGORY, "singleBlockRetrievedCounter") + .withDescription("Single Block Retrieved Counter"); + /** An example gauge. */ public final LongGauge exampleGauge; @@ -57,6 +63,7 @@ public class MetricsService { public final Counter liveBlockItemCounter; public final Counter blockPersistenceCounter; + public final Counter singleBlockRetrievedCounter; public final LongGauge subscriberGauge; /** @@ -69,7 +76,8 @@ public MetricsService(@NonNull final Metrics metrics) { this.exampleCounter = metrics.getOrCreate(EXAMPLE_COUNTER); this.liveBlockItemCounter = metrics.getOrCreate(LIVE_BLOCK_ITEM_COUNTER); - this.subscriberGauge = metrics.getOrCreate(SUBSCRIBER_GAUGE); this.blockPersistenceCounter = metrics.getOrCreate(BLOCK_PERSISTENCE_COUNTER); + this.singleBlockRetrievedCounter = metrics.getOrCreate(SINGLE_BLOCK_RETRIEVED_COUNTER); + this.subscriberGauge = metrics.getOrCreate(SUBSCRIBER_GAUGE); } } diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index 32115f625..42ae26210 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -107,9 +107,15 @@ public void tearDown() { public void testPublishBlockStreamRegistrationAndExecution() throws IOException, NoSuchAlgorithmException { + final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final BlockStreamService blockStreamService = new BlockStreamService( - 1500L, new ItemAckBuilder(), streamMediator, blockReader, serviceStatus); + 1500L, + new ItemAckBuilder(), + streamMediator, + blockReader, + serviceStatus, + blockNodeContext); // Enable the serviceStatus when(serviceStatus.isRunning()).thenReturn(true); @@ -158,7 +164,12 @@ public void testSubscribeBlockStream() throws IOException { // Build the BlockStreamService final BlockStreamService blockStreamService = new BlockStreamService( - 2000L, new ItemAckBuilder(), streamMediator, blockReader, serviceStatus); + 2000L, + new ItemAckBuilder(), + streamMediator, + blockReader, + serviceStatus, + blockNodeContext); // Subscribe the consumers blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); @@ -551,6 +562,14 @@ private static SubscribeStreamResponse buildSubscribeStreamResponse(BlockItem bl return SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); } + private BlockStreamService buildBlockStreamService() throws IOException { + final var streamMediator = + buildStreamMediator(new ConcurrentHashMap<>(32), Util.defaultPerms); + + // Build the BlockStreamService + return buildBlockStreamService(streamMediator, blockReader, serviceStatus); + } + private StreamMediator> buildStreamMediator( final Map< EventHandler>, @@ -580,19 +599,19 @@ private StreamMediator> buildStr .build(); } - private BlockStreamService buildBlockStreamService() throws IOException { - final var streamMediator = - buildStreamMediator(new ConcurrentHashMap<>(32), Util.defaultPerms); - - // Build the BlockStreamService - return buildBlockStreamService(streamMediator, blockReader, serviceStatus); - } - private BlockStreamService buildBlockStreamService( final StreamMediator> streamMediator, final BlockReader blockReader, - final ServiceStatus serviceStatus) { + final ServiceStatus serviceStatus) + throws IOException { + + final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); return new BlockStreamService( - 2000, new ItemAckBuilder(), streamMediator, blockReader, serviceStatus); + 2000, + new ItemAckBuilder(), + streamMediator, + blockReader, + serviceStatus, + blockNodeContext); } } diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java index b8a7d74f7..9bbe8117d 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java @@ -99,13 +99,16 @@ public void tearDown() { @Test public void testServiceName() throws IOException, NoSuchAlgorithmException { + + final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final BlockStreamService blockStreamService = new BlockStreamService( TIMEOUT_THRESHOLD_MILLIS, itemAckBuilder, streamMediator, blockReader, - serviceStatus); + serviceStatus, + blockNodeContext); // Verify the service name assertEquals(Constants.SERVICE_NAME, blockStreamService.serviceName()); @@ -117,13 +120,15 @@ public void testServiceName() throws IOException, NoSuchAlgorithmException { @Test public void testProto() throws IOException, NoSuchAlgorithmException { + final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final BlockStreamService blockStreamService = new BlockStreamService( TIMEOUT_THRESHOLD_MILLIS, itemAckBuilder, streamMediator, blockReader, - serviceStatus); + serviceStatus, + blockNodeContext); Descriptors.FileDescriptor fileDescriptor = blockStreamService.proto(); // Verify the current rpc methods @@ -138,19 +143,20 @@ public void testProto() throws IOException, NoSuchAlgorithmException { void testSingleBlockHappyPath() throws IOException { final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final BlockStreamService blockStreamService = new BlockStreamService( TIMEOUT_THRESHOLD_MILLIS, itemAckBuilder, streamMediator, blockReader, - serviceStatus); + serviceStatus, + blockNodeContext); // Enable the serviceStatus when(serviceStatus.isRunning()).thenReturn(true); // Generate and persist a block - final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final BlockWriter blockWriter = BlockAsDirWriterBuilder.newBuilder(JUNIT, testConfig, blockNodeContext).build(); final List blockItems = generateBlockItems(1); @@ -181,6 +187,8 @@ void testSingleBlockHappyPath() throws IOException { @Test void testSingleBlockNotFoundPath() throws IOException { + final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); + // Get the block so we can verify the response payload when(blockReader.read(1)).thenReturn(Optional.empty()); @@ -198,7 +206,8 @@ void testSingleBlockNotFoundPath() throws IOException { itemAckBuilder, streamMediator, blockReader, - serviceStatus); + serviceStatus, + blockNodeContext); // Enable the serviceStatus when(serviceStatus.isRunning()).thenReturn(true); @@ -208,15 +217,17 @@ void testSingleBlockNotFoundPath() throws IOException { } @Test - void testSingleBlockServiceNotAvailable() { + void testSingleBlockServiceNotAvailable() throws IOException { + final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final BlockStreamService blockStreamService = new BlockStreamService( TIMEOUT_THRESHOLD_MILLIS, itemAckBuilder, streamMediator, blockReader, - serviceStatus); + serviceStatus, + blockNodeContext); // Set the service status to not running when(serviceStatus.isRunning()).thenReturn(false); @@ -232,13 +243,15 @@ void testSingleBlockServiceNotAvailable() { @Test public void testSingleBlockIOExceptionPath() throws IOException { + final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final BlockStreamService blockStreamService = new BlockStreamService( TIMEOUT_THRESHOLD_MILLIS, itemAckBuilder, streamMediator, blockReader, - serviceStatus); + serviceStatus, + blockNodeContext); // Set the service status to not running when(serviceStatus.isRunning()).thenReturn(true); @@ -254,15 +267,17 @@ public void testSingleBlockIOExceptionPath() throws IOException { } @Test - public void testUpdateInvokesRoutingWithLambdas() { + public void testUpdateInvokesRoutingWithLambdas() throws IOException { + final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create(); final BlockStreamService blockStreamService = new BlockStreamService( TIMEOUT_THRESHOLD_MILLIS, itemAckBuilder, streamMediator, blockReader, - serviceStatus); + serviceStatus, + blockNodeContext); GrpcService.Routing routing = mock(GrpcService.Routing.class); blockStreamService.update(routing);