From d26b0ac094d604ea8b019233b4e55cfe9bf5e105 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Sun, 4 Aug 2024 11:23:54 -0600 Subject: [PATCH] fix: added marker interface to more cleanly separate producer versus consumer visibility Signed-off-by: Matt Peterson --- .../consumer/ConsumerBlockItemObserver.java | 23 ++++---- .../block/server/mediator/EventPublisher.java | 23 ++++++++ .../mediator/LiveStreamMediatorImpl.java | 57 +++++++++---------- .../block/server/mediator/StreamMediator.java | 23 +------- .../server/mediator/SubscriptionHandler.java | 35 ++++++++++++ .../producer/ProducerBlockItemObserver.java | 11 ++-- .../block/server/BlockStreamServiceIT.java | 2 +- .../block/server/BlockStreamServiceTest.java | 4 +- .../mediator/LiveStreamMediatorImplTest.java | 12 ++-- .../ProducerBlockItemObserverTest.java | 2 +- 10 files changed, 112 insertions(+), 80 deletions(-) create mode 100644 server/src/main/java/com/hedera/block/server/mediator/EventPublisher.java create mode 100644 server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandler.java diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index 46ab9a71a..40370c8cb 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -20,7 +20,7 @@ import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse; import com.hedera.block.server.data.ObjectEvent; -import com.hedera.block.server.mediator.StreamMediator; +import com.hedera.block.server.mediator.SubscriptionHandler; import com.lmax.disruptor.EventHandler; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; @@ -39,7 +39,7 @@ public class ConsumerBlockItemObserver private final System.Logger LOGGER = System.getLogger(getClass().getName()); private final StreamObserver subscribeStreamResponseObserver; - private final StreamMediator> streamMediator; + private final SubscriptionHandler> subscriptionHandler; private final long timeoutThresholdMillis; private final InstantSource producerLivenessClock; @@ -52,21 +52,18 @@ public class ConsumerBlockItemObserver protected Runnable onClose; /** - * Constructor for the ConsumerBlockItemObserver class. - * - * @param timeoutThresholdMillis The timeout threshold in milliseconds. - * @param producerLivenessClock The producer liveness clock. - * @param streamMediator The StreamMediator instance. - * @param subscribeStreamResponseObserver The StreamObserver instance. + * Constructor for the ConsumerBlockItemObserver class. It is responsible for observing the + * SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer + * via the subscribeStreamResponseObserver. */ public ConsumerBlockItemObserver( final long timeoutThresholdMillis, final InstantSource producerLivenessClock, - final StreamMediator> streamMediator, + final SubscriptionHandler> subscriptionHandler, final StreamObserver subscribeStreamResponseObserver) { this.timeoutThresholdMillis = timeoutThresholdMillis; - this.streamMediator = streamMediator; + this.subscriptionHandler = subscriptionHandler; // The ServerCallStreamObserver can be configured with Runnable handlers to // be executed when a downstream consumer closes the connection. The handlers @@ -81,7 +78,7 @@ public ConsumerBlockItemObserver( // The consumer has cancelled the stream. // Do not allow additional responses to be sent. isResponsePermitted.set(false); - streamMediator.unsubscribe(this); + subscriptionHandler.unsubscribe(this); LOGGER.log( System.Logger.Level.INFO, "Consumer cancelled stream. Observer unsubscribed."); @@ -93,7 +90,7 @@ public ConsumerBlockItemObserver( // The consumer has closed the stream. // Do not allow additional responses to be sent. isResponsePermitted.set(false); - streamMediator.unsubscribe(this); + subscriptionHandler.unsubscribe(this); LOGGER.log( System.Logger.Level.INFO, "Consumer completed stream. Observer unsubscribed."); @@ -116,7 +113,7 @@ public void onEvent( if (isResponsePermitted.get()) { final long currentMillis = producerLivenessClock.millis(); if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) { - streamMediator.unsubscribe(this); + subscriptionHandler.unsubscribe(this); LOGGER.log( System.Logger.Level.DEBUG, "Unsubscribed ConsumerBlockItemObserver due to producer timeout"); diff --git a/server/src/main/java/com/hedera/block/server/mediator/EventPublisher.java b/server/src/main/java/com/hedera/block/server/mediator/EventPublisher.java new file mode 100644 index 000000000..254d83fbe --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/mediator/EventPublisher.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.mediator; + +import java.io.IOException; + +public interface EventPublisher { + void publish(final U blockItem) throws IOException; +} diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index eddbd4b66..7929897e3 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -92,45 +92,44 @@ public LiveStreamMediatorImpl( } @Override - public void publishEvent(final BlockItem blockItem) throws IOException { + public void publish(final BlockItem blockItem) throws IOException { - try { - if (serviceStatus.isRunning()) { + if (serviceStatus.isRunning()) { - // Publish the block for all subscribers to receive - LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem); - final var subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); - ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse)); + // Publish the block for all subscribers to receive + LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem); + final var subscribeStreamResponse = + SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); + ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse)); + try { // Persist the BlockItem blockPersistenceHandler.persist(blockItem); + } catch (IOException e) { + // Disable BlockItem publication for upstream producers + serviceStatus.setRunning(false); + LOGGER.log( + System.Logger.Level.ERROR, + "An exception occurred while attempting to persist the BlockItem: {0}", + blockItem, + e); - } else { - LOGGER.log(System.Logger.Level.ERROR, "StreamMediator is not accepting BlockItems"); - } - } catch (IOException e) { - // Disable publishing BlockItems from upstream producers - serviceStatus.setRunning(false); - LOGGER.log( - System.Logger.Level.ERROR, - "An exception occurred while attempting to persist the BlockItem: {0}", - blockItem, - e); + LOGGER.log(System.Logger.Level.INFO, "Send a response to end the stream"); - LOGGER.log(System.Logger.Level.INFO, "Send a response to end the stream"); + // Publish the block for all subscribers to receive + final SubscribeStreamResponse endStreamResponse = buildEndStreamResponse(); + ringBuffer.publishEvent((event, sequence) -> event.set(endStreamResponse)); - // Publish the block for all subscribers to receive - final SubscribeStreamResponse endStreamResponse = buildEndStreamResponse(); - ringBuffer.publishEvent((event, sequence) -> event.set(endStreamResponse)); + // Unsubscribe all downstream consumers + for (final var subscriber : subscribers.keySet()) { + LOGGER.log(System.Logger.Level.INFO, "Unsubscribing: {0}", subscriber); + unsubscribe(subscriber); + } - // Unsubscribe all downstream consumers - for (final var subscriber : subscribers.keySet()) { - LOGGER.log(System.Logger.Level.INFO, "Unsubscribing: {0}", subscriber); - unsubscribe(subscriber); + throw e; } - - throw e; + } else { + LOGGER.log(System.Logger.Level.ERROR, "StreamMediator is not accepting BlockItems"); } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java index 73a0bbf8d..d1e59f7c1 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java +++ b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java @@ -16,31 +16,10 @@ package com.hedera.block.server.mediator; -import com.lmax.disruptor.EventHandler; -import java.io.IOException; - /** * The StreamMediator interface represents a bridge between a bidirectional stream of items from a * producer (e.g. a Consensus Node) and N consumers each requesting a gRPC server stream of items. - * The StreamMediator manages adding and removing consumers dynamically to receive items as they - * arrive from upstream. * * @param The type of items sent by the upstream producer. - * @param The type of the response published to the downstream consumers. */ -public interface StreamMediator { - - /** - * Publishes an item received from a producer to the downstream consumers. - * - * @param blockItem The block item to publish. - * @throws IOException . - */ - void publishEvent(final U blockItem) throws IOException; - - void subscribe(final EventHandler handler); - - void unsubscribe(final EventHandler handler); - - boolean isSubscribed(final EventHandler handler); -} +public interface StreamMediator extends EventPublisher, SubscriptionHandler {} diff --git a/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandler.java b/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandler.java new file mode 100644 index 000000000..5cbcc0ccb --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.mediator; + +import com.lmax.disruptor.EventHandler; + +/** + * The SubscriptionHandler interface represents a bridge between a producer and N consumers each + * requesting a gRPC server stream of items. The SubscriptionHandler manages adding and removing + * consumers dynamically to receive items as they arrive from upstream. + * + * @param The type of the response published to the downstream consumers. + */ +public interface SubscriptionHandler { + + void subscribe(final EventHandler handler); + + void unsubscribe(final EventHandler handler); + + boolean isSubscribed(final EventHandler handler); +} diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index 7847b994d..249f736b4 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -20,8 +20,7 @@ import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*; import com.hedera.block.server.ServiceStatus; -import com.hedera.block.server.data.ObjectEvent; -import com.hedera.block.server.mediator.StreamMediator; +import com.hedera.block.server.mediator.EventPublisher; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.security.NoSuchAlgorithmException; @@ -36,7 +35,7 @@ public class ProducerBlockItemObserver implements StreamObserver publishStreamResponseObserver; - private final StreamMediator> streamMediator; + private final EventPublisher eventPublisher; private final ItemAckBuilder itemAckBuilder; private final ServiceStatus serviceStatus; @@ -46,12 +45,12 @@ public class ProducerBlockItemObserver implements StreamObserver> streamMediator, + final EventPublisher eventPublisher, final StreamObserver publishStreamResponseObserver, final ItemAckBuilder itemAckBuilder, final ServiceStatus serviceStatus) { - this.streamMediator = streamMediator; + this.eventPublisher = eventPublisher; this.publishStreamResponseObserver = publishStreamResponseObserver; this.itemAckBuilder = itemAckBuilder; this.serviceStatus = serviceStatus; @@ -73,7 +72,7 @@ public void onNext(final PublishStreamRequest publishStreamRequest) { if (serviceStatus.isRunning()) { // Publish the block to the mediator - streamMediator.publishEvent(blockItem); + eventPublisher.publish(blockItem); try { // Send a successful response diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index a63441206..c842442fa 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -133,7 +133,7 @@ public void testPublishBlockStreamRegistrationAndExecution() PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build(); // Verify the BlockItem message is sent to the mediator - verify(streamMediator, timeout(testTimeout).times(1)).publishEvent(blockItem); + verify(streamMediator, timeout(testTimeout).times(1)).publish(blockItem); // Verify our custom StreamObserver implementation builds and sends // a response back to the producer diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java index 305293bea..2a235bccd 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java @@ -110,7 +110,7 @@ public void testServiceName() throws IOException, NoSuchAlgorithmException { // Verify other methods not invoked verify(itemAckBuilder, never()).buildAck(any(BlockItem.class)); - verify(streamMediator, never()).publishEvent(any(BlockItem.class)); + verify(streamMediator, never()).publish(any(BlockItem.class)); } @Test @@ -129,7 +129,7 @@ public void testProto() throws IOException, NoSuchAlgorithmException { // Verify other methods not invoked verify(itemAckBuilder, never()).buildAck(any(BlockItem.class)); - verify(streamMediator, never()).publishEvent(any(BlockItem.class)); + verify(streamMediator, never()).publish(any(BlockItem.class)); } @Test diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index b94491556..b444dc509 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -110,7 +110,7 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException { final BlockItem blockItem = BlockItem.newBuilder().build(); // Acting as a producer, notify the mediator of a new block - streamMediator.publishEvent(blockItem); + streamMediator.publish(blockItem); // Confirm the BlockStorage write method was // called despite the absence of subscribers @@ -159,7 +159,7 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); // Acting as a producer, notify the mediator of a new block - streamMediator.publishEvent(blockItem); + streamMediator.publish(blockItem); // Confirm each subscriber was notified of the new block verify(streamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse); @@ -222,7 +222,7 @@ public void testOnCancelSubscriptionHandling() throws IOException { // Simulate the producer notifying the mediator of a new block final List blockItems = generateBlockItems(1); - streamMediator.publishEvent(blockItems.getFirst()); + streamMediator.publish(blockItems.getFirst()); // Simulate the consumer cancelling the stream testConsumerBlockItemObserver.getOnCancel().run(); @@ -256,7 +256,7 @@ public void testOnCloseSubscriptionHandling() throws IOException { // Simulate the producer notifying the mediator of a new block final List blockItems = generateBlockItems(1); - streamMediator.publishEvent(blockItems.getFirst()); + streamMediator.publish(blockItems.getFirst()); // Simulate the consumer completing the stream testConsumerBlockItemObserver.getOnClose().run(); @@ -282,12 +282,12 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr // is not able to publish a block after the first producer fails. doThrow(new IOException()).when(blockPersistenceHandler).persist(firstBlockItem); try { - streamMediator.publishEvent(firstBlockItem); + streamMediator.publish(firstBlockItem); fail("Expected an IOException to be thrown"); } catch (IOException e) { final BlockItem secondBlockItem = blockItems.get(1); - streamMediator.publishEvent(secondBlockItem); + streamMediator.publish(secondBlockItem); // Confirm the BlockPersistenceHandler write method was only called // once despite the second block being published. diff --git a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java index 3dc0ac922..b7b7e92b5 100644 --- a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java @@ -79,7 +79,7 @@ public void testProducerOnNext() throws IOException, NoSuchAlgorithmException { PublishStreamRequest.newBuilder().setBlockItem(blockHeader).build(); producerBlockItemObserver.onNext(publishStreamRequest); - verify(streamMediator, timeout(50).times(1)).publishEvent(blockHeader); + verify(streamMediator, timeout(50).times(1)).publish(blockHeader); final ItemAcknowledgement itemAck = ItemAcknowledgement.newBuilder()