Skip to content

Commit

Permalink
refactor: rebased the GetBlock unary method and refactored it to work…
Browse files Browse the repository at this point in the history
… with the new types

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 30, 2024
1 parent af6496f commit 67a942e
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 87 deletions.
20 changes: 19 additions & 1 deletion protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ service BlockStreamGrpcService {

rpc subscribeBlockStream (SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {}

rpc GetBlock(Block) returns (Block) {}
rpc singleBlock(SingleBlockRequest) returns (SingleBlockResponse) {}
}

message PublishStreamRequest {
Expand Down Expand Up @@ -134,3 +134,21 @@ message BlockProof {
uint64 block = 1;
}

message SingleBlockRequest {
uint64 block_number = 1;
}

message SingleBlockResponse {
oneof response {
SingleBlockResponseCode status = 1;
Block block = 2;
}

enum SingleBlockResponseCode {
READ_BLOCK_UNKNOWN = 0;
READ_BLOCK_INSUFFICIENT_BALANCE = 1;
READ_BLOCK_SUCCESS = 2;
READ_BLOCK_NOT_FOUND = 3;
READ_BLOCK_NOT_AVAILABLE = 4;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.BlockPersistenceHandler;
import com.hedera.block.server.producer.ItemAckBuilder;
import com.hedera.block.server.producer.ProducerBlockItemObserver;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcService;
import java.io.IOException;
import java.time.Clock;
import java.util.Optional;

/**
* This class implements the GrpcService interface and provides the functionality for the
Expand All @@ -48,6 +51,20 @@ public class BlockStreamService implements GrpcService {
private final ItemAckBuilder itemAckBuilder;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
private final ServiceStatus serviceStatus;
private final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;

BlockStreamService(
final long timeoutThresholdMillis,
final ItemAckBuilder itemAckBuilder,
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
final ServiceStatus serviceStatus) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.itemAckBuilder = itemAckBuilder;
this.streamMediator = streamMediator;
this.blockPersistenceHandler = blockPersistenceHandler;
this.serviceStatus = serviceStatus;
}

/**
* Constructor for the BlockStreamService class.
Expand All @@ -57,11 +74,14 @@ public class BlockStreamService implements GrpcService {
public BlockStreamService(
final long timeoutThresholdMillis,
final ItemAckBuilder itemAckBuilder,
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.itemAckBuilder = itemAckBuilder;
this.streamMediator = streamMediator;
this.serviceStatus = new ServiceStatusImpl();
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler) {
this(
timeoutThresholdMillis,
itemAckBuilder,
streamMediator,
blockPersistenceHandler,
new ServiceStatusImpl());
}

/**
Expand Down Expand Up @@ -95,7 +115,7 @@ public String serviceName() {
public void update(final Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::publishBlockStream);
routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
routing.unary(GET_BLOCK_METHOD_NAME, this::getBlock);
routing.unary(SINGLE_BLOCK_METHOD_NAME, this::singleBlock);

Check warning on line 118 in server/src/main/java/com/hedera/block/server/BlockStreamService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/BlockStreamService.java#L116-L118

Added lines #L116 - L118 were not covered by tests
}

public void register(final WebServer webServer) {
Expand All @@ -117,7 +137,7 @@ void subscribeBlockStream(
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
LOGGER.log(
System.Logger.Level.DEBUG,
"Executing Server Streaming subscribeBlockStream gRPC method");
"Executing Server Streaming subscribeBlockStream gRPC Service");

// Return a custom StreamObserver to handle streaming blocks from the producer.
if (serviceStatus.isRunning()) {
Expand All @@ -129,25 +149,60 @@ void subscribeBlockStream(
subscribeStreamResponseObserver);

streamMediator.subscribe(streamObserver);
} else {
LOGGER.log(

Check warning on line 153 in server/src/main/java/com/hedera/block/server/BlockStreamService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/BlockStreamService.java#L153

Added line #L153 was not covered by tests
System.Logger.Level.ERROR,
"Server Streaming subscribeBlockStream gRPC Service is not currently running");
}
}

void getBlock(
BlockStreamServiceGrpcProto.Block block,
StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {
LOGGER.log(System.Logger.Level.INFO, "GetBlock request received");
final Optional<BlockStreamServiceGrpcProto.Block> responseBlock =
blockPersistenceHandler.read(block.getId());
if (responseBlock.isPresent()) {
LOGGER.log(System.Logger.Level.INFO, "Returning block with id: {0}", block.getId());
complete(responseObserver, responseBlock.get());
void singleBlock(
final SingleBlockRequest singleBlockRequest,
final StreamObserver<SingleBlockResponse> singleBlockResponseStreamObserver) {

LOGGER.log(System.Logger.Level.DEBUG, "Executing Unary singleBlock gRPC method");

if (serviceStatus.isRunning()) {
final long blockNumber = singleBlockRequest.getBlockNumber();
try {
final Optional<Block> blockOpt =
blockPersistenceHandler.retrieve(singleBlockRequest.getBlockNumber());
if (blockOpt.isPresent()) {
LOGGER.log(
System.Logger.Level.INFO,
"Successfully returning block number: {0}",
blockNumber);
singleBlockResponseStreamObserver.onNext(buildBlockResponse(blockOpt.get()));
} else {
LOGGER.log(System.Logger.Level.INFO, "Block number {0} not found", blockNumber);
singleBlockResponseStreamObserver.onNext(buildNotFoundResponse());
}
} catch (IOException e) {
LOGGER.log(
System.Logger.Level.ERROR, "Error reading block number: {0}", blockNumber);
singleBlockResponseStreamObserver.onNext(buildNotAvailableResponse());

Check warning on line 183 in server/src/main/java/com/hedera/block/server/BlockStreamService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/BlockStreamService.java#L180-L183

Added lines #L180 - L183 were not covered by tests
}
} else {
LOGGER.log(
System.Logger.Level.INFO,
"Did not find your block with id: {0}",
block.getId());
responseObserver.onNext(
BlockStreamServiceGrpcProto.Block.newBuilder().setId(0).build());
System.Logger.Level.ERROR,
"Unary singleBlock gRPC method is not currently running");
singleBlockResponseStreamObserver.onNext(buildNotAvailableResponse());
}
}

static SingleBlockResponse buildBlockResponse(final Block block) {
return SingleBlockResponse.newBuilder().setBlock(block).build();
}

static SingleBlockResponse buildNotAvailableResponse() {
return SingleBlockResponse.newBuilder()
.setStatus(SingleBlockResponse.SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE)
.build();
}

static SingleBlockResponse buildNotFoundResponse() {
return SingleBlockResponse.newBuilder()
.setStatus(SingleBlockResponse.SingleBlockResponseCode.READ_BLOCK_NOT_FOUND)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private Constants() {}
public static final String SERVICE_NAME = "BlockStreamGrpcService";
public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream";
public static final String SERVER_STREAMING_METHOD_NAME = "subscribeBlockStream";
public static final String GET_BLOCK_METHOD_NAME = "GetBlock";
public static final String SINGLE_BLOCK_METHOD_NAME = "singleBlock";

// Constants used when interacting with the file system.
public static final String BLOCK_FILE_EXTENSION = ".blk";
Expand Down
54 changes: 32 additions & 22 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.BlockPersistenceHandler;
import com.hedera.block.server.persistence.FileSystemPersistenceHandler;
import com.hedera.block.server.persistence.storage.*;
import com.hedera.block.server.persistence.storage.BlockAsDirReader;
import com.hedera.block.server.persistence.storage.BlockAsDirWriter;
import com.hedera.block.server.producer.ItemAckBuilder;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
Expand All @@ -46,6 +48,9 @@ public class Server {
SubscribeStreamRequest, StreamObserver<SubscribeStreamResponse>>
serverStreamingMethod;

static ServerCalls.UnaryMethod<SingleBlockRequest, StreamObserver<SingleBlockResponse>>
singleBlock;

private static final System.Logger LOGGER = System.getLogger(Server.class.getName());

private Server() {}
Expand All @@ -57,22 +62,28 @@ private Server() {}
*/
public static void main(final String[] args) {

LOGGER.log(System.Logger.Level.INFO, "Starting BlockNode Server");

Check warning on line 65 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L65

Added line #L65 was not covered by tests

// Set the global configuration
final Config config = Config.create();
Config.global(config);

Check warning on line 69 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L68-L69

Added lines #L68 - L69 were not covered by tests

// Get Timeout threshold from configuration
final long consumerTimeoutThreshold =
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY)
.asLong()
.orElse(1500L);
// final long consumerTimeoutThreshold =
// config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY)
// .asLong()
// .orElse(1500L);

try {
final ServiceStatus serviceStatus = new ServiceStatusImpl();
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler =

Check warning on line 79 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L78-L79

Added lines #L78 - L79 were not covered by tests
new FileSystemPersistenceHandler(
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config),
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config));
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator =
buildStreamMediator(config, serviceStatus);
buildStreamMediator(serviceStatus, blockPersistenceHandler);

Check warning on line 84 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L83-L84

Added lines #L83 - L84 were not covered by tests
final BlockStreamService blockStreamService =
buildBlockStreamService(config, streamMediator);
buildBlockStreamService(config, streamMediator, blockPersistenceHandler);
final GrpcRouting.Builder grpcRouting = buildGrpcRouting(blockStreamService);

Check warning on line 87 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L86-L87

Added lines #L86 - L87 were not covered by tests

// Build the web server
Expand All @@ -92,14 +103,12 @@ public static void main(final String[] args) {
}

private static StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
buildStreamMediator(final Config config, final ServiceStatus serviceStatus)
buildStreamMediator(
final ServiceStatus serviceStatus,
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler)
throws IOException {
return new LiveStreamMediatorImpl(

Check warning on line 110 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L110

Added line #L110 was not covered by tests
new ConcurrentHashMap<>(32),
new FileSystemPersistenceHandler(
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config),
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config)),
serviceStatus);
new ConcurrentHashMap<>(32), blockPersistenceHandler, serviceStatus);
}

private static GrpcRouting.Builder buildGrpcRouting(
Expand All @@ -118,24 +127,25 @@ private static GrpcRouting.Builder buildGrpcRouting(
SERVER_STREAMING_METHOD_NAME,
serverStreamingMethod)
.unary(
BlockStreamServiceGrpcProto.getDescriptor(),
com.hedera.block.protos.BlockStreamService.getDescriptor(),

Check warning on line 130 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L129-L130

Added lines #L129 - L130 were not covered by tests
SERVICE_NAME,
GET_BLOCK_METHOD_NAME,
Server::grpcGetBlock);
SINGLE_BLOCK_METHOD_NAME,
singleBlock);
}

private static BlockStreamService buildBlockStreamService(
final Config config,
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator) {
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler) {

// Get Timeout threshold from configuration
final long consumerTimeoutThreshold =
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);

Check warning on line 143 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L142-L143

Added lines #L142 - L143 were not covered by tests

return new BlockStreamService(

Check warning on line 145 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L145

Added line #L145 was not covered by tests
consumerTimeoutThreshold, new ItemAckBuilder(), streamMediator);
consumerTimeoutThreshold,
new ItemAckBuilder(),
streamMediator,
blockPersistenceHandler);
}

static void grpcGetBlock(
BlockStreamServiceGrpcProto.BlockRequest request,
StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;
import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.hedera.block.server.consumer.BlockItemEventHandler;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.BlockPersistenceHandler;
import com.hedera.block.server.persistence.FileSystemPersistenceHandler;
import com.hedera.block.server.persistence.storage.*;
import com.hedera.block.server.producer.ItemAckBuilder;
Expand Down Expand Up @@ -57,6 +60,7 @@ public class BlockStreamServiceIT {
private final System.Logger LOGGER = System.getLogger(getClass().getName());
private final Object lock = new Object();

@Mock private BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
@Mock private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;

@Mock private StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
Expand Down Expand Up @@ -103,7 +107,8 @@ public void testPublishBlockStreamRegistrationAndExecution()
throws InterruptedException, IOException, NoSuchAlgorithmException {

final BlockStreamService blockStreamService =
new BlockStreamService(50L, new ItemAckBuilder(), streamMediator);
new BlockStreamService(
50L, new ItemAckBuilder(), streamMediator, blockPersistenceHandler);
blockStreamService.register(webServer);

final StreamObserver<PublishStreamRequest> streamObserver =
Expand Down Expand Up @@ -156,7 +161,8 @@ public void testSubscribeBlockStream() throws InterruptedException {

// Build the BlockStreamService
final BlockStreamService blockStreamService =
new BlockStreamService(1000L, new ItemAckBuilder(), streamMediator);
new BlockStreamService(
1000L, new ItemAckBuilder(), streamMediator, blockPersistenceHandler);

// Subscribe the consumers
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1);
Expand Down Expand Up @@ -304,7 +310,8 @@ public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedExcep
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
subscribers = new LinkedHashMap<>();
final var streamMediator = buildStreamMediator(subscribers, Util.defaultPerms);
final var blockStreamService = buildBlockStreamService(streamMediator);
final var blockStreamService =
buildBlockStreamService(streamMediator, blockPersistenceHandler);

// Pass a StreamObserver to the producer as Helidon does
final StreamObserver<PublishStreamRequest> streamObserver =
Expand Down Expand Up @@ -395,7 +402,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
// to write the BlockItem or fix the permissions causing the BlockReader to
// throw an IOException.
final var streamMediator = buildStreamMediator(subscribers, TestUtils.getNoPerms());
final var blockStreamService = buildBlockStreamService(streamMediator);
final var blockStreamService =
buildBlockStreamService(streamMediator, blockPersistenceHandler);

// Register the web server to confirm
// the server is stopped when an exception occurs
Expand Down Expand Up @@ -542,11 +550,13 @@ private BlockStreamService buildBlockStreamService() throws IOException {
final var streamMediator = buildStreamMediator();

// Build the BlockStreamService
return buildBlockStreamService(streamMediator);
return buildBlockStreamService(streamMediator, blockPersistenceHandler);
}

private BlockStreamService buildBlockStreamService(
StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator) {
return new BlockStreamService(2000, new ItemAckBuilder(), streamMediator);
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler) {
return new BlockStreamService(
2000, new ItemAckBuilder(), streamMediator, blockPersistenceHandler);
}
}
Loading

0 comments on commit 67a942e

Please sign in to comment.