diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index 66f295e0d..25b9ba1a0 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -94,7 +94,7 @@ public void update(final Routing routing) { routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream); } - private StreamObserver publishBlockStream( + StreamObserver publishBlockStream( final StreamObserver publishStreamResponseObserver) { LOGGER.log( System.Logger.Level.DEBUG, @@ -104,7 +104,7 @@ private StreamObserver publishBlockStream( streamMediator, publishStreamResponseObserver, itemAckBuilder); } - private void subscribeBlockStream( + void subscribeBlockStream( final SubscribeStreamRequest subscribeStreamRequest, final StreamObserver subscribeStreamResponseObserver) { LOGGER.log( diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 7e0313b0a..402ee5ca8 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -35,11 +35,11 @@ public class Server { // Function stubs to satisfy the routing param signatures. The implementations are in the // service class. - private static ServerCalls.BidiStreamingMethod< + static ServerCalls.BidiStreamingMethod< StreamObserver, StreamObserver> clientBidiStreamingMethod; - public static ServerCalls.ServerStreamingMethod< + static ServerCalls.ServerStreamingMethod< SubscribeStreamRequest, StreamObserver> serverStreamingMethod; diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java new file mode 100644 index 000000000..f788f63d5 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -0,0 +1,220 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server; + +import static com.hedera.block.protos.BlockStreamService.*; +import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.ItemAcknowledgement; +import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.protobuf.Descriptors; +import com.hedera.block.server.data.ObjectEvent; +import com.hedera.block.server.mediator.LiveStreamMediatorImpl; +import com.hedera.block.server.mediator.StreamMediator; +import com.hedera.block.server.persistence.WriteThroughCacheHandler; +import com.hedera.block.server.persistence.storage.BlockAsDirReader; +import com.hedera.block.server.persistence.storage.BlockAsDirWriter; +import com.hedera.block.server.persistence.storage.BlockReader; +import com.hedera.block.server.persistence.storage.BlockWriter; +import com.hedera.block.server.producer.ItemAckBuilder; +import com.hedera.block.server.util.TestUtils; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import io.helidon.config.Config; +import io.helidon.config.MapConfigSource; +import io.helidon.config.spi.ConfigSource; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class BlockStreamServiceIT { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + private final Object lock = new Object(); + + @Mock private StreamMediator, BlockItem> streamMediator; + + @Mock private StreamObserver publishStreamResponseObserver; + + @Mock private SubscribeStreamRequest subscribeStreamRequest; + + @Mock private StreamObserver subscribeStreamObserver1; + @Mock private StreamObserver subscribeStreamObserver2; + @Mock private StreamObserver subscribeStreamObserver3; + + @Mock private BlockReader blockReader; + @Mock private BlockWriter blockWriter; + @Mock private Consumer, BlockItem>> testCallback; + + private static final String TEMP_DIR = "block-node-unit-test-dir"; + private static final String JUNIT = "my-junit-test"; + + private Path testPath; + private Config testConfig; + + @BeforeEach + public void setUp() throws IOException { + testPath = Files.createTempDirectory(TEMP_DIR); + LOGGER.log(System.Logger.Level.INFO, "Created temp directory: " + testPath.toString()); + + Map testProperties = Map.of(JUNIT, testPath.toString()); + ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build(); + testConfig = Config.builder(testConfigSource).build(); + } + + @AfterEach + public void tearDown() { + TestUtils.deleteDirectory(testPath.toFile()); + } + + @Test + public void testPublishBlockStreamRegistrationAndExec() + throws InterruptedException, IOException, NoSuchAlgorithmException { + + final BlockStreamService blockStreamService = + new BlockStreamService(50L, new ItemAckBuilder(), streamMediator); + + final StreamObserver streamObserver = + blockStreamService.publishBlockStream(publishStreamResponseObserver); + + final BlockItem blockItem = generateBlockItems(1).get(0); + final PublishStreamRequest publishStreamRequest = + PublishStreamRequest.newBuilder().setBlockItem(blockItem).build(); + + // Calling onNext() as Helidon will + streamObserver.onNext(publishStreamRequest); + + synchronized (lock) { + lock.wait(50); + } + + final ItemAcknowledgement itemAck = new ItemAckBuilder().buildAck(blockItem); + final PublishStreamResponse publishStreamResponse = + PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build(); + + // Verify the BlockItem message is sent to the mediator + verify(streamMediator, times(1)).publishEvent(blockItem); + + // Verify our custom StreamObserver implementation builds and sends + // a response back to the producer + verify(publishStreamResponseObserver, times(1)).onNext(publishStreamResponse); + + // Close the stream as Helidon does + streamObserver.onCompleted(); + + synchronized (lock) { + lock.wait(50); + } + + // verify the onCompleted() method is invoked on the wrapped StreamObserver + verify(publishStreamResponseObserver, times(1)).onCompleted(); + } + + @Test + public void testSubscribeBlockStream() throws InterruptedException { + final var streamMediator = + new LiveStreamMediatorImpl( + new WriteThroughCacheHandler(blockReader, blockWriter), testCallback); + + // Build the BlockStreamService + final BlockStreamService blockStreamService = + new BlockStreamService(1000L, new ItemAckBuilder(), streamMediator); + + // Subscribe the consumers + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); + + // Subscribe the producer + final StreamObserver streamObserver = + blockStreamService.publishBlockStream(publishStreamResponseObserver); + + // Build the BlockItem + final List blockItems = generateBlockItems(1); + final PublishStreamRequest publishStreamRequest = + PublishStreamRequest.newBuilder().setBlockItem(blockItems.get(0)).build(); + + // Calling onNext() with a BlockItem + streamObserver.onNext(publishStreamRequest); + + synchronized (lock) { + lock.wait(50); + } + + final SubscribeStreamResponse subscribeStreamResponse = + SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.get(0)).build(); + + verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse); + verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse); + verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse); + + } + + @Test + public void testFullHappyPath() throws IOException, InterruptedException { + + // Initialize with concrete a concrete BlockReader, BlockWriter and Mediator + final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + final BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); + final var streamMediator = + new LiveStreamMediatorImpl( + new WriteThroughCacheHandler(blockReader, blockWriter), testCallback); + + // Build the BlockStreamService + final BlockStreamService blockStreamService = + new BlockStreamService(1000L, new ItemAckBuilder(), streamMediator); + + // Pass a StreamObserver to the producer as Helidon will + final StreamObserver streamObserver = + blockStreamService.publishBlockStream(publishStreamResponseObserver); + + final List blockItems = generateBlockItems(1); + + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3); + + final PublishStreamRequest publishStreamRequest = + PublishStreamRequest.newBuilder().setBlockItem(blockItems.get(0)).build(); + streamObserver.onNext(publishStreamRequest); + + synchronized (lock) { + lock.wait(50); + } + + final SubscribeStreamResponse subscribeStreamResponse = + SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.get(0)).build(); + verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse); + verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse); + verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse); + + } +} diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java new file mode 100644 index 000000000..26d327e3c --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java @@ -0,0 +1,55 @@ +package com.hedera.block.server; + +import com.google.protobuf.Descriptors; +import com.hedera.block.server.data.ObjectEvent; +import com.hedera.block.server.mediator.StreamMediator; +import com.hedera.block.server.producer.ItemAckBuilder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.security.NoSuchAlgorithmException; + +import static com.hedera.block.protos.BlockStreamService.BlockItem; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +public class BlockStreamServiceTest { + + @Mock + private ItemAckBuilder itemAckBuilder; + + @Mock private StreamMediator, com.hedera.block.protos.BlockStreamService.BlockItem> streamMediator; + + @Test + public void testServiceName() throws IOException, NoSuchAlgorithmException { + final BlockStreamService blockStreamService = + new BlockStreamService(50L, itemAckBuilder, streamMediator); + + // Verify the service name + assertEquals(Constants.SERVICE_NAME, blockStreamService.serviceName()); + + // Verify other methods not invoked + verify(itemAckBuilder, never()).buildAck(any(BlockItem.class)); + verify(streamMediator, never()).publishEvent(any(BlockItem.class)); + } + + @Test + public void testProto() throws IOException, NoSuchAlgorithmException { + final BlockStreamService blockStreamService = + new BlockStreamService(50L, itemAckBuilder, streamMediator); + Descriptors.FileDescriptor fileDescriptor = blockStreamService.proto(); + + // Verify the current rpc methods + assertEquals(2, fileDescriptor.getServices().get(0).getMethods().size()); + + // Verify other methods not invoked + verify(itemAckBuilder, never()).buildAck(any(BlockItem.class)); + verify(streamMediator, never()).publishEvent(any(BlockItem.class)); + } +}