Skip to content

Commit

Permalink
fix: implementing type changes and additional tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 30, 2024
1 parent 67a942e commit 92cbe23
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 18 deletions.
16 changes: 14 additions & 2 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,22 @@ message SubscribeStreamRequest {
}

message SubscribeStreamResponse {
int32 status = 1;
BlockItem block_item = 2;
oneof response {
SubscribeStreamResponseCode status = 1;
BlockItem block_item = 2;
}

enum SubscribeStreamResponseCode {
READ_STREAM_UNKNOWN = 0;
READ_STREAM_INSUFFICIENT_BALANCE = 1;
READ_STREAM_SUCCESS = 2;
READ_STREAM_INVALID_START_BLOCK_NUMBER = 3;
READ_STREAM_INVALID_END_BLOCK_NUMBER = 4;
}
}



message Block {
repeated BlockItem block_items = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ void subscribeBlockStream(
LOGGER.log(
System.Logger.Level.ERROR,
"Server Streaming subscribeBlockStream gRPC Service is not currently running");

subscribeStreamResponseObserver.onNext(buildSubscribeStreamNotAvailableResponse());
}
}

Expand All @@ -172,37 +174,45 @@ void singleBlock(
System.Logger.Level.INFO,
"Successfully returning block number: {0}",
blockNumber);
singleBlockResponseStreamObserver.onNext(buildBlockResponse(blockOpt.get()));
singleBlockResponseStreamObserver.onNext(
buildSingleBlockResponse(blockOpt.get()));
} else {
LOGGER.log(System.Logger.Level.INFO, "Block number {0} not found", blockNumber);
singleBlockResponseStreamObserver.onNext(buildNotFoundResponse());
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotFoundResponse());
}
} catch (IOException e) {
LOGGER.log(
System.Logger.Level.ERROR, "Error reading block number: {0}", blockNumber);
singleBlockResponseStreamObserver.onNext(buildNotAvailableResponse());
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());

Check warning on line 186 in server/src/main/java/com/hedera/block/server/BlockStreamService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/BlockStreamService.java#L183-L186

Added lines #L183 - L186 were not covered by tests
}
} else {
LOGGER.log(
System.Logger.Level.ERROR,
"Unary singleBlock gRPC method is not currently running");
singleBlockResponseStreamObserver.onNext(buildNotAvailableResponse());
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());
}
}

