Skip to content

Commit

Permalink
fix: changed the subscribeBlockStream gRPC method from bidirectional …
Browse files Browse the repository at this point in the history
…to server streaming.

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 16, 2024
1 parent 31be928 commit 7cc9abd
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 57 deletions.
2 changes: 1 addition & 1 deletion protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ service BlockStreamGrpcService {

rpc publishBlockStream (stream PublishStreamRequest) returns (stream PublishStreamResponse) {}

rpc subscribeBlockStream (stream SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {}
rpc subscribeBlockStream (SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {}
}

message PublishStreamRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,30 +91,33 @@ public String serviceName() {
@Override
public void update(final Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::publishBlockStream);
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
}

private StreamObserver<PublishStreamRequest> publishBlockStream(
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");
LOGGER.log(
System.Logger.Level.DEBUG,
"Executing bidirectional publishBlockStream gRPC method");

return new ProducerBlockItemObserver(streamMediator, publishStreamResponseObserver);
}

private StreamObserver<SubscribeStreamRequest> subscribeBlockStream(
final StreamObserver<SubscribeStreamResponse> subscribeStreamRequestObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");
private void subscribeBlockStream(
final SubscribeStreamRequest subscribeStreamRequest,
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
LOGGER.log(
System.Logger.Level.DEBUG,
"Executing Server Streaming subscribeBlockStream gRPC method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
final var streamObserver =
new ConsumerBlockItemObserver(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
streamMediator,
subscribeStreamRequestObserver);
subscribeStreamResponseObserver);

streamMediator.subscribe(streamObserver);

return streamObserver;
}
}
13 changes: 7 additions & 6 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@
/** Main class for the block node server */
public class Server {

// Function stubs to satisfy the bidi routing param signatures. The implementations are in the
// Function stubs to satisfy the routing param signatures. The implementations are in the
// service class.
private static ServerCalls.BidiStreamingMethod<
StreamObserver<PublishStreamRequest>, StreamObserver<PublishStreamResponse>>
clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<
StreamObserver<SubscribeStreamRequest>, StreamObserver<SubscribeStreamResponse>>
serverBidiStreamingMethod;

public static ServerCalls.ServerStreamingMethod<
SubscribeStreamRequest, StreamObserver<SubscribeStreamResponse>>
serverStreamingMethod;

private static final System.Logger LOGGER = System.getLogger(Server.class.getName());

Expand Down Expand Up @@ -85,12 +86,12 @@ public static void main(final String[] args) {
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.bidi(
.serverStream(
com.hedera.block.protos.BlockStreamService
.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverBidiStreamingMethod))
serverStreamingMethod))
.build()
.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@

package com.hedera.block.server.consumer;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
import static com.hedera.block.protos.BlockStreamService.*;

import com.hedera.block.protos.BlockStreamService;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.ServerCallStreamObserver;
Expand All @@ -39,8 +36,7 @@ public class ConsumerBlockItemObserver

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final StreamObserver<BlockStreamService.SubscribeStreamResponse>
subscribeStreamResponseObserver;
private final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver;

private final long timeoutThresholdMillis;
private final InstantSource producerLivenessClock;
Expand Down
37 changes: 1 addition & 36 deletions server/src/test/resources/consumer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,8 @@ if [ "$#" -lt 1 ] || ! [[ "$1" =~ ^[0-9]+$ ]]; then
usage_error
fi

# Check if the second argument is provided and if it's a positive integer
if [ "$#" -eq 2 ] && ! [[ "$2" =~ ^[1-9][0-9]*$ ]]; then
usage_error
fi

# If the script reaches here, the parameters are valid
echo "The provided integer is: $1"
if [ "$#" -eq 2 ]; then
echo "The optional positive integer is: $2"
fi

# Use environment variables or default values
GRPC_SERVER=${GRPC_SERVER:-"localhost:8080"}
Expand All @@ -29,32 +21,5 @@ PROTO_IMPORT_PATH=${PROTO_IMPORT_PATH:-"../../../../protos/src/main/protobuf"}

echo "Starting consumer..."

# Signal handler to handle SIGINT (Ctrl+C)
function cleanup {
echo "Received SIGINT, stopping..."
kill $GRPC_PID
exit 0
}

# Trap SIGINT
trap cleanup SIGINT

# Generate and push messages to the gRPC server as a consumer.
# Response block messages from the gRPC server are printed to stdout.
(
iter=$1
while true; do
echo "{\"start_block_number\": $iter}"

if [ $iter -eq $2 ]; then
exit 0
fi

((iter++))

# Configure the message speed
sleep 1

done
) | grpcurl -plaintext -import-path $PROTO_IMPORT_PATH -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD

echo "{\"start_block_number\": $1}" | grpcurl -plaintext -import-path $PROTO_IMPORT_PATH -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD

0 comments on commit 7cc9abd

Please sign in to comment.