From 92cbe236724a26047c81c31cff31ada6bb2ee644 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Tue, 30 Jul 2024 13:34:54 -0600 Subject: [PATCH] fix: implementing type changes and additional tests Signed-off-by: Matt Peterson --- protos/src/main/protobuf/blockstream.proto | 16 ++++++- .../block/server/BlockStreamService.java | 24 +++++++--- .../mediator/LiveStreamMediatorImpl.java | 4 +- .../block/server/BlockStreamServiceIT.java | 46 +++++++++++++++++-- .../block/server/BlockStreamServiceTest.java | 8 ++-- 5 files changed, 80 insertions(+), 18 deletions(-) diff --git a/protos/src/main/protobuf/blockstream.proto b/protos/src/main/protobuf/blockstream.proto index fdcea96f7..5b9f4538c 100644 --- a/protos/src/main/protobuf/blockstream.proto +++ b/protos/src/main/protobuf/blockstream.proto @@ -98,10 +98,22 @@ message SubscribeStreamRequest { } message SubscribeStreamResponse { - int32 status = 1; - BlockItem block_item = 2; + oneof response { + SubscribeStreamResponseCode status = 1; + BlockItem block_item = 2; + } + + enum SubscribeStreamResponseCode { + READ_STREAM_UNKNOWN = 0; + READ_STREAM_INSUFFICIENT_BALANCE = 1; + READ_STREAM_SUCCESS = 2; + READ_STREAM_INVALID_START_BLOCK_NUMBER = 3; + READ_STREAM_INVALID_END_BLOCK_NUMBER = 4; + } } + + message Block { repeated BlockItem block_items = 1; } diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index f6bb538a2..088df21b8 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -153,6 +153,8 @@ void subscribeBlockStream( LOGGER.log( System.Logger.Level.ERROR, "Server Streaming subscribeBlockStream gRPC Service is not currently running"); + + subscribeStreamResponseObserver.onNext(buildSubscribeStreamNotAvailableResponse()); } } @@ -172,37 +174,45 @@ void singleBlock( System.Logger.Level.INFO, "Successfully returning block number: {0}", blockNumber); - singleBlockResponseStreamObserver.onNext(buildBlockResponse(blockOpt.get())); + singleBlockResponseStreamObserver.onNext( + buildSingleBlockResponse(blockOpt.get())); } else { LOGGER.log(System.Logger.Level.INFO, "Block number {0} not found", blockNumber); - singleBlockResponseStreamObserver.onNext(buildNotFoundResponse()); + singleBlockResponseStreamObserver.onNext(buildSingleBlockNotFoundResponse()); } } catch (IOException e) { LOGGER.log( System.Logger.Level.ERROR, "Error reading block number: {0}", blockNumber); - singleBlockResponseStreamObserver.onNext(buildNotAvailableResponse()); + singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); } } else { LOGGER.log( System.Logger.Level.ERROR, "Unary singleBlock gRPC method is not currently running"); - singleBlockResponseStreamObserver.onNext(buildNotAvailableResponse()); + singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); } } - static SingleBlockResponse buildBlockResponse(final Block block) { + static SingleBlockResponse buildSingleBlockResponse(final Block block) { return SingleBlockResponse.newBuilder().setBlock(block).build(); } - static SingleBlockResponse buildNotAvailableResponse() { + static SingleBlockResponse buildSingleBlockNotAvailableResponse() { return SingleBlockResponse.newBuilder() .setStatus(SingleBlockResponse.SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE) .build(); } - static SingleBlockResponse buildNotFoundResponse() { + static SingleBlockResponse buildSingleBlockNotFoundResponse() { return SingleBlockResponse.newBuilder() .setStatus(SingleBlockResponse.SingleBlockResponseCode.READ_BLOCK_NOT_FOUND) .build(); } + + // TODO: Fix this error type once we have it - should not be success + static SubscribeStreamResponse buildSubscribeStreamNotAvailableResponse() { + return SubscribeStreamResponse.newBuilder() + .setStatus(SubscribeStreamResponse.SubscribeStreamResponseCode.READ_STREAM_SUCCESS) + .build(); + } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index b04ce2c9c..417603f89 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -180,7 +180,9 @@ private static SubscribeStreamResponse buildEndStreamResponse() { // The current spec does not contain a generic error code for // SubscribeStreamResponseCode. // TODO: Replace READ_STREAM_SUCCESS (2) with a generic error code? - return SubscribeStreamResponse.newBuilder().setStatus(2).build(); + return SubscribeStreamResponse.newBuilder() + .setStatus(SubscribeStreamResponse.SubscribeStreamResponseCode.READ_STREAM_SUCCESS) + .build(); } @Override diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index f58642c76..6b1614523 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -21,8 +21,7 @@ import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.data.ObjectEvent; @@ -64,6 +63,7 @@ public class BlockStreamServiceIT { @Mock private StreamMediator> streamMediator; @Mock private StreamObserver publishStreamResponseObserver; + @Mock private StreamObserver singleBlockResponseStreamObserver; @Mock private SubscribeStreamRequest subscribeStreamRequest; @@ -429,12 +429,24 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() PublishStreamRequest.newBuilder().setBlockItem(blockItems.getFirst()).build(); streamObserver.onNext(publishStreamRequest); - // Simulate another producer attempting to connect to the Block Node. + // Simulate another producer attempting to connect to the Block Node after the exception. // Later, verify they received a response indicating the stream is closed. final StreamObserver expectedNoOpStreamObserver = blockStreamService.publishBlockStream(publishStreamResponseObserver); expectedNoOpStreamObserver.onNext(publishStreamRequest); + // Build a request to invoke the singleBlock service + final SingleBlockRequest singleBlockRequest = + SingleBlockRequest.newBuilder().setBlockNumber(1).build(); + // Simulate a consumer attempting to connect to the Block Node after the exception. + blockStreamService.singleBlock(singleBlockRequest, singleBlockResponseStreamObserver); + + // Build a request to invoke the subscribeBlockStream service + final SubscribeStreamRequest subscribeStreamRequest = + SubscribeStreamRequest.newBuilder().setStartBlockNumber(1).build(); + // Simulate a consumer attempting to connect to the Block Node after the exception. + blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver4); + synchronized (lock) { lock.wait(50); } @@ -448,8 +460,13 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse); // Verify all the consumers received the end of stream response + // TODO: Fix the response code when it's available final SubscribeStreamResponse endStreamResponse = - SubscribeStreamResponse.newBuilder().setStatus(2).build(); + SubscribeStreamResponse.newBuilder() + .setStatus( + SubscribeStreamResponse.SubscribeStreamResponseCode + .READ_STREAM_SUCCESS) + .build(); verify(subscribeStreamObserver1, times(1)).onNext(endStreamResponse); verify(subscribeStreamObserver2, times(1)).onNext(endStreamResponse); verify(subscribeStreamObserver3, times(1)).onNext(endStreamResponse); @@ -459,6 +476,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() assertFalse(streamMediator.isSubscribed(s)); } + // Verify the publishBlockStream service returned the expected + // error code indicating the service is not available. final EndOfStream endOfStream = EndOfStream.newBuilder() .setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) @@ -472,6 +491,25 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); final Optional blockOpt = blockReader.read(1); assertTrue(blockOpt.isEmpty()); + + // Verify the singleBlock service returned the expected + // error code indicating the service is not available. + final SingleBlockResponse expectedSingleBlockNotAvailable = + SingleBlockResponse.newBuilder() + .setStatus( + SingleBlockResponse.SingleBlockResponseCode + .READ_BLOCK_NOT_AVAILABLE) + .build(); + verify(singleBlockResponseStreamObserver, times(1)).onNext(expectedSingleBlockNotAvailable); + + // TODO: Fix the response code when it's available + final SubscribeStreamResponse expectedSubscriberStreamNotAvailable = + SubscribeStreamResponse.newBuilder() + .setStatus( + SubscribeStreamResponse.SubscribeStreamResponseCode + .READ_STREAM_SUCCESS) + .build(); + verify(subscribeStreamObserver4, times(1)).onNext(expectedSubscriberStreamNotAvailable); } private void removeRootPathWritePerms(final Config config) throws IOException { diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java index 77a4d6c5c..28c0a4ffa 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java @@ -17,8 +17,8 @@ package com.hedera.block.server; import static com.hedera.block.protos.BlockStreamService.*; -import static com.hedera.block.server.BlockStreamService.buildNotAvailableResponse; -import static com.hedera.block.server.BlockStreamService.buildNotFoundResponse; +import static com.hedera.block.server.BlockStreamService.buildSingleBlockNotAvailableResponse; +import static com.hedera.block.server.BlockStreamService.buildSingleBlockNotFoundResponse; import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -172,7 +172,7 @@ void testSingleBlockNotFoundPath() throws IOException { when(blockPersistenceHandler.retrieve(1)).thenReturn(Optional.empty()); // Build a response to verify what's passed to the response observer - final SingleBlockResponse expectedNotFound = buildNotFoundResponse(); + final SingleBlockResponse expectedNotFound = buildSingleBlockNotFoundResponse(); // Build a request to invoke the service final SingleBlockRequest singleBlockRequest = @@ -203,7 +203,7 @@ void testSingleBlockServiceNotAvailable() throws IOException { // Set the service status to not running when(serviceStatus.isRunning()).thenReturn(false); - final SingleBlockResponse expectedNotAvailable = buildNotAvailableResponse(); + final SingleBlockResponse expectedNotAvailable = buildSingleBlockNotAvailableResponse(); // Build a request to invoke the service final SingleBlockRequest singleBlockRequest =