static SingleBlockResponse buildBlockResponse(final Block block) {
static SingleBlockResponse buildSingleBlockResponse(final Block block) {
return SingleBlockResponse.newBuilder().setBlock(block).build();
}

static SingleBlockResponse buildNotAvailableResponse() {
static SingleBlockResponse buildSingleBlockNotAvailableResponse() {
return SingleBlockResponse.newBuilder()
.setStatus(SingleBlockResponse.SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE)
.build();
}

static SingleBlockResponse buildNotFoundResponse() {
static SingleBlockResponse buildSingleBlockNotFoundResponse() {
return SingleBlockResponse.newBuilder()
.setStatus(SingleBlockResponse.SingleBlockResponseCode.READ_BLOCK_NOT_FOUND)
.build();
}

// TODO: Fix this error type once we have it - should not be success
static SubscribeStreamResponse buildSubscribeStreamNotAvailableResponse() {
return SubscribeStreamResponse.newBuilder()
.setStatus(SubscribeStreamResponse.SubscribeStreamResponseCode.READ_STREAM_SUCCESS)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ private static SubscribeStreamResponse buildEndStreamResponse() {
// The current spec does not contain a generic error code for
// SubscribeStreamResponseCode.
// TODO: Replace READ_STREAM_SUCCESS (2) with a generic error code?
return SubscribeStreamResponse.newBuilder().setStatus(2).build();
return SubscribeStreamResponse.newBuilder()
.setStatus(SubscribeStreamResponse.SubscribeStreamResponseCode.READ_STREAM_SUCCESS)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import com.hedera.block.server.consumer.BlockItemEventHandler;
import com.hedera.block.server.data.ObjectEvent;
Expand Down Expand Up @@ -64,6 +63,7 @@ public class BlockStreamServiceIT {
@Mock private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;

@Mock private StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
@Mock private StreamObserver<SingleBlockResponse> singleBlockResponseStreamObserver;

@Mock private SubscribeStreamRequest subscribeStreamRequest;

Expand Down Expand Up @@ -429,12 +429,24 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
PublishStreamRequest.newBuilder().setBlockItem(blockItems.getFirst()).build();
streamObserver.onNext(publishStreamRequest);

// Simulate another producer attempting to connect to the Block Node.
// Simulate another producer attempting to connect to the Block Node after the exception.
// Later, verify they received a response indicating the stream is closed.
final StreamObserver<PublishStreamRequest> expectedNoOpStreamObserver =
blockStreamService.publishBlockStream(publishStreamResponseObserver);
expectedNoOpStreamObserver.onNext(publishStreamRequest);

// Build a request to invoke the singleBlock service
final SingleBlockRequest singleBlockRequest =
SingleBlockRequest.newBuilder().setBlockNumber(1).build();
// Simulate a consumer attempting to connect to the Block Node after the exception.
blockStreamService.singleBlock(singleBlockRequest, singleBlockResponseStreamObserver);

// Build a request to invoke the subscribeBlockStream service
final SubscribeStreamRequest subscribeStreamRequest =
SubscribeStreamRequest.newBuilder().setStartBlockNumber(1).build();
// Simulate a consumer attempting to connect to the Block Node after the exception.
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver4);

synchronized (lock) {
lock.wait(50);
}
Expand All @@ -448,8 +460,13 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse);

// Verify all the consumers received the end of stream response
// TODO: Fix the response code when it's available
final SubscribeStreamResponse endStreamResponse =
SubscribeStreamResponse.newBuilder().setStatus(2).build();
SubscribeStreamResponse.newBuilder()
.setStatus(
SubscribeStreamResponse.SubscribeStreamResponseCode
.READ_STREAM_SUCCESS)
.build();
verify(subscribeStreamObserver1, times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver2, times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver3, times(1)).onNext(endStreamResponse);
Expand All @@ -459,6 +476,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
assertFalse(streamMediator.isSubscribed(s));
}

// Verify the publishBlockStream service returned the expected
// error code indicating the service is not available.
final EndOfStream endOfStream =
EndOfStream.newBuilder()
.setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN)
Expand All @@ -472,6 +491,25 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
final Optional<Block> blockOpt = blockReader.read(1);
assertTrue(blockOpt.isEmpty());

// Verify the singleBlock service returned the expected
// error code indicating the service is not available.
final SingleBlockResponse expectedSingleBlockNotAvailable =
SingleBlockResponse.newBuilder()
.setStatus(
SingleBlockResponse.SingleBlockResponseCode
.READ_BLOCK_NOT_AVAILABLE)
.build();
verify(singleBlockResponseStreamObserver, times(1)).onNext(expectedSingleBlockNotAvailable);

// TODO: Fix the response code when it's available
final SubscribeStreamResponse expectedSubscriberStreamNotAvailable =
SubscribeStreamResponse.newBuilder()
.setStatus(
SubscribeStreamResponse.SubscribeStreamResponseCode
.READ_STREAM_SUCCESS)
.build();
verify(subscribeStreamObserver4, times(1)).onNext(expectedSubscriberStreamNotAvailable);
}

private void removeRootPathWritePerms(final Config config) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package com.hedera.block.server;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.server.BlockStreamService.buildNotAvailableResponse;
import static com.hedera.block.server.BlockStreamService.buildNotFoundResponse;
import static com.hedera.block.server.BlockStreamService.buildSingleBlockNotAvailableResponse;
import static com.hedera.block.server.BlockStreamService.buildSingleBlockNotFoundResponse;
import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -172,7 +172,7 @@ void testSingleBlockNotFoundPath() throws IOException {
when(blockPersistenceHandler.retrieve(1)).thenReturn(Optional.empty());

// Build a response to verify what's passed to the response observer
final SingleBlockResponse expectedNotFound = buildNotFoundResponse();
final SingleBlockResponse expectedNotFound = buildSingleBlockNotFoundResponse();

// Build a request to invoke the service
final SingleBlockRequest singleBlockRequest =
Expand Down Expand Up @@ -203,7 +203,7 @@ void testSingleBlockServiceNotAvailable() throws IOException {
// Set the service status to not running
when(serviceStatus.isRunning()).thenReturn(false);

final SingleBlockResponse expectedNotAvailable = buildNotAvailableResponse();
final SingleBlockResponse expectedNotAvailable = buildSingleBlockNotAvailableResponse();

// Build a request to invoke the service
final SingleBlockRequest singleBlockRequest =
Expand Down

0 comments on commit 92cbe23

Please sign in to comment.