Skip to content

Commit

Permalink
fix: working with hedera-protobufs rpc definitions and types
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 11, 2024
1 parent fef178d commit 7dad6af
Show file tree
Hide file tree
Showing 16 changed files with 165 additions and 184 deletions.
60 changes: 21 additions & 39 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,29 @@ syntax = "proto3";
*/

option java_package = "com.hedera.block.protos";
option java_outer_classname = "BlockStreamServiceGrpcProto";
option java_outer_classname = "BlockStreamService";

/**
* The BlockStreamGrpc service definition provides 2 bidirectional streaming methods for
* exchanging BlockItems with the Block Node server.
*
* A producer (e.g. Consensus Node) can use the StreamSink method to stream BlockItems to the
* Block Node server. The Block Node server will respond with a BlockResponse message for
* each BlockItem received.
*
* A consumer (e.g. Mirror Node) can use the StreamSource method to request a stream of
* BlockItems from the server. The consumer is expected to respond with a BlockResponse message
* with the id of each BlockItem received.
*/
service BlockStreamGrpc {
/**
* StreamSink is a bidirectional streaming method that allows a producer to stream BlockItems
* to the Block Node server. The server will respond with a BlockResponse message for each
* BlockItem received.
*/
rpc StreamSink (stream BlockItem) returns (stream BlockItemResponse) {}
service BlockStreamGrpcService {

/**
* StreamSource is a bidirectional streaming method that allows a consumer to request a
* stream of BlockItems from the server. The consumer is expected to respond with a BlockResponse
* message with the id of each BlockItem received.
*/
rpc StreamSource (stream BlockItemResponse) returns (stream BlockItem) {}
rpc publishBlockStream (stream PublishStreamRequest) returns (stream PublishStreamResponse) {}

rpc subscribeBlockStream (stream SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {}
}

message PublishStreamRequest {
BlockItem block_item = 1;
}

message PublishStreamResponse {
uint64 block_number = 1;
}

message SubscribeStreamRequest {
uint64 start_block_number = 1;
}

message SubscribeStreamResponse {
int32 status = 1;
}

/**
Expand All @@ -63,16 +58,3 @@ message BlockItem {
*/
string value = 2;
}

/**
* A BlockItemResponse is a simple message that contains an id.
* The BlockItemResponse is meant to confirm the receipt of a BlockItem.
* A future use case may expand on this type to communicate a failure
* condition where the BlockItem needs to be resent, etc.
*/
message BlockItemResponse {
/**
* The id of the BlockItem which was received.
*/
int64 id = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.hedera.block.server;

import com.google.protobuf.Descriptors;
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
Expand All @@ -28,6 +27,11 @@
import java.time.Clock;

import static com.hedera.block.server.Constants.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
import static com.hedera.block.protos.BlockStreamService.BlockItem;

/**
* This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
Expand All @@ -42,15 +46,15 @@ public class BlockStreamService implements GrpcService {
private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator;
private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;

/**
* Constructor for the BlockStreamService class.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds
*/
public BlockStreamService(final long timeoutThresholdMillis,
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator) {
final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
}
Expand All @@ -62,7 +66,7 @@ public BlockStreamService(final long timeoutThresholdMillis,
*/
@Override
public Descriptors.FileDescriptor proto() {
return BlockStreamServiceGrpcProto.getDescriptor();
return com.hedera.block.protos.BlockStreamService.getDescriptor();
}

/**
Expand All @@ -83,43 +87,25 @@ public String serviceName() {
*/
@Override
public void update(final Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::publishBlockStream);
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
}

/**
* The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
*
* @return a custom StreamObserver to handle streaming blockItems from the producer to all subscribed consumer
* via the streamMediator as well as sending responses back to the producer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.BlockItem> streamSink(
final StreamObserver<BlockStreamServiceGrpcProto.BlockItemResponse> responseStreamObserver) {
private StreamObserver<PublishStreamRequest> publishBlockStream(final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");

return new ProducerBlockItemObserver(streamMediator, responseStreamObserver);
return new ProducerBlockItemObserver(streamMediator, publishStreamResponseObserver);
}

/**
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer
* back to the server.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
* as handling responses from the consumer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.BlockItemResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver) {
private StreamObserver<SubscribeStreamRequest> subscribeBlockStream(final StreamObserver<BlockItem> subscribeStreamRequestObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
final var streamObserver = new ConsumerBlockItemObserver(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);
subscribeStreamRequestObserver);

streamMediator.subscribe(streamObserver);

Expand Down
6 changes: 3 additions & 3 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private Constants() {}
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY = "blocknode.server.consumer.timeout.threshold";

// Constants specified in the service definition of the .proto file
public static final String SERVICE_NAME = "BlockStreamGrpc";
public static final String CLIENT_STREAMING_METHOD_NAME = "StreamSink";
public static final String SERVER_STREAMING_METHOD_NAME = "StreamSource";
public static final String SERVICE_NAME = "BlockStreamGrpcService";
public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream";
public static final String SERVER_STREAMING_METHOD_NAME = "subscribeBlockStream";
}
21 changes: 14 additions & 7 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hedera.block.server;

import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.storage.BlockStorage;
Expand All @@ -28,18 +27,26 @@
import io.helidon.webserver.grpc.GrpcRouting;

import java.io.IOException;
import java.util.stream.Stream;

import static com.hedera.block.server.Constants.*;
import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;

/**
* Main class for the block node server
*/
public class Server {

// public interface BidiStreamingMethod<ReqT, RespT> extends ServerCalls.StreamingRequestMethod<ReqT, RespT> {
// @Override StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
// }

// Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockItem>, StreamObserver<BlockStreamServiceGrpcProto.BlockItem>> clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockItemResponse>, StreamObserver<BlockStreamServiceGrpcProto.BlockItem>> serverBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<StreamObserver<PublishStreamRequest>, StreamObserver<PublishStreamResponse>> clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<StreamObserver<SubscribeStreamRequest>, StreamObserver<SubscribeStreamResponse>> serverBidiStreamingMethod;

private static final System.Logger LOGGER = System.getLogger(Server.class.getName());

Expand All @@ -62,7 +69,7 @@ public static void main(final String[] args) {
final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);

// Initialize the block storage, cache, and service
final BlockStorage<BlockStreamServiceGrpcProto.BlockItem> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockStorage<BlockItem> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));

Expand All @@ -71,11 +78,11 @@ public static void main(final String[] args) {
.port(8080)
.addRouting(GrpcRouting.builder()
.service(blockStreamService)
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
.bidi(com.hedera.block.protos.BlockStreamService.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
.bidi(com.hedera.block.protos.BlockStreamService.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverBidiStreamingMethod))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hedera.block.server.consumer;

import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.ServerCallStreamObserver;
Expand All @@ -26,15 +25,18 @@
import java.time.InstantSource;
import java.util.concurrent.CountDownLatch;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;

/**
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
*/
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> {
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockItem>, SubscribeStreamRequest> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver;
private final StreamObserver<BlockItem> subscribeStreamResponseObserver;

private final long timeoutThresholdMillis;

Expand All @@ -43,18 +45,18 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv

private final CountDownLatch shutdownLatch = new CountDownLatch(1);

private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator;
private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;

/**
* Constructor for the LiveStreamObserverImpl class.
*
* @param responseStreamObserver the response stream observer
* @param subscribeStreamResponseObserver the response stream observer
*/
public ConsumerBlockItemObserver(
final long timeoutThresholdMillis,
final InstantSource producerLivenessClock,
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator,
final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver) {
final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator,
final StreamObserver<BlockItem> subscribeStreamResponseObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.producerLivenessClock = producerLivenessClock;
Expand All @@ -63,18 +65,18 @@ public ConsumerBlockItemObserver(
// be executed if a downstream consumer cancels the stream without
// sending an HTTP/2 End Stream DATA frame. If triggered, unsubscribe
// this observer to avoid orphaning subscribed resources.
if (responseStreamObserver instanceof ServerCallStreamObserver) {
if (subscribeStreamResponseObserver instanceof ServerCallStreamObserver) {

// Unfortunately we have to cast the responseStreamObserver to a ServerCallStreamObserver
// to register the onCancelHandler.
((ServerCallStreamObserver<BlockStreamServiceGrpcProto.BlockItem>)responseStreamObserver)
((ServerCallStreamObserver<BlockItem>)subscribeStreamResponseObserver)
.setOnCancelHandler(() -> {
LOGGER.log(System.Logger.Level.DEBUG, "Consumer cancelled stream. Unsubscribing observer.");
streamMediator.unsubscribe(this);
});
}

this.responseStreamObserver = responseStreamObserver;
this.subscribeStreamResponseObserver = subscribeStreamResponseObserver;
this.producerLivenessMillis = producerLivenessClock.millis();

this.streamMediator = streamMediator;
Expand All @@ -85,11 +87,11 @@ public ConsumerBlockItemObserver(
*
*/
@Override
public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.BlockItem> event, final long l, final boolean b) throws Exception {
public void onEvent(final ObjectEvent<BlockItem> event, final long l, final boolean b) throws Exception {

// Refresh the producer liveness and pass the block to the observer.
producerLivenessMillis = producerLivenessClock.millis();
responseStreamObserver.onNext(event.get());
subscribeStreamResponseObserver.onNext(event.get());
}

/**
Expand All @@ -98,7 +100,7 @@ public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.BlockItem> eve
* @param blockItemResponse the BlockItemResponse passed back to the server via the bidirectional stream to the downstream consumer.
*/
@Override
public void onNext(final BlockStreamServiceGrpcProto.BlockItemResponse blockItemResponse) {
public void onNext(final SubscribeStreamRequest blockItemResponse) {

// Check if the producer has timed out. If so, unsubscribe the observer from the mediator.
if (isThresholdExceeded(producerLivenessMillis)) {
Expand Down
Loading

0 comments on commit 7dad6af

Please sign in to comment.