Skip to content

Commit

Permalink
wip: docs
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 1, 2024
1 parent 1f62b7e commit 3dd03b6
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@

package com.hedera.block.server.consumer;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;

import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

import java.time.InstantSource;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;

/**
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to
* the downstream consumer via the notify method and manage the bidirectional stream to the consumer
* via the onNext, onError, and onCompleted methods.
* The ConsumerBlockItemObserver class is the primary integration point between the LMAX Disruptor and
* an instance of a downstream consumer (represented by subscribeStreamResponseObserver provided by Helidon).
* The ConsumerBlockItemObserver implements the EventHandler interface so the Disruptor can invoke
* the onEvent() method when a new SubscribeStreamResponse is available.
*/
public class ConsumerBlockItemObserver
implements BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> {
Expand All @@ -50,9 +52,12 @@ public class ConsumerBlockItemObserver
protected Runnable onClose;

/**
* Constructor for the LiveStreamObserverImpl class.
* Constructor for the ConsumerBlockItemObserver class.
*
* @param subscribeStreamResponseObserver the response stream observer
* @param timeoutThresholdMillis The timeout threshold in milliseconds.
* @param producerLivenessClock The producer liveness clock.
* @param streamMediator The StreamMediator instance.
* @param subscribeStreamResponseObserver The StreamObserver instance.
*/
public ConsumerBlockItemObserver(
final long timeoutThresholdMillis,
Expand Down Expand Up @@ -109,23 +114,28 @@ public void onEvent(
final long currentMillis = producerLivenessClock.millis();
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
streamMediator.unsubscribe(this);
LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribed handler");
LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribed ConsumerBlockItemObserver due to producer timeout");
} else {

// Refresh the producer liveness and pass the BlockItem to the downstream observer.
producerLivenessMillis = currentMillis;

// Only start sending BlockItems after we've reached
// the beginning of a block.
final SubscribeStreamResponse subscribeStreamResponse = event.get();
final BlockItem blockItem = subscribeStreamResponse.getBlockItem();
if (!streamStarted && blockItem.hasHeader()) {
streamStarted = true;
}

if (streamStarted && isResponsePermitted.get()) {
LOGGER.log(System.Logger.Level.INFO, "Send BlockItem downstream: {0} ", blockItem);
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
// Only send the response if the consumer has not cancelled
// or closed the stream.
if (isResponsePermitted.get()) {

// Refresh the producer liveness and pass the BlockItem to the downstream observer.
producerLivenessMillis = currentMillis;

// Only start sending BlockItems after we've reached
// the beginning of a block.
final SubscribeStreamResponse subscribeStreamResponse = event.get();
final BlockItem blockItem = subscribeStreamResponse.getBlockItem();
if (!streamStarted && blockItem.hasHeader()) {
streamStarted = true;
}

if (streamStarted) {
LOGGER.log(System.Logger.Level.DEBUG, "Send BlockItem downstream: {0} ", blockItem);
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@
import java.io.IOException;

/**
* The StreamMediator interface represents a one-to-many bridge between a bidirectional stream of
* blocks from a producer (e.g. a Consensus Node) and N consumers each requesting a bidirectional
* connection to get a "live stream" of blocks from the producer. StreamMediator satisfies Helidon's
* type requirements for a bidirectional StreamObserver representing a stream of blocks returned
* FROM the downstream consuming client. However, the StreamObserver type may be distinct from Block
* type streamed TO the client. The type definition for the onNext() method provides the flexibility
* for the StreamObserver and the Block types to vary independently.
* The StreamMediator interface represents a bridge between a bidirectional stream of
* items from a producer (e.g. a Consensus Node) and N consumers each requesting a gRPC
* server stream of items. The StreamMediator manages adding and removing consumers
* dynamically to receive items as they arrive from upstream.
*
* @param <U> The type required by the RingBuffer implementation
* @param <U> The type of items sent by the upstream producer.
* @param <V> The type of the response published to the downstream consumers.
*/
public interface StreamMediator<U, V> {

/**
* Publishes an item received from a producer to the downstream consumers.
*
* @param blockItem The block item to publish.
* @throws IOException .
*/
void publishEvent(final U blockItem) throws IOException;

void subscribe(final BlockItemEventHandler<V> handler);
Expand Down

0 comments on commit 3dd03b6

Please sign in to comment.