diff --git a/.gitignore b/.gitignore
index f0f492d91..594c0f46a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,5 +47,7 @@ gradle-app.setting
# JDT-specific (Eclipse Java Development Tools)
.classpath
+.idea
+.DS_Store
# .env files
server/docker/.env
diff --git a/protos/src/main/protobuf/blockstream.proto b/protos/src/main/protobuf/blockstream.proto
index 87ff477f7..c1d7a425d 100644
--- a/protos/src/main/protobuf/blockstream.proto
+++ b/protos/src/main/protobuf/blockstream.proto
@@ -46,6 +46,8 @@ service BlockStreamGrpc {
* message with the id of each block received.
*/
rpc StreamSource(stream BlockResponse) returns (stream Block) {}
+
+ rpc GetBlock(Block) returns (Block) {}
}
/**
@@ -79,3 +81,16 @@ message BlockResponse {
*/
int64 id = 1;
}
+
+/**
+ * A block request is a simple message that contains an id.
+ * This specification is a simple example meant to expedite development.
+ * It will be replaced with a PBJ implementation in the future.
+ */
+message BlockRequest {
+ /**
+ * The id of the block which was requested. Each block id should
+ * correlate with the id of a Block message id.
+ */
+ int64 id = 1;
+}
diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java
index f301272df..7b40f4034 100644
--- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java
+++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java
@@ -16,33 +16,41 @@
package com.hedera.block.server;
+import static com.hedera.block.server.Constants.*;
+import static io.helidon.webserver.grpc.ResponseHelper.complete;
+
import com.google.protobuf.Descriptors;
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.consumer.LiveStreamObserver;
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
import com.hedera.block.server.mediator.StreamMediator;
+import com.hedera.block.server.persistence.BlockPersistenceHandler;
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;
-
import java.time.Clock;
-
-import static com.hedera.block.server.Constants.*;
+import java.util.Optional;
/**
- * This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
- * It sets up the bidirectional streaming methods for the service and handles the routing for these methods.
- * It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation.
+ * This class implements the GrpcService interface and provides the functionality for the
+ * BlockStreamService. It sets up the bidirectional streaming methods for the service and handles
+ * the routing for these methods. It also initializes the StreamMediator, BlockStorage, and
+ * BlockCache upon creation.
*
- *
The class provides two main methods, streamSink and streamSource, which handle the client and server streaming
- * respectively. These methods return custom StreamObservers which are used to observe and respond to the streams.
+ *
The class provides two main methods, streamSink and streamSource, which handle the client and
+ * server streaming respectively. These methods return custom StreamObservers which are used to
+ * observe and respond to the streams.
*/
public class BlockStreamService implements GrpcService {
private final System.Logger LOGGER = System.getLogger(getClass().getName());
private final long timeoutThresholdMillis;
- private final StreamMediator streamMediator;
+ private final StreamMediator<
+ BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse>
+ streamMediator;
+ private final BlockPersistenceHandler
+ blockPersistenceHandler;
/**
* Constructor for the BlockStreamService class.
@@ -50,11 +58,18 @@ public class BlockStreamService implements GrpcService {
* @param timeoutThresholdMillis the timeout threshold in milliseconds
* @param streamMediator the stream mediator
*/
- public BlockStreamService(final long timeoutThresholdMillis,
- final StreamMediator streamMediator) {
+ public BlockStreamService(
+ final long timeoutThresholdMillis,
+ final StreamMediator<
+ BlockStreamServiceGrpcProto.Block,
+ BlockStreamServiceGrpcProto.BlockResponse>
+ streamMediator,
+ final BlockPersistenceHandler
+ blockPersistenceHandler) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
+ this.blockPersistenceHandler = blockPersistenceHandler;
}
/**
@@ -68,8 +83,8 @@ public Descriptors.FileDescriptor proto() {
}
/**
- * Returns the service name for the BlockStreamService. This service name corresponds to the service name in
- * the proto file.
+ * Returns the service name for the BlockStreamService. This service name corresponds to the
+ * service name in the proto file.
*
* @return the service name corresponding to the service name in the proto file
*/
@@ -79,7 +94,8 @@ public String serviceName() {
}
/**
- * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service.
+ * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming
+ * methods for the service.
*
* @param routing the routing for the BlockStreamService
*/
@@ -87,48 +103,74 @@ public String serviceName() {
public void update(final Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
+ routing.unary(GET_BLOCK_METHOD_NAME, this::getBlock);
}
/**
- * The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
- *
- * @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
+ * The streamSink method is called by Helidon each time a producer initiates a bidirectional
+ * stream.
*
- * @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer
- * via the streamMediator as well as sending responses back to the producer.
+ * @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to
+ * the producer.
+ * @return a custom StreamObserver to handle streaming blocks from the producer to all
+ * subscribed consumer via the streamMediator as well as sending responses back to the
+ * producer.
*/
private StreamObserver streamSink(
- final StreamObserver responseStreamObserver) {
+ final StreamObserver
+ responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");
return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver);
}
/**
- * The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
+ * The streamSource method is called by Helidon each time a consumer initiates a bidirectional
+ * stream.
*
- * @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer
- * back to the server.
- *
- * @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
- * as handling responses from the consumer.
+ * @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the
+ * consumer back to the server.
+ * @return a custom StreamObserver to handle streaming blocks from the producer to the consumer
+ * as well as handling responses from the consumer.
*/
- private StreamObserver streamSource(final StreamObserver responseStreamObserver) {
+ private StreamObserver streamSource(
+ final StreamObserver responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");
// Return a custom StreamObserver to handle streaming blocks from the producer.
- final LiveStreamObserver streamObserver = new LiveStreamObserverImpl(
- timeoutThresholdMillis,
- Clock.systemDefaultZone(),
- Clock.systemDefaultZone(),
- streamMediator,
- responseStreamObserver);
+ final LiveStreamObserver<
+ BlockStreamServiceGrpcProto.Block,
+ BlockStreamServiceGrpcProto.BlockResponse>
+ streamObserver =
+ new LiveStreamObserverImpl(
+ timeoutThresholdMillis,
+ Clock.systemDefaultZone(),
+ Clock.systemDefaultZone(),
+ streamMediator,
+ responseStreamObserver);
// Subscribe the observer to the mediator
streamMediator.subscribe(streamObserver);
return streamObserver;
}
-}
-
+ void getBlock(
+ BlockStreamServiceGrpcProto.Block block,
+ StreamObserver responseObserver) {
+ LOGGER.log(System.Logger.Level.INFO, "GetBlock request received");
+ final Optional responseBlock =
+ blockPersistenceHandler.read(block.getId());
+ if (responseBlock.isPresent()) {
+ LOGGER.log(System.Logger.Level.INFO, "Returning block with id: {0}", block.getId());
+ complete(responseObserver, responseBlock.get());
+ } else {
+ LOGGER.log(
+ System.Logger.Level.INFO,
+ "Did not find your block with id: {0}",
+ block.getId());
+ responseObserver.onNext(
+ BlockStreamServiceGrpcProto.Block.newBuilder().setId(0).build());
+ }
+ }
+}
diff --git a/server/src/main/java/com/hedera/block/server/Constants.java b/server/src/main/java/com/hedera/block/server/Constants.java
index 2651397b9..a48d3a4b7 100644
--- a/server/src/main/java/com/hedera/block/server/Constants.java
+++ b/server/src/main/java/com/hedera/block/server/Constants.java
@@ -16,18 +16,18 @@
package com.hedera.block.server;
-/**
- * Constants used in the BlockNode service.
- */
+/** Constants used in the BlockNode service. */
public final class Constants {
private Constants() {}
// Config Constants
public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";
- public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY = "blocknode.server.consumer.timeout.threshold";
+ public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY =
+ "blocknode.server.consumer.timeout.threshold";
// Constants specified in the service definition of the .proto file
public static final String SERVICE_NAME = "BlockStreamGrpc";
public static final String CLIENT_STREAMING_METHOD_NAME = "StreamSink";
public static final String SERVER_STREAMING_METHOD_NAME = "StreamSource";
+ public static final String GET_BLOCK_METHOD_NAME = "GetBlock";
}
diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java
index 08aebf39d..a14d2cb5f 100644
--- a/server/src/main/java/com/hedera/block/server/Server.java
+++ b/server/src/main/java/com/hedera/block/server/Server.java
@@ -16,6 +16,8 @@
package com.hedera.block.server;
+import static com.hedera.block.server.Constants.*;
+
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
@@ -26,21 +28,22 @@
import io.helidon.config.Config;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcRouting;
-import io.helidon.webserver.http.HttpRouting;
-
import java.io.IOException;
import java.util.stream.Stream;
-import static com.hedera.block.server.Constants.*;
-
-/**
- * Main class for the block node server
- */
+/** Main class for the block node server */
public class Server {
- // Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
- private static ServerCalls.BidiStreamingMethod, StreamObserver> clientBidiStreamingMethod;
- private static ServerCalls.BidiStreamingMethod, StreamObserver> serverBidiStreamingMethod;
+ // Function stubs to satisfy the bidi routing param signatures. The implementations are in the
+ // service class.
+ private static ServerCalls.BidiStreamingMethod<
+ Stream,
+ StreamObserver>
+ clientBidiStreamingMethod;
+ private static ServerCalls.BidiStreamingMethod<
+ Stream,
+ StreamObserver>
+ serverBidiStreamingMethod;
private static final System.Logger LOGGER = System.getLogger(Server.class.getName());
@@ -60,32 +63,52 @@ public static void main(final String[] args) {
Config.global(config);
// Get Timeout threshold from configuration
- final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);
+ final long consumerTimeoutThreshold =
+ config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY)
+ .asLong()
+ .orElse(1500L);
// Initialize the block storage, cache, and service
- final BlockStorage blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
- final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
- new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));
+ final BlockStorage blockStorage =
+ new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
+
+ // Initialize blockStreamService with Live Stream and Cache
+ final BlockStreamService blockStreamService =
+ new BlockStreamService(
+ consumerTimeoutThreshold,
+ new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)),
+ new WriteThroughCacheHandler(blockStorage));
// Start the web server
WebServer.builder()
.port(8080)
- .addRouting(GrpcRouting.builder()
- .service(blockStreamService)
- .bidi(BlockStreamServiceGrpcProto.getDescriptor(),
- SERVICE_NAME,
- CLIENT_STREAMING_METHOD_NAME,
- clientBidiStreamingMethod)
- .bidi(BlockStreamServiceGrpcProto.getDescriptor(),
- SERVICE_NAME,
- SERVER_STREAMING_METHOD_NAME,
- serverBidiStreamingMethod))
+ .addRouting(
+ GrpcRouting.builder()
+ .service(blockStreamService)
+ .bidi(
+ BlockStreamServiceGrpcProto.getDescriptor(),
+ SERVICE_NAME,
+ CLIENT_STREAMING_METHOD_NAME,
+ clientBidiStreamingMethod)
+ .bidi(
+ BlockStreamServiceGrpcProto.getDescriptor(),
+ SERVICE_NAME,
+ SERVER_STREAMING_METHOD_NAME,
+ serverBidiStreamingMethod)
+ .unary(
+ BlockStreamServiceGrpcProto.getDescriptor(),
+ SERVICE_NAME,
+ GET_BLOCK_METHOD_NAME,
+ Server::grpcGetBlock))
.build()
.start();
-
} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e);
throw new RuntimeException(e);
}
}
+
+ static void grpcGetBlock(
+ BlockStreamServiceGrpcProto.BlockRequest request,
+ StreamObserver responseObserver) {}
}
diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java
new file mode 100644
index 000000000..fffb58847
--- /dev/null
+++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.server;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import com.hedera.block.protos.BlockStreamServiceGrpcProto;
+import com.hedera.block.server.mediator.StreamMediator;
+import com.hedera.block.server.persistence.BlockPersistenceHandler;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class BlockStreamServiceTest {
+
+ private final long TIMEOUT_THRESHOLD_MILLIS = 52L;
+
+ @Mock private StreamObserver responseObserver;
+
+ @Mock
+ private BlockPersistenceHandler blockPersistenceHandler;
+
+ @Mock
+ private StreamMediator<
+ BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse>
+ streamMediator;
+
+ @Test
+ void getBlockHappyPath() {
+ BlockStreamServiceGrpcProto.Block block =
+ BlockStreamServiceGrpcProto.Block.newBuilder().setId(1).build();
+ BlockStreamService blockStreamService =
+ new BlockStreamService(
+ TIMEOUT_THRESHOLD_MILLIS, streamMediator, blockPersistenceHandler);
+ when(blockPersistenceHandler.read(1))
+ .thenReturn(
+ Optional.of(
+ BlockStreamServiceGrpcProto.Block.newBuilder().setId(1).build()));
+ blockStreamService.getBlock(block, responseObserver);
+ verify(responseObserver, times(1)).onNext(block);
+ }
+
+ @Test
+ void getBlockErrorPath() {
+ BlockStreamServiceGrpcProto.Block block =
+ BlockStreamServiceGrpcProto.Block.newBuilder().setId(1).build();
+ BlockStreamService blockStreamService =
+ new BlockStreamService(
+ TIMEOUT_THRESHOLD_MILLIS, streamMediator, blockPersistenceHandler);
+ when(blockPersistenceHandler.read(1)).thenReturn(Optional.empty());
+ blockStreamService.getBlock(block, responseObserver);
+ verify(responseObserver, times(1))
+ .onNext(BlockStreamServiceGrpcProto.Block.newBuilder().setId(0).build());
+ }
+}