Skip to content

Commit

Permalink
feat: added singleBlock counter
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 8, 2024
1 parent ac7a2a1 commit f558680
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static com.hedera.block.server.Constants.*;

import com.google.protobuf.Descriptors;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.consumer.ConsumerStreamResponseObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.persistence.storage.BlockReader;
import com.hedera.block.server.producer.ItemAckBuilder;
import com.hedera.block.server.producer.ProducerBlockItemObserver;
Expand All @@ -47,6 +49,7 @@ public class BlockStreamService implements GrpcService {
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
private final ServiceStatus serviceStatus;
private final BlockReader<Block> blockReader;
private final BlockNodeContext blockNodeContext;

/**
* Constructor for the BlockStreamService class. It initializes the BlockStreamService with the
Expand All @@ -69,12 +72,14 @@ public class BlockStreamService implements GrpcService {
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
streamMediator,
@NonNull final BlockReader<Block> blockReader,
@NonNull final ServiceStatus serviceStatus) {
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockNodeContext blockNodeContext) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.itemAckBuilder = itemAckBuilder;
this.streamMediator = streamMediator;
this.blockReader = blockReader;
this.serviceStatus = serviceStatus;
this.blockNodeContext = blockNodeContext;
}

/**
Expand Down Expand Up @@ -170,6 +175,10 @@ void singleBlock(
blockNumber);
singleBlockResponseStreamObserver.onNext(
buildSingleBlockResponse(blockOpt.get()));

@NonNull
final MetricsService metricsService = blockNodeContext.metricsService();
metricsService.singleBlockRetrievedCounter.increment();
} else {
LOGGER.log(
System.Logger.Level.DEBUG, "Block number {0} not found", blockNumber);
Expand Down
9 changes: 6 additions & 3 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public static void main(final String[] args) {
config,
streamMediator,
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config),
serviceStatus);
serviceStatus,
blockNodeContext);
@NonNull final GrpcRouting.Builder grpcRouting = buildGrpcRouting(blockStreamService);

// Build the web server
Expand Down Expand Up @@ -146,7 +147,8 @@ private static BlockStreamService buildBlockStreamService(
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
streamMediator,
@NonNull final BlockReader<Block> blockReader,
@NonNull final ServiceStatus serviceStatus) {
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockNodeContext blockNodeContext) {

// Get Timeout threshold from configuration
final long consumerTimeoutThreshold =
Expand All @@ -157,6 +159,7 @@ private static BlockStreamService buildBlockStreamService(
new ItemAckBuilder(),
streamMediator,
blockReader,
serviceStatus);
serviceStatus,
blockNodeContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public class MetricsService {
new LongGauge.Config(SUBSCRIBER_CATEGORY, "subscriberGauge")
.withDescription("Subscriber Gauge");

// Single Block Retrieved Counter
private static final String SINGLE_BLOCK_CATEGORY = "singleBlock";
private static final Counter.Config SINGLE_BLOCK_RETRIEVED_COUNTER =
new Counter.Config(SINGLE_BLOCK_CATEGORY, "singleBlockRetrievedCounter")
.withDescription("Single Block Retrieved Counter");

/** An example gauge. */
public final LongGauge exampleGauge;

Expand All @@ -57,6 +63,7 @@ public class MetricsService {

public final Counter liveBlockItemCounter;
public final Counter blockPersistenceCounter;
public final Counter singleBlockRetrievedCounter;
public final LongGauge subscriberGauge;

/**
Expand All @@ -69,7 +76,8 @@ public MetricsService(@NonNull final Metrics metrics) {
this.exampleCounter = metrics.getOrCreate(EXAMPLE_COUNTER);

this.liveBlockItemCounter = metrics.getOrCreate(LIVE_BLOCK_ITEM_COUNTER);
this.subscriberGauge = metrics.getOrCreate(SUBSCRIBER_GAUGE);
this.blockPersistenceCounter = metrics.getOrCreate(BLOCK_PERSISTENCE_COUNTER);
this.singleBlockRetrievedCounter = metrics.getOrCreate(SINGLE_BLOCK_RETRIEVED_COUNTER);
this.subscriberGauge = metrics.getOrCreate(SUBSCRIBER_GAUGE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,15 @@ public void tearDown() {
public void testPublishBlockStreamRegistrationAndExecution()
throws IOException, NoSuchAlgorithmException {

final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final BlockStreamService blockStreamService =
new BlockStreamService(
1500L, new ItemAckBuilder(), streamMediator, blockReader, serviceStatus);
1500L,
new ItemAckBuilder(),
streamMediator,
blockReader,
serviceStatus,
blockNodeContext);

// Enable the serviceStatus
when(serviceStatus.isRunning()).thenReturn(true);
Expand Down Expand Up @@ -158,7 +164,12 @@ public void testSubscribeBlockStream() throws IOException {
// Build the BlockStreamService
final BlockStreamService blockStreamService =
new BlockStreamService(
2000L, new ItemAckBuilder(), streamMediator, blockReader, serviceStatus);
2000L,
new ItemAckBuilder(),
streamMediator,
blockReader,
serviceStatus,
blockNodeContext);

// Subscribe the consumers
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1);
Expand Down Expand Up @@ -551,6 +562,14 @@ private static SubscribeStreamResponse buildSubscribeStreamResponse(BlockItem bl
return SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
}

private BlockStreamService buildBlockStreamService() throws IOException {
final var streamMediator =
buildStreamMediator(new ConcurrentHashMap<>(32), Util.defaultPerms);

// Build the BlockStreamService
return buildBlockStreamService(streamMediator, blockReader, serviceStatus);
}

private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> buildStreamMediator(
final Map<
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
Expand Down Expand Up @@ -580,19 +599,19 @@ private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> buildStr
.build();
}

private BlockStreamService buildBlockStreamService() throws IOException {
final var streamMediator =
buildStreamMediator(new ConcurrentHashMap<>(32), Util.defaultPerms);

// Build the BlockStreamService
return buildBlockStreamService(streamMediator, blockReader, serviceStatus);
}

private BlockStreamService buildBlockStreamService(
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final BlockReader<Block> blockReader,
final ServiceStatus serviceStatus) {
final ServiceStatus serviceStatus)
throws IOException {

final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
return new BlockStreamService(
2000, new ItemAckBuilder(), streamMediator, blockReader, serviceStatus);
2000,
new ItemAckBuilder(),
streamMediator,
blockReader,
serviceStatus,
blockNodeContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,16 @@ public void tearDown() {

@Test
public void testServiceName() throws IOException, NoSuchAlgorithmException {

final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final BlockStreamService blockStreamService =
new BlockStreamService(
TIMEOUT_THRESHOLD_MILLIS,
itemAckBuilder,
streamMediator,
blockReader,
serviceStatus);
serviceStatus,
blockNodeContext);

// Verify the service name
assertEquals(Constants.SERVICE_NAME, blockStreamService.serviceName());
Expand All @@ -117,13 +120,15 @@ public void testServiceName() throws IOException, NoSuchAlgorithmException {

@Test
public void testProto() throws IOException, NoSuchAlgorithmException {
final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final BlockStreamService blockStreamService =
new BlockStreamService(
TIMEOUT_THRESHOLD_MILLIS,
itemAckBuilder,
streamMediator,
blockReader,
serviceStatus);
serviceStatus,
blockNodeContext);
Descriptors.FileDescriptor fileDescriptor = blockStreamService.proto();

// Verify the current rpc methods
Expand All @@ -138,19 +143,20 @@ public void testProto() throws IOException, NoSuchAlgorithmException {
void testSingleBlockHappyPath() throws IOException {

final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final BlockStreamService blockStreamService =
new BlockStreamService(
TIMEOUT_THRESHOLD_MILLIS,
itemAckBuilder,
streamMediator,
blockReader,
serviceStatus);
serviceStatus,
blockNodeContext);

// Enable the serviceStatus
when(serviceStatus.isRunning()).thenReturn(true);

// Generate and persist a block
final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final BlockWriter<BlockItem> blockWriter =
BlockAsDirWriterBuilder.newBuilder(JUNIT, testConfig, blockNodeContext).build();
final List<BlockItem> blockItems = generateBlockItems(1);
Expand Down Expand Up @@ -181,6 +187,8 @@ void testSingleBlockHappyPath() throws IOException {
@Test
void testSingleBlockNotFoundPath() throws IOException {

final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();

// Get the block so we can verify the response payload
when(blockReader.read(1)).thenReturn(Optional.empty());

Expand All @@ -198,7 +206,8 @@ void testSingleBlockNotFoundPath() throws IOException {
itemAckBuilder,
streamMediator,
blockReader,
serviceStatus);
serviceStatus,
blockNodeContext);

// Enable the serviceStatus
when(serviceStatus.isRunning()).thenReturn(true);
Expand All @@ -208,15 +217,17 @@ void testSingleBlockNotFoundPath() throws IOException {
}

@Test
void testSingleBlockServiceNotAvailable() {
void testSingleBlockServiceNotAvailable() throws IOException {

final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final BlockStreamService blockStreamService =
new BlockStreamService(
TIMEOUT_THRESHOLD_MILLIS,
itemAckBuilder,
streamMediator,
blockReader,
serviceStatus);
serviceStatus,
blockNodeContext);

// Set the service status to not running
when(serviceStatus.isRunning()).thenReturn(false);
Expand All @@ -232,13 +243,15 @@ void testSingleBlockServiceNotAvailable() {

@Test
public void testSingleBlockIOExceptionPath() throws IOException {
final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final BlockStreamService blockStreamService =
new BlockStreamService(
TIMEOUT_THRESHOLD_MILLIS,
itemAckBuilder,
streamMediator,
blockReader,
serviceStatus);
serviceStatus,
blockNodeContext);

// Set the service status to not running
when(serviceStatus.isRunning()).thenReturn(true);
Expand All @@ -254,15 +267,17 @@ public void testSingleBlockIOExceptionPath() throws IOException {
}

@Test
public void testUpdateInvokesRoutingWithLambdas() {
public void testUpdateInvokesRoutingWithLambdas() throws IOException {

final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();
final BlockStreamService blockStreamService =
new BlockStreamService(
TIMEOUT_THRESHOLD_MILLIS,
itemAckBuilder,
streamMediator,
blockReader,
serviceStatus);
serviceStatus,
blockNodeContext);

GrpcService.Routing routing = mock(GrpcService.Routing.class);
blockStreamService.update(routing);
Expand Down

0 comments on commit f558680

Please sign in to comment.