diff --git a/server/docs/design/bidi-producer-consumers-streaming.md b/server/docs/design/bidi-producer-consumers-streaming.md index 19ae4548..82e051d7 100644 --- a/server/docs/design/bidi-producer-consumers-streaming.md +++ b/server/docs/design/bidi-producer-consumers-streaming.md @@ -4,9 +4,9 @@ A primary use case of the `hedera-block-node` is to stream live BlockItems (see Terms section) from a producer (e.g. Consensus Node) to N consumers (e.g. Mirror Node) with the lowest possible latency while correctly preserving the -order of the BlockItems. This document outlines several possible strategies to implement this use case and the design +order of the BlockItems. This document outlines several possible strategies to implement this use case and the design of the recommended approach. All strategies rely on the Helidon 4.x.x server implementations of HTTP/2 and gRPC -services to ingest BlockItem data from a producer and then to stream the same BlockItems to downstream consumers. It +services to ingest BlockItem data from a producer and then to stream the same BlockItems to downstream consumers. It does this by defining bidirectional gRPC streaming services based on protobuf definitions. Helidon provides well-defined APIs and extension points to implement business logic for these services. The main entry @@ -27,18 +27,18 @@ point for custom logic is an implementation of `GrpcService`. ### Terms **BlockItem** - The BlockItem is the primary data structure passed between the producer, the `hedera-block-node` -and consumers. A defined sequence of BlockItems represent a Block when stored on the `hedera-block-node`. +and consumers. A defined sequence of BlockItems represent a Block when stored on the `hedera-block-node`. **Bidirectional Streaming** - Bidirectional streaming is an [HTTP/2 feature](https://datatracker.ietf.org/doc/html/rfc9113#name-streams-and-multiplexing) allowing both a client and a server to emit -a continuous stream of frames without waiting for responses. In this way, gRPC services can be used to efficiently +a continuous stream of frames without waiting for responses. In this way, gRPC services can be used to efficiently transmit a continuous flow of BlockItem messages while the HTTP/2 connection is open. **Producer StreamObserver** - The Producer StreamObserver is a custom implementation of the [gRPC StreamObserver -interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Producer +interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Producer StreamObserver at runtime when the producer sends a new BlockItem to the `StreamSink` gRPC service. **Consumer StreamObserver** - The Consumer StreamObserver is a custom implementation of the [gRPC StreamObserver -interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Consumer +interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Consumer StreamObserver at runtime when the downstream consumer of the `StreamSource` gRPC service sends HTTP/2 responses to sent BlockItems. @@ -61,21 +61,21 @@ The Block Node gRPC Streaming Services API is now aligned with the names and sim ## Approaches: All the following approaches require integrating with Helidon 4.x.x gRPC services to implement the bidirectional -streaming API methods defined above. The following objects are used in all approaches: +streaming API methods defined above. The following objects are used in all approaches: `BlockItemStreamService` is a custom implementation of the Helidon gRPC `GrpcService`. It is responsible for binding the Helidon routing mechanism to the gRPC streaming methods called by producers and consumers. `ProducerBlockItemObserver` is a custom implementation of the Helidon gRPC `StreamObserver` interface. `BlockItemStreamService` instantiates a new `ProducerBlockItemObserver` instance when the `StreamSink` gRPC method is -called by a producer. Thereafter, Helidon invokes `ProducerBlockItemObserver` methods to receive the latest BlockItem +called by a producer. Thereafter, Helidon invokes `ProducerBlockItemObserver` methods to receive the latest BlockItem from the producer and return BlockItemResponses via a bidirectional stream. `ConsumerBlockItemObserver` is also a custom implementation of the Helidon gRPC `StreamObserver` interface. `BlockItemStreamService` instantiates a new `ConsumerBlockItemObserver` instance when the `StreamSource` gRPC method -is called by each consumer. The `ConsumerBlockItemObserver` wraps an instance of `StreamObserver` provided by Helidon -when the connection is established. The `ConsumerBlockItemObserver` uses the `StreamObserver` to send the latest -BlockItem to the downstream consumer. Helidon invokes `ConsumerBlockItemObserver` methods to deliver BlockItemResponses +is called by each consumer. The `ConsumerBlockItemObserver` wraps an instance of `StreamObserver` provided by Helidon +when the connection is established. The `ConsumerBlockItemObserver` uses the `StreamObserver` to send the latest +BlockItem to the downstream consumer. Helidon invokes `ConsumerBlockItemObserver` methods to deliver BlockItemResponses from the consumer in receipt of BlockItems. @@ -83,7 +83,7 @@ from the consumer in receipt of BlockItems. Directly passing BlockItems from the `ProducerBlockItemObserver` to N `ConsumerBlockItemObserver`s without storing BlockItems in an intermediate data structure. This approach was the basis for one of the first implementations of gRPC -Live Streaming (see [BlockNode Issue 21](https://github.com/hashgraph/hedera-block-node/issues/21)). Unfortunately, this approach has the following problems: +Live Streaming (see [BlockNode Issue 21](https://github.com/hashgraph/hedera-block-node/issues/21)). Unfortunately, this approach has the following problems: Drawbacks: 1) Each `ProducerBlockItemObserver` must iterate over the list of subscribed consumers to pass the BlockItem to each @@ -91,15 +91,15 @@ Drawbacks: The linear scaling of consumers will aggregate latency resulting in the last consumer in the list to be penalized with the sum of the latencies of all consumers before it. 2) Dynamically subscribing/unsubscribing `ConsumerBlockItemObserver`s while deterministically broadcasting BlockItems - to each consumer in the correct order complicates and slows down the process. It requires thread-safe data + to each consumer in the correct order complicates and slows down the process. It requires thread-safe data structures and synchronization on all reads and writes to ensure new/removed subscribers do not disrupt the iteration order of the `ConsumerBlockItemObserver`s. -### Approach 2: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Consumers busy-wait for new BlockItems. +### Approach 2: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Consumers busy-wait for new BlockItems. Alternatively, if `ProducerBlockItemObserver`s store BlockItems in a shared data structure before immediately returning a response to the producer, the BlockItem is then immediately available for all `ConsumerBlockItemObserver`s to read -asynchronously. Consumers can repeatedly poll the shared data structure for new BlockItems. This approach has the +asynchronously. Consumers can repeatedly poll the shared data structure for new BlockItems. This approach has the following consequences: Advantages: @@ -113,22 +113,22 @@ Drawbacks: up or down. 3) While prototyping this approach, it appeared that `ConsumerBlockItemObserver`s using a busy-wait to watch for new BlockItems impaired the ability of the Helidon Virtual Thread instance to process the inbound responses from the - downstream consumer in a timely way. The aggressive behavior of the busy-wait could complicate future use cases + downstream consumer in a timely way. The aggressive behavior of the busy-wait could complicate future use cases requiring downstream consumer response processing. -### Approach 3: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Use downstream consumer BlockItemResponses to drive the process of sending new BlockItems. +### Approach 3: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Use downstream consumer BlockItemResponses to drive the process of sending new BlockItems. With this approach, the `ProducerBlockItemObserver` will store BlockItems in a shared data structure before immediately -returning a BlockItemResponse to the producer. However, rather than using a busy-wait to poll for new BlockItems, +returning a BlockItemResponse to the producer. However, rather than using a busy-wait to poll for new BlockItems, `ConsumerBlockItemObserver`s will send new BlockItems only upon receipt of BlockItemResponses from previously sent -BlockItems. When Helidon invokes `onNext()` with a BlockItemResponse, the `ConsumerBlockItemObserver` (using an +BlockItems. When Helidon invokes `onNext()` with a BlockItemResponse, the `ConsumerBlockItemObserver` (using an internal counter) will calculate and send all newest BlockItems available from the shared data structure to the -downstream consumer. In this way, the downstream consumer responses will drive the process of sending new BlockItems. +downstream consumer. In this way, the downstream consumer responses will drive the process of sending new BlockItems. Advantages: 1) It will not consume CPU resources polling. -2) It will not hijack the thread from responding to the downstream consumer. Rather, it uses the interaction with the +2) It will not hijack the thread from responding to the downstream consumer. Rather, it uses the interaction with the consumer to trigger sending the newest BlockItems downstream. 3) The shared data structure will need to be concurrent but, after the initial write operation, all subsequent reads should not require synchronization. @@ -141,10 +141,10 @@ Drawbacks: BlockItemResponses. Given, the latency of a network request/response round-trip, this approach will likely be far too slow to be considered effective even when sending a batch of all the latest BlockItems. -### Approach 4: Shared data structure between producer and consumer services. Leveraging the LMAX Disruptor library to manage inter-process pub/sub message-passing between producer and consumers via RingBuffer. +### Approach 4: Shared data structure between producer and consumer services. Leveraging the LMAX Disruptor library to manage inter-process pub/sub message-passing between producer and consumers via RingBuffer. The LMAX Disruptor library is a high-performance inter-process pub/sub message passing library that could be used to -efficiently pass BlockItems between a `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. The Disruptor +efficiently pass BlockItems between a `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. The Disruptor library is designed to minimize latency as well as CPU cycles to by not blocking while maintaining concurrency guarantees. @@ -161,7 +161,7 @@ Drawbacks: effectively. 2) Leveraging the Disruptor library requires the communication between the `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s to be affiliated by subscribing/unsubscribing the downstream consumers to receive the - latest BlockItems from the producer via the Disruptor RingBuffer. The process of managing these subscriptions to + latest BlockItems from the producer via the Disruptor RingBuffer. The process of managing these subscriptions to the RingBuffer can be complex. --- @@ -169,9 +169,9 @@ Drawbacks: ## Design Given the goals and the proposed approaches, Approach #4 has significant advantages and fewer significant drawbacks. -Using the LMAX Disruptor offers low latency and CPU consumption via a well-maintained and tested API. The RingBuffer +Using the LMAX Disruptor offers low latency and CPU consumption via a well-maintained and tested API. The RingBuffer intermediate data structure should serve to decouple the producer bidirectional stream from the consumer bidirectional -streams. Please see the following Entities section and Diagrams for a visual representation of the design. +streams. Please see the following Entities section and Diagrams for a visual representation of the design. ### Producer Registration Flow @@ -179,7 +179,7 @@ At boot time, the `BlockItemStreamService` will initialize the `StreamMediator` When a producer calls the `StreamSink` gRPC method, the `BlockItemStreamService` will create a new `ProducerBlockItemObserver` instance for Helidon to invoke during the lifecycle of the bidirectional connection to the -upstream producer. The `ProducerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to +upstream producer. The `ProducerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to the `ResponseStreamObserver` managed by Helidon for transmitting BlockItemResponses to the producer. See the Producer Registration Flow diagram for more details. @@ -187,8 +187,8 @@ See the Producer Registration Flow diagram for more details. When a consumer calls the `StreamSource` gRPC method, the `BlockItemStreamService` will create a new `ConsumerBlockItemObserver` instance for Helidon to invoke during the lifecycle of the bidirectional connection to the -downstream consumer. The `ConsumerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to -the `ResponseStreamObserver` managed by Helidon for transmitting BlockItemResponses to the downstream consumer. The +downstream consumer. The `ConsumerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to +the `ResponseStreamObserver` managed by Helidon for transmitting BlockItemResponses to the downstream consumer. The `BlockItemStreamService` will also subscribe the `ConsumerBlockItemObserver` to the `StreamMediator` to receive the streaming BlockItems from the producer. @@ -196,29 +196,29 @@ streaming BlockItems from the producer. ### Runtime Streaming At runtime, the `ProducerBlockItemObserver` will receive the latest BlockItem from the producer via Helidon and will -invoke publishEvent(BlockItem) on the `StreamMediator` to write the BlockItem to the RingBuffer. The +invoke publishEvent(BlockItem) on the `StreamMediator` to write the BlockItem to the RingBuffer. The `ProducerBlockItemObserver` will then persist the BlockItem and return a BlockItemResponse to the producer via its reference to `ResponseStreamObserver`. Asynchronously, the RingBuffer will invoke the onEvent(BlockItem) method of all the subscribed -`ConsumerBlockItemObserver`s passing them the latest BlockItem. The `ConsumerBlockItemObserver` will then transmit -the BlockItem downstream to the consumer via its reference to the `ResponseStreamObserver`. Downstream consumers will -respond with a BlockItemResponse. Helidon will call the onNext() method of the `ConsumerBlockItemObserver` with the +`ConsumerBlockItemObserver`s passing them the latest BlockItem. The `ConsumerBlockItemObserver` will then transmit +the BlockItem downstream to the consumer via its reference to the `ResponseStreamObserver`. Downstream consumers will +respond with a BlockItemResponse. Helidon will call the onNext() method of the `ConsumerBlockItemObserver` with the BlockItemResponse. BlockItems sent to the `ConsumerBlockItemObserver` via the RingBuffer and BlockItemResponses passed by Helidon from -the downstream consumer are used to refresh internal timeouts maintained by the `ConsumerBlockItemObserver`. If a +the downstream consumer are used to refresh internal timeouts maintained by the `ConsumerBlockItemObserver`. If a configurable timeout threshold is exceeded, the `ConsumerBlockItemObserver` will unsubscribe itself from the -`StreamMediator`. This mechanism is necessary because producers and consumers may not send HTTP/2 `End Stream` DATA -frames to terminate their bidirectional connection. Moreover, Helidon does not throw an exception back up to -`ConsumerBlockItemObserver` when the downstream consumer disconnects. Internal timeouts ensure objects are not +`StreamMediator`. This mechanism is necessary because producers and consumers may not send HTTP/2 `End Stream` DATA +frames to terminate their bidirectional connection. Moreover, Helidon does not throw an exception back up to +`ConsumerBlockItemObserver` when the downstream consumer disconnects. Internal timeouts ensure objects are not permanently subscribed to the `StreamMediator`. ### Entities **BlockItemStreamService** - The BlockItemStreamService is a custom implementation of the Helidon gRPC GrpcService. It is responsible for initializing the StreamMediator and instantiating ProducerBlockItemObserver and -ConsumerBlockItemObserver instances on-demand when the gRPC API is called by producers and consumers. It is +ConsumerBlockItemObserver instances on-demand when the gRPC API is called by producers and consumers. It is the primary binding between the Helidon routing mechanisms and the `hedera-block-node` custom business logic. **StreamObserver** - StreamObserver is the main interface through which Helidon 4.x.x invokes custom business logic @@ -238,7 +238,7 @@ the producer and consumers. The RingBuffer is a fixed-sized array of ConsumerBlockItemObservers that is managed by the Disruptor library. **EventHandler** - The EventHandler is an integration interface provided by the Disruptor library as a mechanism to -invoke callback logic when a new BlockItem is written to the RingBuffer. The EventHandler is responsible for passing +invoke callback logic when a new BlockItem is written to the RingBuffer. The EventHandler is responsible for passing the latest BlockItem to the ConsumerBlockItemObserver when it is available in the RingBuffer. **ConsumerBlockItemObserver** - A custom implementation of StreamObserver called by Helidon which is responsible for: