Skip to content

Commit

Permalink
fix: added marker interface to more cleanly separate producer versus …
Browse files Browse the repository at this point in the history
…consumer visibility

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 4, 2024
1 parent a030376 commit d26b0ac
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;

import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.lmax.disruptor.EventHandler;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
Expand All @@ -39,7 +39,7 @@ public class ConsumerBlockItemObserver
private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
private final SubscriptionHandler<ObjectEvent<SubscribeStreamResponse>> subscriptionHandler;

private final long timeoutThresholdMillis;
private final InstantSource producerLivenessClock;
Expand All @@ -52,21 +52,18 @@ public class ConsumerBlockItemObserver
protected Runnable onClose;

/**
* Constructor for the ConsumerBlockItemObserver class.
*
* @param timeoutThresholdMillis The timeout threshold in milliseconds.
* @param producerLivenessClock The producer liveness clock.
* @param streamMediator The StreamMediator instance.
* @param subscribeStreamResponseObserver The StreamObserver instance.
* Constructor for the ConsumerBlockItemObserver class. It is responsible for observing the
* SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer
* via the subscribeStreamResponseObserver.
*/
public ConsumerBlockItemObserver(
final long timeoutThresholdMillis,
final InstantSource producerLivenessClock,
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final SubscriptionHandler<ObjectEvent<SubscribeStreamResponse>> subscriptionHandler,
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
this.subscriptionHandler = subscriptionHandler;

// The ServerCallStreamObserver can be configured with Runnable handlers to
// be executed when a downstream consumer closes the connection. The handlers
Expand All @@ -81,7 +78,7 @@ public ConsumerBlockItemObserver(
// The consumer has cancelled the stream.
// Do not allow additional responses to be sent.
isResponsePermitted.set(false);
streamMediator.unsubscribe(this);
subscriptionHandler.unsubscribe(this);
LOGGER.log(
System.Logger.Level.INFO,
"Consumer cancelled stream. Observer unsubscribed.");
Expand All @@ -93,7 +90,7 @@ public ConsumerBlockItemObserver(
// The consumer has closed the stream.
// Do not allow additional responses to be sent.
isResponsePermitted.set(false);
streamMediator.unsubscribe(this);
subscriptionHandler.unsubscribe(this);
LOGGER.log(
System.Logger.Level.INFO,
"Consumer completed stream. Observer unsubscribed.");
Expand All @@ -116,7 +113,7 @@ public void onEvent(
if (isResponsePermitted.get()) {
final long currentMillis = producerLivenessClock.millis();
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
streamMediator.unsubscribe(this);
subscriptionHandler.unsubscribe(this);
LOGGER.log(
System.Logger.Level.DEBUG,
"Unsubscribed ConsumerBlockItemObserver due to producer timeout");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.mediator;

import java.io.IOException;

public interface EventPublisher<U> {
void publish(final U blockItem) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,45 +92,44 @@ public LiveStreamMediatorImpl(
}

@Override
public void publishEvent(final BlockItem blockItem) throws IOException {
public void publish(final BlockItem blockItem) throws IOException {

try {
if (serviceStatus.isRunning()) {
if (serviceStatus.isRunning()) {

// Publish the block for all subscribers to receive
LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem);
final var subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse));
// Publish the block for all subscribers to receive
LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem);
final var subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse));

try {
// Persist the BlockItem
blockPersistenceHandler.persist(blockItem);
} catch (IOException e) {
// Disable BlockItem publication for upstream producers
serviceStatus.setRunning(false);
LOGGER.log(
System.Logger.Level.ERROR,
"An exception occurred while attempting to persist the BlockItem: {0}",
blockItem,
e);

} else {
LOGGER.log(System.Logger.Level.ERROR, "StreamMediator is not accepting BlockItems");
}
} catch (IOException e) {
// Disable publishing BlockItems from upstream producers
serviceStatus.setRunning(false);
LOGGER.log(
System.Logger.Level.ERROR,
"An exception occurred while attempting to persist the BlockItem: {0}",
blockItem,
e);
LOGGER.log(System.Logger.Level.INFO, "Send a response to end the stream");

LOGGER.log(System.Logger.Level.INFO, "Send a response to end the stream");
// Publish the block for all subscribers to receive
final SubscribeStreamResponse endStreamResponse = buildEndStreamResponse();
ringBuffer.publishEvent((event, sequence) -> event.set(endStreamResponse));

// Publish the block for all subscribers to receive
final SubscribeStreamResponse endStreamResponse = buildEndStreamResponse();
ringBuffer.publishEvent((event, sequence) -> event.set(endStreamResponse));
// Unsubscribe all downstream consumers
for (final var subscriber : subscribers.keySet()) {
LOGGER.log(System.Logger.Level.INFO, "Unsubscribing: {0}", subscriber);
unsubscribe(subscriber);
}

// Unsubscribe all downstream consumers
for (final var subscriber : subscribers.keySet()) {
LOGGER.log(System.Logger.Level.INFO, "Unsubscribing: {0}", subscriber);
unsubscribe(subscriber);
throw e;
}

throw e;
} else {
LOGGER.log(System.Logger.Level.ERROR, "StreamMediator is not accepting BlockItems");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,10 @@

package com.hedera.block.server.mediator;

import com.lmax.disruptor.EventHandler;
import java.io.IOException;

/**
* The StreamMediator interface represents a bridge between a bidirectional stream of items from a
* producer (e.g. a Consensus Node) and N consumers each requesting a gRPC server stream of items.
* The StreamMediator manages adding and removing consumers dynamically to receive items as they
* arrive from upstream.
*
* @param <U> The type of items sent by the upstream producer.
* @param <V> The type of the response published to the downstream consumers.
*/
public interface StreamMediator<U, V> {

/**
* Publishes an item received from a producer to the downstream consumers.
*
* @param blockItem The block item to publish.
* @throws IOException .
*/
void publishEvent(final U blockItem) throws IOException;

void subscribe(final EventHandler<V> handler);

void unsubscribe(final EventHandler<V> handler);

boolean isSubscribed(final EventHandler<V> handler);
}
public interface StreamMediator<U, V> extends EventPublisher<U>, SubscriptionHandler<V> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.mediator;

import com.lmax.disruptor.EventHandler;

/**
* The SubscriptionHandler interface represents a bridge between a producer and N consumers each
* requesting a gRPC server stream of items. The SubscriptionHandler manages adding and removing
* consumers dynamically to receive items as they arrive from upstream.
*
* @param <V> The type of the response published to the downstream consumers.
*/
public interface SubscriptionHandler<V> {

void subscribe(final EventHandler<V> handler);

void unsubscribe(final EventHandler<V> handler);

boolean isSubscribed(final EventHandler<V> handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.mediator.EventPublisher;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
Expand All @@ -36,7 +35,7 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
private final EventPublisher<BlockItem> eventPublisher;
private final ItemAckBuilder itemAckBuilder;
private final ServiceStatus serviceStatus;

Expand All @@ -46,12 +45,12 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
* to the upstream producer via the responseStreamObserver.
*/
public ProducerBlockItemObserver(
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final EventPublisher<BlockItem> eventPublisher,
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver,
final ItemAckBuilder itemAckBuilder,
final ServiceStatus serviceStatus) {

this.streamMediator = streamMediator;
this.eventPublisher = eventPublisher;
this.publishStreamResponseObserver = publishStreamResponseObserver;
this.itemAckBuilder = itemAckBuilder;
this.serviceStatus = serviceStatus;
Expand All @@ -73,7 +72,7 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
if (serviceStatus.isRunning()) {

// Publish the block to the mediator
streamMediator.publishEvent(blockItem);
eventPublisher.publish(blockItem);

try {
// Send a successful response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testPublishBlockStreamRegistrationAndExecution()
PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();

// Verify the BlockItem message is sent to the mediator
verify(streamMediator, timeout(testTimeout).times(1)).publishEvent(blockItem);
verify(streamMediator, timeout(testTimeout).times(1)).publish(blockItem);

// Verify our custom StreamObserver implementation builds and sends
// a response back to the producer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testServiceName() throws IOException, NoSuchAlgorithmException {

// Verify other methods not invoked
verify(itemAckBuilder, never()).buildAck(any(BlockItem.class));
verify(streamMediator, never()).publishEvent(any(BlockItem.class));
verify(streamMediator, never()).publish(any(BlockItem.class));
}

@Test
Expand All @@ -129,7 +129,7 @@ public void testProto() throws IOException, NoSuchAlgorithmException {

// Verify other methods not invoked
verify(itemAckBuilder, never()).buildAck(any(BlockItem.class));
verify(streamMediator, never()).publishEvent(any(BlockItem.class));
verify(streamMediator, never()).publish(any(BlockItem.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException {
final BlockItem blockItem = BlockItem.newBuilder().build();

// Acting as a producer, notify the mediator of a new block
streamMediator.publishEvent(blockItem);
streamMediator.publish(blockItem);

// Confirm the BlockStorage write method was
// called despite the absence of subscribers
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();

// Acting as a producer, notify the mediator of a new block
streamMediator.publishEvent(blockItem);
streamMediator.publish(blockItem);

// Confirm each subscriber was notified of the new block
verify(streamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);
Expand Down Expand Up @@ -222,7 +222,7 @@ public void testOnCancelSubscriptionHandling() throws IOException {

// Simulate the producer notifying the mediator of a new block
final List<BlockItem> blockItems = generateBlockItems(1);
streamMediator.publishEvent(blockItems.getFirst());
streamMediator.publish(blockItems.getFirst());

// Simulate the consumer cancelling the stream
testConsumerBlockItemObserver.getOnCancel().run();
Expand Down Expand Up @@ -256,7 +256,7 @@ public void testOnCloseSubscriptionHandling() throws IOException {

// Simulate the producer notifying the mediator of a new block
final List<BlockItem> blockItems = generateBlockItems(1);
streamMediator.publishEvent(blockItems.getFirst());
streamMediator.publish(blockItems.getFirst());

// Simulate the consumer completing the stream
testConsumerBlockItemObserver.getOnClose().run();
Expand All @@ -282,12 +282,12 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr
// is not able to publish a block after the first producer fails.
doThrow(new IOException()).when(blockPersistenceHandler).persist(firstBlockItem);
try {
streamMediator.publishEvent(firstBlockItem);
streamMediator.publish(firstBlockItem);
fail("Expected an IOException to be thrown");
} catch (IOException e) {

final BlockItem secondBlockItem = blockItems.get(1);
streamMediator.publishEvent(secondBlockItem);
streamMediator.publish(secondBlockItem);

// Confirm the BlockPersistenceHandler write method was only called
// once despite the second block being published.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testProducerOnNext() throws IOException, NoSuchAlgorithmException {
PublishStreamRequest.newBuilder().setBlockItem(blockHeader).build();
producerBlockItemObserver.onNext(publishStreamRequest);

verify(streamMediator, timeout(50).times(1)).publishEvent(blockHeader);
verify(streamMediator, timeout(50).times(1)).publish(blockHeader);

final ItemAcknowledgement itemAck =
ItemAcknowledgement.newBuilder()
Expand Down

0 comments on commit d26b0ac

Please sign in to comment.