From 7cc9abd1214aff230c28bfbf684a2ba759ef5a24 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Tue, 16 Jul 2024 14:46:36 -0600 Subject: [PATCH] fix: changed the subscribeBlockStream gRPC method from bidirectional to server streaming. Signed-off-by: Matt Peterson --- protos/src/main/protobuf/blockstream.proto | 2 +- .../block/server/BlockStreamService.java | 19 ++++++---- .../java/com/hedera/block/server/Server.java | 13 ++++--- .../consumer/ConsumerBlockItemObserver.java | 8 +--- server/src/test/resources/consumer.sh | 37 +------------------ 5 files changed, 22 insertions(+), 57 deletions(-) diff --git a/protos/src/main/protobuf/blockstream.proto b/protos/src/main/protobuf/blockstream.proto index f6c1f7ab3..df976c2a9 100644 --- a/protos/src/main/protobuf/blockstream.proto +++ b/protos/src/main/protobuf/blockstream.proto @@ -23,7 +23,7 @@ service BlockStreamGrpcService { rpc publishBlockStream (stream PublishStreamRequest) returns (stream PublishStreamResponse) {} - rpc subscribeBlockStream (stream SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {} + rpc subscribeBlockStream (SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {} } message PublishStreamRequest { diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index 8603ce644..b4d97523d 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -91,19 +91,24 @@ public String serviceName() { @Override public void update(final Routing routing) { routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::publishBlockStream); - routing.bidi(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream); + routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream); } private StreamObserver publishBlockStream( final StreamObserver publishStreamResponseObserver) { - LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method"); + LOGGER.log( + System.Logger.Level.DEBUG, + "Executing bidirectional publishBlockStream gRPC method"); return new ProducerBlockItemObserver(streamMediator, publishStreamResponseObserver); } - private StreamObserver subscribeBlockStream( - final StreamObserver subscribeStreamRequestObserver) { - LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method"); + private void subscribeBlockStream( + final SubscribeStreamRequest subscribeStreamRequest, + final StreamObserver subscribeStreamResponseObserver) { + LOGGER.log( + System.Logger.Level.DEBUG, + "Executing Server Streaming subscribeBlockStream gRPC method"); // Return a custom StreamObserver to handle streaming blocks from the producer. final var streamObserver = @@ -111,10 +116,8 @@ private StreamObserver subscribeBlockStream( timeoutThresholdMillis, Clock.systemDefaultZone(), streamMediator, - subscribeStreamRequestObserver); + subscribeStreamResponseObserver); streamMediator.subscribe(streamObserver); - - return streamObserver; } } diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 78a741be0..c2176c749 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -33,14 +33,15 @@ /** Main class for the block node server */ public class Server { - // Function stubs to satisfy the bidi routing param signatures. The implementations are in the + // Function stubs to satisfy the routing param signatures. The implementations are in the // service class. private static ServerCalls.BidiStreamingMethod< StreamObserver, StreamObserver> clientBidiStreamingMethod; - private static ServerCalls.BidiStreamingMethod< - StreamObserver, StreamObserver> - serverBidiStreamingMethod; + + public static ServerCalls.ServerStreamingMethod< + SubscribeStreamRequest, StreamObserver> + serverStreamingMethod; private static final System.Logger LOGGER = System.getLogger(Server.class.getName()); @@ -85,12 +86,12 @@ public static void main(final String[] args) { SERVICE_NAME, CLIENT_STREAMING_METHOD_NAME, clientBidiStreamingMethod) - .bidi( + .serverStream( com.hedera.block.protos.BlockStreamService .getDescriptor(), SERVICE_NAME, SERVER_STREAMING_METHOD_NAME, - serverBidiStreamingMethod)) + serverStreamingMethod)) .build() .start(); diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index 204924a60..c403af32c 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -16,11 +16,8 @@ package com.hedera.block.server.consumer; -import static com.hedera.block.protos.BlockStreamService.BlockItem; -import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest; -import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse; +import static com.hedera.block.protos.BlockStreamService.*; -import com.hedera.block.protos.BlockStreamService; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; import io.grpc.stub.ServerCallStreamObserver; @@ -39,8 +36,7 @@ public class ConsumerBlockItemObserver private final System.Logger LOGGER = System.getLogger(getClass().getName()); - private final StreamObserver - subscribeStreamResponseObserver; + private final StreamObserver subscribeStreamResponseObserver; private final long timeoutThresholdMillis; private final InstantSource producerLivenessClock; diff --git a/server/src/test/resources/consumer.sh b/server/src/test/resources/consumer.sh index cbd9807ca..43eca1c06 100755 --- a/server/src/test/resources/consumer.sh +++ b/server/src/test/resources/consumer.sh @@ -10,16 +10,8 @@ if [ "$#" -lt 1 ] || ! [[ "$1" =~ ^[0-9]+$ ]]; then usage_error fi -# Check if the second argument is provided and if it's a positive integer -if [ "$#" -eq 2 ] && ! [[ "$2" =~ ^[1-9][0-9]*$ ]]; then - usage_error -fi - # If the script reaches here, the parameters are valid echo "The provided integer is: $1" -if [ "$#" -eq 2 ]; then - echo "The optional positive integer is: $2" -fi # Use environment variables or default values GRPC_SERVER=${GRPC_SERVER:-"localhost:8080"} @@ -29,32 +21,5 @@ PROTO_IMPORT_PATH=${PROTO_IMPORT_PATH:-"../../../../protos/src/main/protobuf"} echo "Starting consumer..." -# Signal handler to handle SIGINT (Ctrl+C) -function cleanup { - echo "Received SIGINT, stopping..." - kill $GRPC_PID - exit 0 -} - -# Trap SIGINT -trap cleanup SIGINT - -# Generate and push messages to the gRPC server as a consumer. # Response block messages from the gRPC server are printed to stdout. -( - iter=$1 - while true; do - echo "{\"start_block_number\": $iter}" - - if [ $iter -eq $2 ]; then - exit 0 - fi - - ((iter++)) - - # Configure the message speed - sleep 1 - - done -) | grpcurl -plaintext -import-path $PROTO_IMPORT_PATH -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD - +echo "{\"start_block_number\": $1}" | grpcurl -plaintext -import-path $PROTO_IMPORT_PATH -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD