diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index aa3ae2d55..ae7daf62f 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -108,12 +108,8 @@ StreamObserver publishBlockStream( System.Logger.Level.DEBUG, "Executing bidirectional publishBlockStream gRPC method"); - if (serviceStatus.isRunning()) { - return new ProducerBlockItemObserver( - streamMediator, publishStreamResponseObserver, itemAckBuilder, serviceStatus); - } - - return null; + return new ProducerBlockItemObserver( + streamMediator, publishStreamResponseObserver, itemAckBuilder, serviceStatus); } void subscribeBlockStream( diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index b71eeb675..88f66f2f4 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -78,7 +78,6 @@ public static void main(final String[] args) { serviceStatus.setWebServer(webServer); streamMediator.register(serviceStatus); - // Start the web server webServer.start(); } catch (IOException e) { @@ -87,7 +86,8 @@ public static void main(final String[] args) { } private static StreamMediator> - buildStreamMediator(final Config config, final ServiceStatus serviceStatus) throws IOException { + buildStreamMediator(final Config config, final ServiceStatus serviceStatus) + throws IOException { return new LiveStreamMediatorImpl( new ConcurrentHashMap<>(32), new FileSystemPersistenceHandler( diff --git a/server/src/main/java/com/hedera/block/server/ServiceStatus.java b/server/src/main/java/com/hedera/block/server/ServiceStatus.java index b5f0e8f0c..df7a50233 100644 --- a/server/src/main/java/com/hedera/block/server/ServiceStatus.java +++ b/server/src/main/java/com/hedera/block/server/ServiceStatus.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.hedera.block.server; import io.helidon.webserver.WebServer; diff --git a/server/src/main/java/com/hedera/block/server/ServiceStatusImpl.java b/server/src/main/java/com/hedera/block/server/ServiceStatusImpl.java index 955d459a8..8a6da4e3f 100644 --- a/server/src/main/java/com/hedera/block/server/ServiceStatusImpl.java +++ b/server/src/main/java/com/hedera/block/server/ServiceStatusImpl.java @@ -17,7 +17,6 @@ package com.hedera.block.server; import io.helidon.webserver.WebServer; - import java.util.concurrent.atomic.AtomicBoolean; public class ServiceStatusImpl implements ServiceStatus { diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 3c0c59511..b04ce2c9c 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -16,6 +16,8 @@ package com.hedera.block.server.mediator; +import static com.hedera.block.protos.BlockStreamService.*; + import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.ServiceStatusImpl; import com.hedera.block.server.consumer.BlockItemEventHandler; @@ -26,15 +28,12 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; - import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static com.hedera.block.protos.BlockStreamService.*; - /** * LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible * for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies @@ -87,6 +86,12 @@ public LiveStreamMediatorImpl( this(new ConcurrentHashMap<>(), blockPersistenceHandler, new ServiceStatusImpl()); } + public LiveStreamMediatorImpl( + final BlockPersistenceHandler blockPersistenceHandler, + final ServiceStatus serviceStatus) { + this(new ConcurrentHashMap<>(), blockPersistenceHandler, serviceStatus); + } + @Override public void publishEvent(final BlockItem blockItem) throws IOException { @@ -130,11 +135,6 @@ public void publishEvent(final BlockItem blockItem) throws IOException { } } - @Override - public boolean isPublishing() { - return serviceStatus.isRunning(); - } - @Override public void subscribe( final BlockItemEventHandler> handler) { diff --git a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java index 8eaf00498..e1775a4bf 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java +++ b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java @@ -18,7 +18,6 @@ import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.consumer.BlockItemEventHandler; - import java.io.IOException; /** @@ -36,8 +35,6 @@ public interface StreamMediator { void publishEvent(final U blockItem) throws IOException; - boolean isPublishing(); - void subscribe(final BlockItemEventHandler handler); void unsubscribe(final BlockItemEventHandler handler); diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index a27da3662..bf8b9f944 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -16,17 +16,16 @@ package com.hedera.block.server.producer; +import static com.hedera.block.protos.BlockStreamService.*; +import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*; + import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; import io.grpc.stub.StreamObserver; - import java.io.IOException; import java.security.NoSuchAlgorithmException; -import static com.hedera.block.protos.BlockStreamService.*; -import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*; - /** * The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC * service implementation. Helidon calls methods on this class as networking events occur with the @@ -71,7 +70,7 @@ public void onNext(final PublishStreamRequest publishStreamRequest) { try { // Publish the block to all the subscribers unless // there's an issue with the StreamMediator. - if (streamMediator.isPublishing()) { + if (serviceStatus.isRunning()) { // Publish the block to the mediator streamMediator.publishEvent(blockItem); @@ -106,6 +105,7 @@ public void onNext(final PublishStreamRequest publishStreamRequest) { } private static PublishStreamResponse buildErrorStreamResponse() { + // TODO: Replace this with a real error enum. final EndOfStream endOfStream = EndOfStream.newBuilder() .setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index c5bf89e69..f88fa5ce1 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -19,8 +19,7 @@ import static com.hedera.block.protos.BlockStreamService.*; import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*; import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; import com.hedera.block.server.consumer.BlockItemEventHandler; @@ -73,6 +72,7 @@ public class BlockStreamServiceIT { @Mock private StreamObserver subscribeStreamObserver6; @Mock private WebServer webServer; + @Mock private ServiceStatus serviceStatus; @Mock private BlockReader blockReader; @Mock private BlockWriter blockWriter; @@ -104,8 +104,7 @@ public void testPublishBlockStreamRegistrationAndExecution() final BlockStreamService blockStreamService = new BlockStreamService(50L, new ItemAckBuilder(), streamMediator); - - when(streamMediator.isPublishing()).thenReturn(true); + blockStreamService.register(webServer); final StreamObserver streamObserver = blockStreamService.publishBlockStream(publishStreamResponseObserver); @@ -420,9 +419,14 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() final List blockItems = generateBlockItems(1); final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder().setBlockItem(blockItems.getFirst()).build(); - streamObserver.onNext(publishStreamRequest); + // Simulate another producer attempting to connect to the Block Node. + // Later, verify they received a response indicating the stream is closed. + final StreamObserver expectedNoOpStreamObserver = + blockStreamService.publishBlockStream(publishStreamResponseObserver); + expectedNoOpStreamObserver.onNext(publishStreamRequest); + synchronized (lock) { lock.wait(50); } @@ -453,7 +457,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() .build(); final var endOfStreamResponse = PublishStreamResponse.newBuilder().setStatus(endOfStream).build(); - verify(publishStreamResponseObserver, times(1)).onNext(endOfStreamResponse); + verify(publishStreamResponseObserver, times(2)).onNext(endOfStreamResponse); verify(webServer, times(1)).stop(); // Now verify the block was removed from the file system. @@ -529,7 +533,8 @@ private StreamMediator> buildStr final ServiceStatus serviceStatus = new ServiceStatusImpl(); serviceStatus.setWebServer(webServer); return new LiveStreamMediatorImpl( - subscribers, new FileSystemPersistenceHandler(blockReader, blockWriter), + subscribers, + new FileSystemPersistenceHandler(blockReader, blockWriter), serviceStatus); } diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java index 684e5ec18..beaf397d7 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java @@ -228,7 +228,7 @@ public void testPartialBlockRemoval() throws IOException { JUNIT, testConfig, blockRemover, Util.defaultPerms, 23); // Write all the block items for 2 blocks - for (int i = 0;i < 20;i++) { + for (int i = 0; i < 20; i++) { blockWriter.write(blockItems.get(i)); } diff --git a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java index e88ff078f..d439e23b3 100644 --- a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java @@ -27,6 +27,7 @@ import com.google.protobuf.ByteString; import com.hedera.block.protos.BlockStreamService; import com.hedera.block.server.ServiceStatus; +import com.hedera.block.server.ServiceStatusImpl; import com.hedera.block.server.consumer.ConsumerBlockItemObserver; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.LiveStreamMediatorImpl; @@ -73,7 +74,7 @@ public void testProducerOnNext() new ItemAckBuilder(), serviceStatus); - when(streamMediator.isPublishing()).thenReturn(true); + when(serviceStatus.isRunning()).thenReturn(true); BlockItem blockHeader = blockItems.getFirst(); PublishStreamRequest publishStreamRequest = @@ -106,9 +107,10 @@ public void testProducerOnNext() public void testProducerToManyConsumers() throws IOException, InterruptedException { final long TIMEOUT_THRESHOLD_MILLIS = 100L; final long TEST_TIME = 1_719_427_664_950L; + final ServiceStatus serviceStatus = new ServiceStatusImpl(); final var streamMediator = new LiveStreamMediatorImpl( - new FileSystemPersistenceHandler(blockReader, blockWriter)); + new FileSystemPersistenceHandler(blockReader, blockWriter), serviceStatus); final var concreteObserver1 = new ConsumerBlockItemObserver( @@ -201,7 +203,7 @@ public void testItemAckBuilderExceptionTest() itemAckBuilder, serviceStatus); - when(streamMediator.isPublishing()).thenReturn(true); + when(serviceStatus.isRunning()).thenReturn(true); when(itemAckBuilder.buildAck(any())) .thenThrow(new NoSuchAlgorithmException("Test exception"));