diff --git a/.gitignore b/.gitignore index 756cde067..cfc4ee374 100644 --- a/.gitignore +++ b/.gitignore @@ -45,4 +45,5 @@ gradle-app.setting .project # JDT-specific (Eclipse Java Development Tools) -.classpath \ No newline at end of file +.classpath + diff --git a/README.md b/README.md index e536a704e..c6cfc2528 100644 --- a/README.md +++ b/README.md @@ -28,3 +28,13 @@ to [oss@hedera.com](mailto:oss@hedera.com). Please do not file a public ticket mentioning the vulnerability. Refer to the security policy defined in the [SECURITY.md](https://github.com/hashgraph/hedera-sourcify/blob/main/SECURITY.md). +--- + +# Running Locally + +1) Create a local temp directory. For example, use `mktemp -d -t block-stream-temp-dir` to create a directory +2) export BLOCKNODE_STORAGE_ROOT_PATH= # You can add this to your .zshrc, etc +3) ./gradlew run # ./gradlew run --debug-jvm to run in debug mode + +# Running Tests +1) ./gradlew build \ No newline at end of file diff --git a/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts b/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts index aca3f89df..fc612ae20 100644 --- a/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts +++ b/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts @@ -87,4 +87,8 @@ extraJavaModuleInfo { } module("io.grpc:grpc-util", "io.grpc.util") module("io.perfmark:perfmark-api", "io.perfmark") -} \ No newline at end of file + + module("junit:junit", "junit") + module("org.mockito:mockito-core", "org.mockito") + module("org.mockito:mockito-junit-jupiter", "org.mockito.junit.jupiter") +} diff --git a/gradle/modules.properties b/gradle/modules.properties index 37160ba74..f95b58a4f 100644 --- a/gradle/modules.properties +++ b/gradle/modules.properties @@ -3,3 +3,4 @@ io.helidon.webserver=io.helidon.webserver:helidon-webserver io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5 io.grpc=io.grpc:grpc-stub +grpc.protobuf=io.grpc:grpc-protobuf diff --git a/protos/src/main/protobuf/blockstream.proto b/protos/src/main/protobuf/blockstream.proto new file mode 100644 index 000000000..87ff477f7 --- /dev/null +++ b/protos/src/main/protobuf/blockstream.proto @@ -0,0 +1,81 @@ +syntax = "proto3"; + +/*- + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "com.hedera.block.protos"; +option java_outer_classname = "BlockStreamServiceGrpcProto"; + +/** + * The BlockStreamGrpc service definition provides 2 bidirectional streaming methods for + * exchanging blocks with the Block Node server. + * + * A producer (e.g. Consensus Node) can use the StreamSink method to stream blocks to the + * Block Node server. The Block Node server will respond with a BlockResponse message for + * each block received. + * + * A consumer (e.g. Mirror Node) can use the StreamSource method to request a stream of + * blocks from the server. The consumer is expected to respond with a BlockResponse message + * with the id of each block received. + */ +service BlockStreamGrpc { + + /** + * StreamSink is a bidirectional streaming method that allows a producer to stream blocks + * to the Block Node server. The server will respond with a BlockResponse message for each + * block received. + */ + rpc StreamSink(stream Block) returns (stream BlockResponse) {} + + /** + * StreamSource is a bidirectional streaming method that allows a consumer to request a + * stream of blocks from the server. The consumer is expected to respond with a BlockResponse + * message with the id of each block received. + */ + rpc StreamSource(stream BlockResponse) returns (stream Block) {} +} + +/** + * A block is a simple message that contains an id and a value. + * This specification is a simple example meant to expedite development. + * It will be replaced with a PBJ implementation in the future. + */ +message Block { + /** + * The id of the block. Each block id should be unique. + */ + int64 id = 1; + + /** + * The value of the block. The value can be any string. + */ + string value = 2; +} + +/** + * A block response is a simple message that contains an id. + * The block response message is simply meant to disambiguate it + * from the original request. This specification is a simple + * example meant to expedite development. It will be replaced with + * a PBJ implementation in the future. + */ +message BlockResponse { + /** + * The id of the block which was received. Each block id should + * correlate with the id of a Block message id. + */ + int64 id = 1; +} diff --git a/protos/src/main/protobuf/echo.proto b/protos/src/main/protobuf/echo.proto deleted file mode 100644 index 273a5e36f..000000000 --- a/protos/src/main/protobuf/echo.proto +++ /dev/null @@ -1,15 +0,0 @@ -syntax = "proto3"; -option java_package = "com.hedera.block.protos"; -option java_outer_classname = "EchoServiceGrpcProto"; - -service EchoService { - rpc Echo (EchoRequest) returns (EchoResponse) {} -} - -message EchoRequest { - string message = 1; -} - -message EchoResponse { - string message = 1; -} diff --git a/server/build.gradle.kts b/server/build.gradle.kts index f93c135d6..a09bef8ac 100644 --- a/server/build.gradle.kts +++ b/server/build.gradle.kts @@ -25,8 +25,7 @@ application { } testModuleInfo { - // requires("org.assertj.core") - // requires("net.bytebuddy") - // requires("org.junit.jupiter.api") - // requires("org.junit.jupiter.params") + requires("org.junit.jupiter.api") + requires("org.mockito") + requires("org.mockito.junit.jupiter") } diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java new file mode 100644 index 000000000..f301272df --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server; + +import com.google.protobuf.Descriptors; +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.consumer.LiveStreamObserver; +import com.hedera.block.server.consumer.LiveStreamObserverImpl; +import com.hedera.block.server.mediator.StreamMediator; +import com.hedera.block.server.producer.ProducerBlockStreamObserver; +import io.grpc.stub.StreamObserver; +import io.helidon.webserver.grpc.GrpcService; + +import java.time.Clock; + +import static com.hedera.block.server.Constants.*; + +/** + * This class implements the GrpcService interface and provides the functionality for the BlockStreamService. + * It sets up the bidirectional streaming methods for the service and handles the routing for these methods. + * It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation. + * + *

The class provides two main methods, streamSink and streamSource, which handle the client and server streaming + * respectively. These methods return custom StreamObservers which are used to observe and respond to the streams. + */ +public class BlockStreamService implements GrpcService { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private final long timeoutThresholdMillis; + private final StreamMediator streamMediator; + + /** + * Constructor for the BlockStreamService class. + * + * @param timeoutThresholdMillis the timeout threshold in milliseconds + * @param streamMediator the stream mediator + */ + public BlockStreamService(final long timeoutThresholdMillis, + final StreamMediator streamMediator) { + + this.timeoutThresholdMillis = timeoutThresholdMillis; + this.streamMediator = streamMediator; + } + + /** + * Returns the FileDescriptor for the BlockStreamServiceGrpcProto. + * + * @return the FileDescriptor for the BlockStreamServiceGrpcProto + */ + @Override + public Descriptors.FileDescriptor proto() { + return BlockStreamServiceGrpcProto.getDescriptor(); + } + + /** + * Returns the service name for the BlockStreamService. This service name corresponds to the service name in + * the proto file. + * + * @return the service name corresponding to the service name in the proto file + */ + @Override + public String serviceName() { + return SERVICE_NAME; + } + + /** + * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service. + * + * @param routing the routing for the BlockStreamService + */ + @Override + public void update(final Routing routing) { + routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink); + routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource); + } + + /** + * The streamSink method is called by Helidon each time a producer initiates a bidirectional stream. + * + * @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer. + * + * @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer + * via the streamMediator as well as sending responses back to the producer. + */ + private StreamObserver streamSink( + final StreamObserver responseStreamObserver) { + LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method"); + + return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver); + } + + /** + * The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream. + * + * @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer + * back to the server. + * + * @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well + * as handling responses from the consumer. + */ + private StreamObserver streamSource(final StreamObserver responseStreamObserver) { + LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method"); + + // Return a custom StreamObserver to handle streaming blocks from the producer. + final LiveStreamObserver streamObserver = new LiveStreamObserverImpl( + timeoutThresholdMillis, + Clock.systemDefaultZone(), + Clock.systemDefaultZone(), + streamMediator, + responseStreamObserver); + + // Subscribe the observer to the mediator + streamMediator.subscribe(streamObserver); + + return streamObserver; + } +} + + diff --git a/server/src/main/java/com/hedera/block/server/Constants.java b/server/src/main/java/com/hedera/block/server/Constants.java new file mode 100644 index 000000000..927caa4ee --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/Constants.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server; + +/** + * Constants used in the BlockNode service. + */ +public final class Constants { + private Constants() {} + + // Config Constants + public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path"; + + // Constants specified in the service definition of the .proto file + public static final String SERVICE_NAME = "BlockStreamGrpc"; + public static final String CLIENT_STREAMING_METHOD_NAME = "StreamSink"; + public static final String SERVER_STREAMING_METHOD_NAME = "StreamSource"; +} diff --git a/server/src/main/java/com/hedera/block/server/EchoService.java b/server/src/main/java/com/hedera/block/server/EchoService.java deleted file mode 100644 index 598b2e9fb..000000000 --- a/server/src/main/java/com/hedera/block/server/EchoService.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.hedera.block.server; - -import com.google.protobuf.Descriptors; -import com.hedera.block.protos.EchoServiceGrpcProto; -import io.grpc.stub.StreamObserver; -import io.helidon.webserver.grpc.GrpcService; - -import java.util.logging.Logger; - -import static io.helidon.webserver.grpc.ResponseHelper.complete; - -public class EchoService implements GrpcService { - - Logger logger = Logger.getLogger(EchoService.class.getName()); - - @Override - public Descriptors.FileDescriptor proto() { - return EchoServiceGrpcProto.getDescriptor(); - } - - @Override - public void update(Routing routing) { - routing.unary("Echo", this::echo); - } - - /** - * Echo the message back to the caller. - * - * @param request the echo request containing the message to echo - * @param observer the response observer - */ - public void echo(EchoServiceGrpcProto.EchoRequest request, StreamObserver observer) { - String message = request.getMessage(); - logger.info("EchoService grpc request: " + message); - EchoServiceGrpcProto.EchoResponse response = EchoServiceGrpcProto.EchoResponse.newBuilder().setMessage(message).build(); - complete(observer, response); - } -} diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index c17a5cd15..fbf4d7082 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -1,38 +1,90 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.hedera.block.server; -import com.hedera.block.protos.EchoServiceGrpcProto; +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.mediator.LiveStreamMediatorImpl; +import com.hedera.block.server.persistence.WriteThroughCacheHandler; +import com.hedera.block.server.persistence.storage.BlockStorage; +import com.hedera.block.server.persistence.storage.FileSystemBlockStorage; +import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; +import io.helidon.config.Config; import io.helidon.webserver.WebServer; import io.helidon.webserver.grpc.GrpcRouting; import io.helidon.webserver.http.HttpRouting; +import java.io.IOException; +import java.util.stream.Stream; + +import static com.hedera.block.server.Constants.*; + /** * Main class for the block node server */ public class Server { - private Server() { - // Not meant to be instantiated - } + + // Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class. + private static ServerCalls.BidiStreamingMethod, StreamObserver> clientBidiStreamingMethod; + private static ServerCalls.BidiStreamingMethod, StreamObserver> serverBidiStreamingMethod; + + private static final System.Logger LOGGER = System.getLogger(Server.class.getName()); + + private Server() {} /** * Main entrypoint for the block node server * * @param args Command line arguments. Not used at present, */ - public static void main(String[] args) { - WebServer.builder() - .port(8080) - .addRouting(HttpRouting.builder() - .get("/greet", (req, res) -> res.send("Hello World!"))) - .addRouting(GrpcRouting.builder() - .service(new EchoService()) - .unary(EchoServiceGrpcProto.getDescriptor(), - "EchoService", - "Echo", - Server::grpcEcho)) - .build() - .start(); - } + public static void main(final String[] args) { + + try { + + // Set the global configuration + final Config config = Config.create(); + Config.global(config); + + // Initialize the block storage, cache, and service + final BlockStorage blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); - static void grpcEcho(EchoServiceGrpcProto.EchoRequest request, StreamObserver responseObserver) {} + // TODO: Make timeoutThresholdMillis configurable + final BlockStreamService blockStreamService = new BlockStreamService(1500, + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage))); + + // Start the web server + WebServer.builder() + .port(8080) + .addRouting(GrpcRouting.builder() + .service(blockStreamService) + .bidi(BlockStreamServiceGrpcProto.getDescriptor(), + SERVICE_NAME, + CLIENT_STREAMING_METHOD_NAME, + clientBidiStreamingMethod) + .bidi(BlockStreamServiceGrpcProto.getDescriptor(), + SERVICE_NAME, + SERVER_STREAMING_METHOD_NAME, + serverBidiStreamingMethod)) + .build() + .start(); + + } catch (IOException e) { + LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e); + throw new RuntimeException(e); + } + } } diff --git a/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserver.java b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserver.java new file mode 100644 index 000000000..135ecb674 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserver.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.consumer; + +import io.grpc.stub.StreamObserver; + +/** + * The LiveStreamObserver interface augments the StreamObserver interface with the notify() method thereby + * allowing a caller to pass a block to the observer of a different type than the StreamObserver. In this way, + * the implementation of this interface can receive and process inbound messages with different types from + * the producer and response messages from the consumer. + * + * @param the type of the block + * @param the type of the StreamObserver + */ +public interface LiveStreamObserver extends StreamObserver { + + /** + * Pass the block to the observer. + * + * @param block - the block to be passed to the observer + */ + void notify(final U block); +} diff --git a/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java new file mode 100644 index 000000000..7a0d7db7b --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.consumer; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.mediator.StreamMediator; +import io.grpc.stub.StreamObserver; + +import java.time.Clock; +import java.time.InstantSource; + +/** + * The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer + * via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods. + */ +public class LiveStreamObserverImpl implements LiveStreamObserver { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private final StreamMediator mediator; + private final StreamObserver responseStreamObserver; + + private final long timeoutThresholdMillis; + + private final InstantSource producerLivenessClock; + private long producerLivenessMillis; + + private final InstantSource consumerLivenessClock; + private long consumerLivenessMillis; + + /** + * Constructor for the LiveStreamObserverImpl class. + * + * @param mediator the mediator + * @param responseStreamObserver the response stream observer + */ + public LiveStreamObserverImpl( + final long timeoutThresholdMillis, + final InstantSource producerLivenessClock, + final InstantSource consumerLivenessClock, + final StreamMediator mediator, + final StreamObserver responseStreamObserver) { + + this.timeoutThresholdMillis = timeoutThresholdMillis; + this.producerLivenessClock = producerLivenessClock; + this.consumerLivenessClock = consumerLivenessClock; + this.mediator = mediator; + this.responseStreamObserver = responseStreamObserver; + + this.producerLivenessMillis = producerLivenessClock.millis(); + this.consumerLivenessMillis = consumerLivenessClock.millis(); + } + + /** + * Pass the block to the observer provided by Helidon + * + * @param block the block to be passed to the observer + */ + @Override + public void notify(final BlockStreamServiceGrpcProto.Block block) { + + // Check if the consumer has timed out. If so, unsubscribe the observer from the mediator. + if (consumerLivenessClock.millis() - consumerLivenessMillis > timeoutThresholdMillis) { + if (mediator.isSubscribed(this)) { + LOGGER.log(System.Logger.Level.DEBUG, "Consumer timeout threshold exceeded. Unsubscribing observer."); + mediator.unsubscribe(this); + } + } else { + // Refresh the producer liveness and pass the block to the observer. + producerLivenessMillis = producerLivenessClock.millis(); + responseStreamObserver.onNext(block); + } + } + + /** + * The onNext() method is triggered by Helidon when a consumer sends a blockResponse via the bidirectional stream. + * + * @param blockResponse the BlockResponse passed back to the server via the bidirectional stream to the downstream consumer. + */ + @Override + public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) { + + if (producerLivenessClock.millis() - producerLivenessMillis > timeoutThresholdMillis) { + LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer."); + mediator.unsubscribe(this); + } else { + LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse); + consumerLivenessMillis = consumerLivenessClock.millis(); + } + } + + /** + * The onError() method is triggered by Helidon when an error occurs on the bidirectional stream to the downstream consumer. + * Unsubscribe the observer from the mediator. + * + * @param t the error occurred on the stream + */ + @Override + public void onError(final Throwable t) { + LOGGER.log(System.Logger.Level.ERROR, "Unexpected consumer stream communication failure: %s".formatted(t), t); + mediator.unsubscribe(this); + } + + /** + * The onCompleted() method is triggered by Helidon when the bidirectional stream to the downstream consumer is completed. + * This implementation will then unsubscribe the observer from the mediator. + */ + @Override + public void onCompleted() { + LOGGER.log(System.Logger.Level.DEBUG, "gRPC connection completed. Unsubscribing observer."); + mediator.unsubscribe(this); + LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribed observer."); + } +} diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java new file mode 100644 index 000000000..c5ca1fe13 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.mediator; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.consumer.LiveStreamObserver; +import com.hedera.block.server.persistence.BlockPersistenceHandler; + +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible for + * managing the subscribe and unsubscribe operations of downstream consumers. It also proxies live + * blocks to the subscribers as they arrive and persists the blocks to the block persistence store. + */ +public class LiveStreamMediatorImpl implements StreamMediator { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + private final Set> subscribers = Collections.synchronizedSet(new LinkedHashSet<>()); + + private final BlockPersistenceHandler blockPersistenceHandler; + + /** + * Constructor for the LiveStreamMediatorImpl class. + * + * @param blockPersistenceHandler the block persistence handler + */ + public LiveStreamMediatorImpl(final BlockPersistenceHandler blockPersistenceHandler) { + this.blockPersistenceHandler = blockPersistenceHandler; + } + + /** + * Subscribe a new observer to the mediator + * + * @param liveStreamObserver the observer to be subscribed + */ + @Override + public void subscribe(final LiveStreamObserver liveStreamObserver) { + subscribers.add(liveStreamObserver); + } + + /** + * Unsubscribe an observer from the mediator + * + * @param liveStreamObserver the observer to be unsubscribed + */ + @Override + public void unsubscribe(final LiveStreamObserver liveStreamObserver) { + if (subscribers.remove(liveStreamObserver)) { + LOGGER.log(System.Logger.Level.DEBUG, "Successfully removed observer from subscription list"); + } + } + + /** + * Check if an observer is subscribed to the mediator + * + * @param observer the observer to be checked + * @return true if the observer is subscribed, false otherwise + */ + @Override + public boolean isSubscribed(final LiveStreamObserver observer) { + return subscribers.contains(observer); + } + + /** + * Notify all observers of a new block + * + * @param block the block to be notified to all observers + */ + @Override + public void notifyAll(final BlockStreamServiceGrpcProto.Block block) { + + LOGGER.log(System.Logger.Level.DEBUG, "Notifying " + subscribers.size() + " observers of a new block"); + + // Proxy the block to all live stream subscribers + for (final var subscriber : subscribers) { + subscriber.notify(block); + } + + // Persist the block + blockPersistenceHandler.persist(block); + } +} diff --git a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java new file mode 100644 index 000000000..07f448df7 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.mediator; + +import com.hedera.block.server.consumer.LiveStreamObserver; + +/** + * The StreamMediator interface represents a one-to-many bridge between a bidirectional stream of blocks from a + * producer (e.g. a Consensus Node) and N consumers each requesting a bidirectional connection to get + * a "live stream" of blocks from the producer. StreamMediator satisfies Helidon's type requirements for a + * bidirectional StreamObserver representing a stream of blocks returned FROM the downstream consuming client. + * However, the StreamObserver type may be distinct from Block type streamed TO the client. The type definition + * for the onNext() method provides the flexibility for the StreamObserver and the Block types to vary independently. + * + * @param The type of the block + * @param The type of the StreamObserver + */ +public interface StreamMediator { + + /** + * Subscribes a new LiveStreamObserver to receive blocks from the producer as they arrive + * + * @param observer the LiveStreamObserver to subscribe + */ + void subscribe(final LiveStreamObserver observer); + + /** + * Unsubscribes a LiveStreamObserver from the producer + * + * @param observer the LiveStreamObserver to unsubscribe + */ + void unsubscribe(final LiveStreamObserver observer); + + /** + * Checks if the observer is subscribed to the producer + * + * @param observer the LiveStreamObserver to check + * @return true if the observer is subscribed, false otherwise + */ + boolean isSubscribed(final LiveStreamObserver observer); + + /** + * Passes the newly arrived block to all subscribers + * + * @param block the block to pass to the subscribers + */ + void notifyAll(final U block); +} diff --git a/server/src/main/java/com/hedera/block/server/package-info.java b/server/src/main/java/com/hedera/block/server/package-info.java deleted file mode 100644 index 344b1749a..000000000 --- a/server/src/main/java/com/hedera/block/server/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package com.hedera.block.server; diff --git a/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java b/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java new file mode 100644 index 000000000..fd228b145 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence; + +import java.util.Optional; +import java.util.Queue; + +/** + * The BlockPersistenceHandler interface defines operations to persist and read blocks. + * The interface is used to abstract underlying storage mechanisms. + * + * @param the type of block to persist + */ +public interface BlockPersistenceHandler { + + /** + * Persists a block. + * + * @param block the block to persist + * @return the id of the block + */ + Long persist(final V block); + + /** + * Reads a block. + * + * @param id the id of the block to read + * @return an Optional of the block + */ + Optional read(final long id); + + /** + * Reads a range of blocks. + * + * @param startBlockId the id of the first block to read + * @param endBlockId the id of the last block to read + * @return a queue of blocks + */ + Queue readRange(final long startBlockId, final long endBlockId); +} diff --git a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java new file mode 100644 index 000000000..277e5cb2d --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.persistence.storage.BlockStorage; + +import java.util.ArrayDeque; +import java.util.LinkedList; +import java.util.Optional; +import java.util.Queue; + +/** + * Write-Through cache handler coordinates between the block storage and the block cache to ensure the block + * is persisted to the storage before being cached. + */ +public class WriteThroughCacheHandler implements BlockPersistenceHandler { + + private final BlockStorage blockStorage; + + /** + * Constructor for the WriteThroughCacheHandler class. + * + * @param blockStorage the block storage + */ + public WriteThroughCacheHandler(final BlockStorage blockStorage) { + this.blockStorage = blockStorage; + } + + /** + * Persists the block to the block storage and cache the block. + * + * @param block the block to persist + * @return the block id + */ + @Override + public Long persist(final BlockStreamServiceGrpcProto.Block block) { + + // Write-Through cache + blockStorage.write(block); + return block.getId(); + } + + /** + * Reads a range of blocks from the block storage and cache. + * + * @param startBlockId the start block id + * @param endBlockId the end block id + * @return a queue of blocks + */ + @Override + public Queue readRange(final long startBlockId, final long endBlockId) { + final Queue blocks = new LinkedList<>(); + + long count = startBlockId; + Optional blockOpt = read(count); + while (count <= endBlockId && blockOpt.isPresent()) { + final BlockStreamServiceGrpcProto.Block block = blockOpt.get(); + blocks.add(block); + blockOpt = read(++count); + } + + return blocks; + } + + /** + * The read method first checks the cache for the block. + * If the block is not in cache, then it reads from storage and + * updates the cache. + * + * @param id the block id + * @return an Optional with the block + */ + @Override + public Optional read(final long id) { + return blockStorage.read(id); + } +} diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java new file mode 100644 index 000000000..7f42807d7 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage; + +import java.util.Optional; + +/** + * The BlockStorage interface defines operations to write and read blocks to a persistent store. + * + * @param the type of block to store + */ +public interface BlockStorage { + + /** + * Writes a block to storage. + * + * @param block the block to write + * @return the id of the block + */ + Optional write(final V block); + + /** + * Reads a block from storage. + * + * @param blockId the id of the block to read + * @return the block + */ + Optional read(final Long blockId); +} diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java new file mode 100644 index 000000000..821ea4e92 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import io.helidon.config.Config; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + +import static com.hedera.block.server.Constants.BLOCKNODE_STORAGE_ROOT_PATH_KEY; + +/** + * The FileSystemBlockStorage class implements the BlockStorage interface to store blocks to the filesystem. + */ +public class FileSystemBlockStorage implements BlockStorage { + + public static final String BLOCK_FILE_EXTENSION = ".blk"; + + private final Path blockNodeRootPath; + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + /** + * Constructs a FileSystemBlockStorage object. + * + * @param key the key to use to retrieve the block node root path from the configuration + * @param config the configuration + * @throws IOException if an I/O error occurs while initializing the block node root directory + */ + public FileSystemBlockStorage(final String key, final Config config) throws IOException { + + LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockStorage"); + LOGGER.log(System.Logger.Level.INFO, config.toString()); + + blockNodeRootPath = Path.of(config + .get(key) + .asString() + .get()); + + LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath); + + if (!blockNodeRootPath.isAbsolute()) { + throw new IllegalArgumentException(BLOCKNODE_STORAGE_ROOT_PATH_KEY+ " must be an absolute path"); + } + + // Initialize the block node root directory if it does not exist + if (Files.notExists(blockNodeRootPath)) { + Files.createDirectory(blockNodeRootPath); + LOGGER.log(System.Logger.Level.INFO, "Created block node root directory: " + blockNodeRootPath); + } else { + LOGGER.log(System.Logger.Level.INFO, "Using existing block node root directory: " + blockNodeRootPath); + } + } + + /** + * Writes a block to the filesystem. + * + * @param block the block to write + * @return the id of the block + */ + @Override + public Optional write(final BlockStreamServiceGrpcProto.Block block) { + Long id = block.getId(); + final String fullPath = resolvePath(id); + + try (FileOutputStream fos = new FileOutputStream(fullPath)) { + block.writeTo(fos); + LOGGER.log(System.Logger.Level.DEBUG, "Successfully wrote the block file: " + fullPath); + + return Optional.of(id); + } catch (IOException e) { + LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e); + return Optional.empty(); + } + } + + /** + * Reads a block from the filesystem. + * + * @param id the id of the block to read + * @return the block + */ + @Override + public Optional read(final Long id) { + return read(resolvePath(id)); + } + + private Optional read(final String filePath) { + + try (FileInputStream fis = new FileInputStream(filePath)) { + return Optional.of(BlockStreamServiceGrpcProto.Block.parseFrom(fis)); + } catch (FileNotFoundException io) { + LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + filePath, io); + return Optional.empty(); + } catch (IOException io) { + throw new RuntimeException("Error reading file: " + filePath, io); + } + } + + private String resolvePath(final Long id) { + + String fileName = id + BLOCK_FILE_EXTENSION; + Path fullPath = blockNodeRootPath.resolve(fileName); + LOGGER.log(System.Logger.Level.DEBUG, "Resolved fullPath: " + fullPath); + + return fullPath.toString(); + } +} diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockStreamObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockStreamObserver.java new file mode 100644 index 000000000..5691cc24e --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockStreamObserver.java @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.producer; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.mediator.StreamMediator; +import io.grpc.stub.StreamObserver; + +/** + * The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional + * gRPC service implementation. Helidon calls methods on this class as networking events occur + * with the connection to the upstream producer (e.g. blocks streamed from the Consensus Node to + * the server). + */ +public class ProducerBlockStreamObserver implements StreamObserver { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private final StreamMediator streamMediator; + private final StreamObserver responseStreamObserver; + + /** + * Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the mediator with blocks + * as they arrive from the upstream producer. It also sends responses back to the upstream producer via the + * responseStreamObserver. + * + * @param streamMediator the stream mediator + * @param responseStreamObserver the response stream observer + */ + public ProducerBlockStreamObserver(final StreamMediator streamMediator, + final StreamObserver responseStreamObserver) { + this.streamMediator = streamMediator; + this.responseStreamObserver = responseStreamObserver; + } + + /** + * Helidon triggers this method when it receives a new block from the upstream producer. The method notifies all + * the mediator subscribers and sends a response back to the upstream producer. + * + * @param block the block streamed from the upstream producer + */ + @Override + public void onNext(final BlockStreamServiceGrpcProto.Block block) { + + // Notify all the mediator subscribers + streamMediator.notifyAll(block); + + // Send a response back to the upstream producer + final BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().setId(block.getId()).build(); + responseStreamObserver.onNext(blockResponse); + } + + /** + * Helidon triggers this method when an error occurs on the bidirectional stream to the upstream producer. + * + * @param t the error occurred on the stream + */ + @Override + public void onError(final Throwable t) { + LOGGER.log(System.Logger.Level.ERROR, "onError method invoked with an exception", t); + responseStreamObserver.onError(t); + } + + /** + * Helidon triggers this method when the bidirectional stream to the upstream producer is completed. + * Unsubscribe all the observers from the mediator. + */ + @Override + public void onCompleted() { + LOGGER.log(System.Logger.Level.DEBUG, "ProducerBlockStreamObserver completed"); + responseStreamObserver.onCompleted(); + } +} diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 0ec5d0618..c78fae8d4 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -4,7 +4,7 @@ requires com.google.protobuf; requires io.grpc.stub; requires io.helidon.common; + requires io.helidon.config; requires io.helidon.webserver.grpc; requires io.helidon.webserver; - requires java.logging; } diff --git a/server/src/main/resources/application.yaml b/server/src/main/resources/application.yaml new file mode 100644 index 000000000..e69de29bb diff --git a/server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java b/server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java new file mode 100644 index 000000000..99af21cf5 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java @@ -0,0 +1,144 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.consumer; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.mediator.StreamMediator; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Clock; +import java.time.Instant; +import java.time.InstantSource; +import java.time.ZoneId; + +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class LiveStreamObserverImplTest { + + private final long TIMEOUT_THRESHOLD_MILLIS = 50L; + private final long TEST_TIME = 1_719_427_664_950L; + + @Mock + private StreamMediator streamMediator; + + @Mock + private StreamObserver responseStreamObserver; + + + @Test + public void testConsumerTimeoutWithinWindow() { + final LiveStreamObserver liveStreamObserver = new LiveStreamObserverImpl( + TIMEOUT_THRESHOLD_MILLIS, + buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + streamMediator, + responseStreamObserver); + BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build(); + liveStreamObserver.notify(newBlock); + + // verify the observer is called with the next + // block and the stream mediator is not unsubscribed + verify(responseStreamObserver).onNext(newBlock); + verify(streamMediator, never()).unsubscribe(liveStreamObserver); + } + + @Test + public void testConsumerTimeoutOutsideWindow() throws InterruptedException { + + final LiveStreamObserver liveStreamObserver = new LiveStreamObserverImpl( + TIMEOUT_THRESHOLD_MILLIS, + buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + streamMediator, + responseStreamObserver); + + final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build(); + when(streamMediator.isSubscribed(liveStreamObserver)).thenReturn(true); + liveStreamObserver.notify(newBlock); + verify(streamMediator).unsubscribe(liveStreamObserver); + } + + @Test + public void testProducerTimeoutWithinWindow() { + final LiveStreamObserver liveStreamObserver = new LiveStreamObserverImpl( + TIMEOUT_THRESHOLD_MILLIS, + buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + streamMediator, + responseStreamObserver); + + BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build(); + liveStreamObserver.onNext(blockResponse); + + // verify the mediator is NOT called to unsubscribe the observer + verify(streamMediator, never()).unsubscribe(liveStreamObserver); + } + + @Test + public void testProducerTimeoutOutsideWindow() throws InterruptedException { + final LiveStreamObserver liveStreamObserver = new LiveStreamObserverImpl( + TIMEOUT_THRESHOLD_MILLIS, + buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + streamMediator, + responseStreamObserver); + + Thread.sleep(51); + BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build(); + liveStreamObserver.onNext(blockResponse); + + verify(streamMediator).unsubscribe(liveStreamObserver); + } + + private static InstantSource buildClockInsideWindow(long testTime, long timeoutThresholdMillis) { + return new TestClock(testTime, testTime + timeoutThresholdMillis - 1); + } + + private static InstantSource buildClockOutsideWindow(long testTime, long timeoutThresholdMillis) { + return new TestClock(testTime, testTime + timeoutThresholdMillis + 1); + } + + static class TestClock implements InstantSource { + + private int index; + private final Long[] millis; + + TestClock(Long... millis) { + this.millis = millis; + } + + @Override + public long millis() { + long value = millis[index]; + + // cycle through the provided millis + // and wrap around if necessary + index = index > millis.length - 1 ? 0 : index + 1; + return value; + } + + @Override + public Instant instant() { + return null; + } + } +} diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java new file mode 100644 index 000000000..c467bb919 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.mediator; + + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.consumer.LiveStreamObserver; +import com.hedera.block.server.persistence.WriteThroughCacheHandler; +import com.hedera.block.server.persistence.storage.BlockStorage; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +public class LiveStreamMediatorImplTest { + + @Mock + private LiveStreamObserver liveStreamObserver1; + + @Mock + private LiveStreamObserver liveStreamObserver2; + + @Mock + private LiveStreamObserver liveStreamObserver3; + + @Mock + private BlockStorage blockStorage; + + @Test + public void testUnsubscribeEach() { + + final StreamMediator streamMediator = + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)); + + // Set up the subscribers + streamMediator.subscribe(liveStreamObserver1); + streamMediator.subscribe(liveStreamObserver2); + streamMediator.subscribe(liveStreamObserver3); + + assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed"); + + streamMediator.unsubscribe(liveStreamObserver1); + assertFalse(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have unsubscribed liveStreamObserver1"); + + streamMediator.unsubscribe(liveStreamObserver2); + assertFalse(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have unsubscribed liveStreamObserver2"); + + streamMediator.unsubscribe(liveStreamObserver3); + assertFalse(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have unsubscribed liveStreamObserver3"); + } + + @Test + public void testMediatorPersistenceWithoutSubscribers() { + + final StreamMediator streamMediator = + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)); + + final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build(); + + // Acting as a producer, notify the mediator of a new block + streamMediator.notifyAll(newBlock); + + // Confirm the block was persisted to storage + // even though there are no subscribers + verify(blockStorage).write(newBlock); + } + + @Test + public void testMediatorNotifyAll() { + + final StreamMediator streamMediator = + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)); + + // Set up the subscribers + streamMediator.subscribe(liveStreamObserver1); + streamMediator.subscribe(liveStreamObserver2); + streamMediator.subscribe(liveStreamObserver3); + + assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed"); + + final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build(); + + // Acting as a producer, notify the mediator of a new block + streamMediator.notifyAll(newBlock); + + // Confirm each subscriber was notified of the new block + verify(liveStreamObserver1).notify(newBlock); + verify(liveStreamObserver2).notify(newBlock); + verify(liveStreamObserver3).notify(newBlock); + + // Confirm the block was persisted to storage and cache + verify(blockStorage).write(newBlock); + } + +} diff --git a/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java b/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java new file mode 100644 index 000000000..87e1e83b5 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public final class PersistTestUtils { + + private PersistTestUtils() {} + + public static List generateBlocks(int numOfBlocks) { + return IntStream + .range(1, numOfBlocks + 1) + .mapToObj(i -> BlockStreamServiceGrpcProto.Block + .newBuilder() + .setId(i) + .setValue("block-node-" + i).build() + ) + .collect(Collectors.toList()); + }} diff --git a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java new file mode 100644 index 000000000..ef6539a07 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.persistence.storage.BlockStorage; +import org.junit.jupiter.api.Test; + +import java.util.*; + +import static com.hedera.block.server.persistence.PersistTestUtils.generateBlocks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class RangeTest { + + @Test + public void testReadRangeWithEvenEntries() { + + int maxEntries = 100; + int numOfBlocks = 100; + + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); + List blocks = generateBlocks(numOfBlocks); + for (BlockStreamServiceGrpcProto.Block block : blocks) { + blockPersistenceHandler.persist(block); + } + + int window = 10; + int numOfWindows = numOfBlocks / window; + + verifyReadRange(window, numOfWindows, blockPersistenceHandler); + } + + @Test + public void testReadRangeWithNoBlocks() { + int maxEntries = 100; + + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); + Queue results = blockPersistenceHandler.readRange(1, 100); + assertNotNull(results); + assertEquals(0, results.size()); + } + + @Test + public void testReadRangeWhenBlocksLessThanWindow() { + int maxEntries = 100; + int numOfBlocks = 9; + + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); + List blocks = generateBlocks(numOfBlocks); + for (BlockStreamServiceGrpcProto.Block block : blocks) { + blockPersistenceHandler.persist(block); + } + + int window = 10; + + Queue results = blockPersistenceHandler.readRange(1, window); + assertNotNull(results); + assertEquals(numOfBlocks, results.size()); + } + + private static void verifyReadRange( + int window, + int numOfWindows, + BlockPersistenceHandler blockPersistenceHandler) { + + for (int j = 0; j < numOfWindows;++j) { + + int startBlockId = (j * window) + 1; + int endBlockId = (startBlockId + window) - 1; + Queue results = blockPersistenceHandler.readRange(startBlockId, endBlockId); + + for (int i = startBlockId;i <= endBlockId && results.peek() != null;++i) { + BlockStreamServiceGrpcProto.Block block = results.poll(); + assertNotNull(block); + assertEquals(i, block.getId()); + } + } + } + + private static BlockPersistenceHandler generateInMemoryTestBlockPersistenceHandler(int maxEntries) { + // Mock up a simple, in-memory persistence handler + BlockStorage blockStorage = new NoOpTestBlockStorage(); + return new WriteThroughCacheHandler(blockStorage); + } + + private static class NoOpTestBlockStorage implements BlockStorage { + + private final Map cache; + + public NoOpTestBlockStorage() { + this.cache = new HashMap<>(); + } + + @Override + public Optional write(BlockStreamServiceGrpcProto.Block block) { + cache.put(block.getId(), block); + return Optional.of(block.getId()); + } + + @Override + public Optional read(Long blockId) { + return Optional.ofNullable(cache.get(blockId)); + } + } +} diff --git a/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java new file mode 100644 index 000000000..1ec527584 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.persistence.storage.BlockStorage; +import com.hedera.block.server.persistence.storage.FileSystemBlockStorage; +import com.hedera.block.server.util.TestUtils; +import io.helidon.config.Config; +import io.helidon.config.MapConfigSource; +import io.helidon.config.spi.ConfigSource; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.hedera.block.server.persistence.PersistTestUtils.generateBlocks; +import static org.junit.jupiter.api.Assertions.*; + +public class WriteThroughCacheHandlerTest { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private static final String TEMP_DIR = "block-node-unit-test-dir"; + private static final String JUNIT = "my-junit-test"; + + private Path testPath; + private Config testConfig; + + @BeforeEach + public void setUp() throws IOException { + testPath = Files.createTempDirectory(TEMP_DIR); + LOGGER.log(System.Logger.Level.INFO, "Created temp directory: " + testPath.toString()); + + Map testProperties = Map.of(JUNIT, testPath.toString()); + ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build(); + testConfig = Config.builder(testConfigSource).build(); + } + + @AfterEach + public void tearDown() { + TestUtils.deleteDirectory(testPath.toFile()); + } + + @Test + public void testMaxEntriesGreaterThanBlocks() throws IOException { + + int numOfBlocks = 4; + + FileSystemBlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); + BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); + + List blocks = generateBlocks(numOfBlocks); + verifyPersistenceHandler(blocks, blockPersistenceHandler, testPath); + } + + @Test + public void testMaxEntriesEqualToBlocks() throws IOException { + int numOfBlocks = 3; + + FileSystemBlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); + BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); + + List blocks = generateBlocks(numOfBlocks); + verifyPersistenceHandler(blocks, blockPersistenceHandler, testPath); + } + + @Test + public void testMaxEntriesLessThanBlocks() throws IOException { + int maxEntries = 3; + int numOfBlocks = 4; + + BlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); + BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); + + List blocks = generateBlocks(numOfBlocks); + verifyPersistenceHandler(blocks, blockPersistenceHandler, testPath); + } + + private static void verifyPersistenceHandler( + List blocks, + BlockPersistenceHandler blockPersistenceHandler, + Path testPath) throws IOException { + + for (BlockStreamServiceGrpcProto.Block block : blocks) { + + // Save the block + blockPersistenceHandler.persist(block); + + // Read the block + long blockId = block.getId(); + verifyPersistedBlockIsAccessible(blockId, blockPersistenceHandler); + + // Verify the block was written to the fs + verifyFileExists(blockId, block, testPath); + } + } + + private static void verifyPersistedBlockIsAccessible(long blockId, BlockPersistenceHandler blockPersistenceHandler) { + + // Confirm the block is accessible + Optional blockOpt = blockPersistenceHandler.read(blockId); + if (blockOpt.isPresent()) { + assertEquals(blockId, blockOpt.get().getId()); + } else { + fail("Failed to persist block " + blockId); + } + } + + private static void verifyFileExists(long blockId, BlockStreamServiceGrpcProto.Block block, Path testPath) throws IOException { + // Verify the block was saved on the filesystem + Path fullTestPath = testPath.resolve(block.getId() + FileSystemBlockStorage.BLOCK_FILE_EXTENSION); + try (FileInputStream fis = new FileInputStream(fullTestPath.toFile())) { + BlockStreamServiceGrpcProto.Block fetchedBlock = BlockStreamServiceGrpcProto.Block.parseFrom(fis); + assertEquals(blockId, fetchedBlock.getId()); + assertEquals(block.getValue(), fetchedBlock.getValue()); + } + } +} diff --git a/server/src/test/java/com/hedera/block/server/util/TestUtils.java b/server/src/test/java/com/hedera/block/server/util/TestUtils.java new file mode 100644 index 000000000..20f73b2e1 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/util/TestUtils.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.util; + +import java.io.File; + +public final class TestUtils { + private TestUtils() {} + + public static void deleteDirectory(File directoryToBeDeleted) { + File[] allContents = directoryToBeDeleted.listFiles(); + if (allContents != null) { + for (File file : allContents) { + deleteDirectory(file); + } + } + + directoryToBeDeleted.delete(); + } +} diff --git a/server/src/test/resources/consumer.sh b/server/src/test/resources/consumer.sh new file mode 100755 index 000000000..f23d5c7d2 --- /dev/null +++ b/server/src/test/resources/consumer.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +usage_error() { + echo "Usage: $0 [positive-integer]" + exit 1 +} + +# Check if the first argument is provided and is an integer +if [ "$#" -lt 1 ] || ! [[ "$1" =~ ^[0-9]+$ ]]; then + usage_error +fi + +# Check if the second argument is provided and if it's a positive integer +if [ "$#" -eq 2 ] && ! [[ "$2" =~ ^[1-9][0-9]*$ ]]; then + usage_error +fi + +# If the script reaches here, the parameters are valid +echo "The provided integer is: $1" +if [ "$#" -eq 2 ]; then + echo "The optional positive integer is: $2" +fi + +GRPC_SERVER="localhost:8080" +GRPC_METHOD="BlockStreamGrpc/StreamSource" +PATH_TO_PROTO="../../../../protos/src/main/protobuf/blockstream.proto" + +echo "Starting consumer..." + +# Signal handler to handle SIGINT (Ctrl+C) +function cleanup { + echo "Received SIGINT, stopping..." + kill $GRPC_PID + exit 0 +} + +# Trap SIGINT +trap cleanup SIGINT + +# Generate and push messages to the gRPC server as a consumer. +# Response block messages from the gRPC server are printed to stdout. +( + iter=$1 + while true; do + echo "{\"id\": $iter}" + + if [ $iter -eq $2 ]; then + exit 0 + fi + + ((iter++)) + + # Configure the message speed + sleep 1 + + done +) | grpcurl -plaintext -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD + diff --git a/server/src/test/resources/producer.sh b/server/src/test/resources/producer.sh new file mode 100755 index 000000000..aa9a4a4fd --- /dev/null +++ b/server/src/test/resources/producer.sh @@ -0,0 +1,63 @@ +#!/bin/bash + +usage_error() { + echo "Usage: $0 [positive-integer]" + exit 1 +} + +# Check if the first argument is provided and is an integer +if [ "$#" -lt 1 ] || ! [[ "$1" =~ ^[0-9]+$ ]]; then + usage_error +fi + +# Check if the second argument is provided and if it's a positive integer +if [ "$#" -eq 2 ] && ! [[ "$2" =~ ^[1-9][0-9]*$ ]]; then + usage_error +fi + +# If the script reaches here, the parameters are valid +echo "The provided integer is: $1" +if [ "$#" -eq 2 ]; then + echo "The optional positive integer is: $2" +fi + + +GRPC_SERVER="localhost:8080" +GRPC_METHOD="BlockStreamGrpc/StreamSink" +PATH_TO_PROTO="../../../../protos/src/main/protobuf/blockstream.proto" + +echo "Starting producer..." + +# Signal handler to handle SIGINT (Ctrl+C) +function cleanup { + echo "Received SIGINT, stopping..." + kill $GRPC_PID + exit 0 +} + +# Trap SIGINT +trap cleanup SIGINT + +# Generate and push messages to the gRPC server as a producer. +# Response messages from the gRPC server are printed to stdout. +( + iter=$1 + while true; do + echo "{\"id\": $iter, \"value\": \"block-stream-$iter\"}" + + if [ $iter -eq $2 ]; then + exit 0 + fi + + ((iter++)) + + sleep 1 + done +) | grpcurl -vv -plaintext -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD & + +GRPC_PID=$! + +# Wait for grpcurl to finish +wait $GRPC_PID + +echo "Finished" diff --git a/settings.gradle.kts b/settings.gradle.kts index 2e0cdac7a..460789c92 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -35,12 +35,14 @@ dependencyResolutionManagement { create("libs") { // Compile time dependencies version("com.google.protobuf", "3.24.0") - version("io.helidon.webserver.http2", "4.0.8") - version("io.helidon.webserver.grpc", "4.0.8") + version("io.helidon.webserver.http2", "4.0.10") + version("io.helidon.webserver.grpc", "4.0.10") // Testing only versions version("org.assertj.core", "3.23.1") - version("org.junit.jupiter.api", "5.10.0") + version("org.junit.jupiter.api", "5.10.2") + version("org.mockito", "5.8.0") + version("org.mockito.junit.jupiter", "5.8.0") } } }