From f78143f94584b6ee4b67fcf7801a84cfa99a0fab Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Date: Fri, 20 Sep 2024 09:10:11 -0600 Subject: [PATCH] feat: Simulator gRPC Client implementation and initial configuration (#182) Signed-off-by: Alfredo Gutierrez --- .../com.hedera.block.jpms-modules.gradle.kts | 20 +- codecov.yml | 1 + gradle/modules.properties | 2 + settings.gradle.kts | 10 +- simulator/build.gradle.kts | 3 + .../block/simulator/BlockStreamSimulator.java | 3 +- .../simulator/BlockStreamSimulatorApp.java | 49 +++- ...lockStreamSimulatorInjectionComponent.java | 2 + .../hedera/block/simulator/Translator.java | 209 +++++++++++++++ .../config/data/BlockStreamConfig.java | 9 +- .../BlockAsDirBlockStreamManager.java | 164 ++++++++++++ .../BlockAsFileBlockStreamManager.java | 3 + .../generator/GeneratorInjectionModule.java | 21 +- .../simulator/grpc/GrpcInjectionModule.java | 37 +++ .../grpc/PublishStreamGrpcClient.java | 41 +++ .../grpc/PublishStreamGrpcClientImpl.java | 84 ++++++ .../simulator/grpc/PublishStreamObserver.java | 51 ++++ simulator/src/main/java/module-info.java | 3 + simulator/src/main/resources/app.properties | 4 +- .../simulator/BlockStreamSimulatorTest.java | 24 +- .../com/hedera/block/simulator/TestUtils.java | 56 ++++ .../simulator/config/TestConfigBuilder.java | 242 ++++++++++++++++++ .../config/data/BlockStreamConfigTest.java | 44 +++- .../BlockAsDirBlockStreamManagerTest.java | 84 ++++++ .../BlockAsFileBlockStreamManagerTest.java | 22 +- .../GeneratorInjectionModuleTest.java | 72 ++++++ .../grpc/PublishStreamGrpcClientImplTest.java | 62 +++++ .../grpc/PublishStreamObserverTest.java | 42 +++ .../resources/BlockAsDirException/1/1.blk | 1 + .../000000000000000000000000000000000001.blk | Bin .../000000000000000000000000000000000002.blk | Bin .../000000000000000000000000000000000003.blk | Bin .../000000000000000000000000000000000004.blk | Bin .../000000000000000000000000000000000005.blk | Bin .../000000000000000000000000000000000006.blk | Bin .../000000000000000000000000000000000007.blk | Bin .../resources/blockAsDirExample/1/1.blk.gz | Bin 0 -> 95 bytes .../test/resources/blockAsDirExample/1/10.blk | 1 + .../test/resources/blockAsDirExample/1/2.blk | 2 + .../test/resources/blockAsDirExample/1/3.blk | 2 + .../test/resources/blockAsDirExample/1/4.blk | 2 + .../test/resources/blockAsDirExample/1/5.blk | 2 + .../test/resources/blockAsDirExample/1/6.blk | 2 + .../test/resources/blockAsDirExample/1/7.blk | 2 + .../test/resources/blockAsDirExample/1/8.blk | 2 + .../test/resources/blockAsDirExample/1/9.blk | 2 + .../blockAsDirExample/1/notABlockFile.txt | 1 + .../test/resources/blockAsDirExample/2/1.blk | 3 + .../test/resources/blockAsDirExample/2/10.blk | 1 + .../test/resources/blockAsDirExample/2/2.blk | 2 + .../test/resources/blockAsDirExample/2/3.blk | 2 + .../test/resources/blockAsDirExample/2/4.blk | 2 + .../test/resources/blockAsDirExample/2/5.blk | 2 + .../test/resources/blockAsDirExample/2/6.blk | 2 + .../test/resources/blockAsDirExample/2/7.blk | 2 + .../test/resources/blockAsDirExample/2/8.blk | 2 + .../test/resources/blockAsDirExample/2/9.blk | 2 + 57 files changed, 1358 insertions(+), 45 deletions(-) create mode 100644 simulator/src/main/java/com/hedera/block/simulator/Translator.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/TestUtils.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/config/TestConfigBuilder.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/generator/GeneratorInjectionModuleTest.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java create mode 100644 simulator/src/test/resources/BlockAsDirException/1/1.blk rename simulator/src/{main => test}/resources/block-0.0.3-blk/000000000000000000000000000000000001.blk (100%) rename simulator/src/{main => test}/resources/block-0.0.3-blk/000000000000000000000000000000000002.blk (100%) rename simulator/src/{main => test}/resources/block-0.0.3-blk/000000000000000000000000000000000003.blk (100%) rename simulator/src/{main => test}/resources/block-0.0.3-blk/000000000000000000000000000000000004.blk (100%) rename simulator/src/{main => test}/resources/block-0.0.3-blk/000000000000000000000000000000000005.blk (100%) rename simulator/src/{main => test}/resources/block-0.0.3-blk/000000000000000000000000000000000006.blk (100%) rename simulator/src/{main => test}/resources/block-0.0.3-blk/000000000000000000000000000000000007.blk (100%) create mode 100644 simulator/src/test/resources/blockAsDirExample/1/1.blk.gz create mode 100644 simulator/src/test/resources/blockAsDirExample/1/10.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/1/2.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/1/3.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/1/4.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/1/5.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/1/6.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/1/7.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/1/8.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/1/9.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/1/notABlockFile.txt create mode 100644 simulator/src/test/resources/blockAsDirExample/2/1.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/2/10.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/2/2.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/2/3.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/2/4.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/2/5.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/2/6.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/2/7.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/2/8.blk create mode 100644 simulator/src/test/resources/blockAsDirExample/2/9.blk diff --git a/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts b/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts index 600342b75..b6374db6f 100644 --- a/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts +++ b/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts @@ -36,6 +36,8 @@ jvmDependencyConflicts.patch { "org.codehaus.mojo:animal-sniffer-annotations" ) + module("io.grpc:grpc-netty-shaded") { annotationLibraries.forEach { removeDependency(it) } } + module("io.grpc:grpc-api") { annotationLibraries.forEach { removeDependency(it) } } module("io.grpc:grpc-core") { annotationLibraries.forEach { removeDependency(it) } } module("io.grpc:grpc-context") { annotationLibraries.forEach { removeDependency(it) } } @@ -87,9 +89,16 @@ extraJavaModuleInfo { exportAllPackages() requireAllDefinedDependencies() requires("java.logging") + uses("io.grpc.ManagedChannelProvider") + uses("io.grpc.NameResolverProvider") + uses("io.grpc.LoadBalancerProvider") } - module("io.grpc:grpc-core", "io.grpc.internal") + module("io.grpc:grpc-core", "io.grpc.internal") { + exportAllPackages() + requireAllDefinedDependencies() + requires("java.logging") + } module("io.grpc:grpc-context", "io.grpc.context") module("io.grpc:grpc-stub", "io.grpc.stub") { exportAllPackages() @@ -101,6 +110,15 @@ extraJavaModuleInfo { module("io.grpc:grpc-util", "io.grpc.util") module("io.grpc:grpc-protobuf", "io.grpc.protobuf") module("io.grpc:grpc-protobuf-lite", "io.grpc.protobuf.lite") + + module("io.grpc:grpc-netty-shaded", "io.grpc.netty.shaded") { + exportAllPackages() + requireAllDefinedDependencies() + requires("java.logging") + requires("jdk.unsupported") + ignoreServiceProvider("reactor.blockhound.integration.BlockHoundIntegration") + } + module("com.github.spotbugs:spotbugs-annotations", "com.github.spotbugs.annotations") module("com.google.code.findbugs:jsr305", "java.annotation") { exportAllPackages() diff --git a/codecov.yml b/codecov.yml index 620cc39a3..ec4798fb7 100644 --- a/codecov.yml +++ b/codecov.yml @@ -22,3 +22,4 @@ ignore: - "server/src/main/java/com/hedera/block/server/Server.java" - "server/src/main/java/com/hedera/block/server/Translator.java" - "simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java" + - "simulator/src/main/java/com/hedera/block/simulator/Translator.java" diff --git a/gradle/modules.properties b/gradle/modules.properties index fcc1dcb94..b63ad263a 100644 --- a/gradle/modules.properties +++ b/gradle/modules.properties @@ -11,6 +11,7 @@ com.github.spotbugs.annotations=com.github.spotbugs:spotbugs-annotations com.lmax.disruptor=com.lmax:disruptor io.helidon.webserver=io.helidon.webserver:helidon-webserver io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc + io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5 io.helidon.logging=io.helidon.logging:helidon-logging-jul @@ -21,6 +22,7 @@ google.proto=com.google.protobuf:protoc io.grpc=io.grpc:grpc-api io.grpc.protobuf=io.grpc:grpc-protobuf io.grpc.stub=io.grpc:grpc-stub +io.grpc.netty.shaded=io.grpc:grpc-netty-shaded com.hedera.pbj.runtime=com.hedera.pbj:pbj-runtime com.google.protobuf=com.google.protobuf:protobuf-java diff --git a/settings.gradle.kts b/settings.gradle.kts index 0d1d79c6d..9dd268f44 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -43,6 +43,9 @@ dependencyResolutionManagement { // Define a constant for the Dagger version. val daggerVersion = "2.42" + // Define a constant for protobuf version. + val protobufVersion = "4.28.2" + // Compile time dependencies version("io.helidon.webserver.http2", "4.1.0") version("io.helidon.webserver.grpc", "4.1.0") @@ -63,14 +66,15 @@ dependencyResolutionManagement { version("io.grpc", "1.65.1") version("io.grpc.protobuf", "1.65.1") version("io.grpc.stub", "1.65.1") + version("io.grpc.netty.shaded", "1.65.1") // Reference from the protobuf plugin - version("google.proto", "4.27.3") + version("google.proto", protobufVersion) version("grpc.protobuf.grpc", "1.65.1") // Google protobuf dependencies - version("com.google.protobuf", "4.27.3") - version("com.google.protobuf.util", "4.27.3") + version("com.google.protobuf", protobufVersion) + version("com.google.protobuf.util", protobufVersion) // PBJ dependencies plugin("pbj", "com.hedera.pbj.pbj-compiler").version("0.9.2") diff --git a/simulator/build.gradle.kts b/simulator/build.gradle.kts index 2a09c9b29..ea112413a 100644 --- a/simulator/build.gradle.kts +++ b/simulator/build.gradle.kts @@ -30,6 +30,8 @@ mainModuleInfo { annotationProcessor("dagger.compiler") annotationProcessor("com.google.auto.service.processor") runtimeOnly("com.swirlds.config.impl") + runtimeOnly("org.apache.logging.log4j.slf4j2.impl") + runtimeOnly("io.grpc.netty.shaded") } testModuleInfo { @@ -37,6 +39,7 @@ testModuleInfo { requires("org.mockito") requires("org.mockito.junit.jupiter") requiresStatic("com.github.spotbugs.annotations") + requires("com.swirlds.common") } tasks.register("untarTestBlockStream") { diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java index c376d2778..16c5fb9c9 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java @@ -39,8 +39,9 @@ private BlockStreamSimulator() {} * * @param args the arguments to be passed to the block stream simulator * @throws IOException if an I/O error occurs + * @throws InterruptedException if the thread is interrupted */ - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws IOException, InterruptedException { LOGGER.log(INFO, "Starting Block Stream Simulator"); diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java index 4bef7a217..958d1bd77 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java @@ -16,7 +16,10 @@ package com.hedera.block.simulator; +import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.generator.BlockStreamManager; +import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.hapi.block.stream.BlockItem; import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; import javax.inject.Inject; @@ -29,6 +32,10 @@ public class BlockStreamSimulatorApp { Configuration configuration; BlockStreamManager blockStreamManager; + PublishStreamGrpcClient publishStreamGrpcClient; + BlockStreamConfig blockStreamConfig; + + private final int delayBetweenBlockItems; boolean isRunning = false; @@ -37,33 +44,51 @@ public class BlockStreamSimulatorApp { * * @param configuration the configuration to be used by the block stream simulator * @param blockStreamManager the block stream manager to be used by the block stream simulator + * @param publishStreamGrpcClient the gRPC client to be used by the block stream simulator */ @Inject public BlockStreamSimulatorApp( - @NonNull Configuration configuration, @NonNull BlockStreamManager blockStreamManager) { + @NonNull Configuration configuration, + @NonNull BlockStreamManager blockStreamManager, + @NonNull PublishStreamGrpcClient publishStreamGrpcClient) { this.configuration = configuration; this.blockStreamManager = blockStreamManager; - } + this.publishStreamGrpcClient = publishStreamGrpcClient; - /** Starts the block stream simulator. */ - public void start() { + blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class); + + delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems(); + } - // use blockStreamManager to get block stream + /** + * Starts the block stream simulator. + * + * @throws InterruptedException if the thread is interrupted + */ + public void start() throws InterruptedException { + int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000; + int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000; - // use PublishStreamGrpcClient to stream it to the block-node. isRunning = true; LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started"); - // while + boolean streamBlockItem = true; + int blockItemsStreamed = 0; - // get block item - // send block item + while (streamBlockItem) { + // get block item + BlockItem blockItem = blockStreamManager.getNextBlockItem(); + publishStreamGrpcClient.streamBlockItem(blockItem); + blockItemsStreamed++; - // verify if ack is needed - // wait for ack async... + Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems); - // verify exit condition + if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) { + streamBlockItem = false; + } + } + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); } /** diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionComponent.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionComponent.java index cf5589e1a..93ca848dd 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionComponent.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionComponent.java @@ -18,6 +18,7 @@ import com.hedera.block.simulator.config.ConfigInjectionModule; import com.hedera.block.simulator.generator.GeneratorInjectionModule; +import com.hedera.block.simulator.grpc.GrpcInjectionModule; import com.swirlds.config.api.Configuration; import dagger.BindsInstance; import dagger.Component; @@ -29,6 +30,7 @@ modules = { ConfigInjectionModule.class, GeneratorInjectionModule.class, + GrpcInjectionModule.class, }) public interface BlockStreamSimulatorInjectionComponent { diff --git a/simulator/src/main/java/com/hedera/block/simulator/Translator.java b/simulator/src/main/java/com/hedera/block/simulator/Translator.java new file mode 100644 index 000000000..79c5412b2 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/Translator.java @@ -0,0 +1,209 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator; + +import static java.lang.System.Logger; +import static java.lang.System.Logger.Level.ERROR; +import static java.util.Objects.requireNonNull; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.hedera.hapi.block.PublishStreamRequest; +import com.hedera.hapi.block.PublishStreamResponse; +import com.hedera.hapi.block.SingleBlockResponse; +import com.hedera.hapi.block.SubscribeStreamRequest; +import com.hedera.hapi.block.SubscribeStreamResponse; +import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.Codec; +import com.hedera.pbj.runtime.ParseException; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import com.hedera.pbj.runtime.io.stream.WritableStreamingData; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** + * Translator class to convert between PBJ and google protoc objects. + * + *

TODO: Remove this class once the Helidon PBJ gRPC work is integrated. + */ +public final class Translator { + private static final Logger LOGGER = System.getLogger(Translator.class.getName()); + + private static final String INVALID_BUFFER_MESSAGE = + "Invalid protocol buffer converting %s from PBJ to protoc for %s"; + + private Translator() {} + + /** + * Converts a {@link BlockItem} to a {@link com.hedera.hapi.block.stream.protoc.BlockItem}. + * + * @param blockItem the {@link BlockItem} to convert + * @return the converted {@link com.hedera.hapi.block.stream.protoc.BlockItem} + */ + @NonNull + public static com.hedera.hapi.block.stream.protoc.BlockItem fromPbj( + @NonNull final BlockItem blockItem) { + try { + final byte[] pbjBytes = + asBytes(com.hedera.hapi.block.stream.BlockItem.PROTOBUF, blockItem); + return com.hedera.hapi.block.stream.protoc.BlockItem.parseFrom(pbjBytes); + } catch (InvalidProtocolBufferException e) { + final String message = + INVALID_BUFFER_MESSAGE.formatted("SingleBlockResponse", blockItem); + LOGGER.log(ERROR, message); + throw new RuntimeException(message, e); + } + } + + /** + * Converts a {@link SingleBlockResponse} to a {@link + * com.hedera.hapi.block.protoc.SingleBlockResponse}. + * + * @param singleBlockResponse the {@link SingleBlockResponse} to convert + * @return the converted {@link com.hedera.hapi.block.protoc.SingleBlockResponse} + */ + @NonNull + public static com.hedera.hapi.block.protoc.SingleBlockResponse fromPbj( + @NonNull final SingleBlockResponse singleBlockResponse) { + try { + final byte[] pbjBytes = asBytes(SingleBlockResponse.PROTOBUF, singleBlockResponse); + return com.hedera.hapi.block.protoc.SingleBlockResponse.parseFrom(pbjBytes); + } catch (InvalidProtocolBufferException e) { + final String message = + INVALID_BUFFER_MESSAGE.formatted("SingleBlockResponse", singleBlockResponse); + LOGGER.log(ERROR, message); + throw new RuntimeException(message, e); + } + } + + /** + * Converts a {@link PublishStreamResponse} to a {@link + * com.hedera.hapi.block.protoc.PublishStreamResponse}. + * + * @param publishStreamResponse the {@link PublishStreamResponse} to convert + * @return the converted {@link com.hedera.hapi.block.protoc.PublishStreamResponse} + */ + @NonNull + public static com.hedera.hapi.block.protoc.PublishStreamResponse fromPbj( + @NonNull final PublishStreamResponse publishStreamResponse) { + try { + final byte[] pbjBytes = asBytes(PublishStreamResponse.PROTOBUF, publishStreamResponse); + return com.hedera.hapi.block.protoc.PublishStreamResponse.parseFrom(pbjBytes); + } catch (InvalidProtocolBufferException e) { + final String message = + INVALID_BUFFER_MESSAGE.formatted( + "PublishStreamResponse", publishStreamResponse); + LOGGER.log(ERROR, message); + throw new RuntimeException(message, e); + } + } + + /** + * Converts a {@link PublishStreamRequest} to a {@link + * com.hedera.hapi.block.protoc.PublishStreamRequest}. + * + * @param publishStreamRequest the {@link PublishStreamRequest} to convert + * @return the converted {@link com.hedera.hapi.block.protoc.PublishStreamRequest} + */ + @NonNull + public static com.hedera.hapi.block.protoc.PublishStreamRequest fromPbj( + @NonNull final PublishStreamRequest publishStreamRequest) { + try { + final byte[] pbjBytes = asBytes(PublishStreamRequest.PROTOBUF, publishStreamRequest); + return com.hedera.hapi.block.protoc.PublishStreamRequest.parseFrom(pbjBytes); + } catch (InvalidProtocolBufferException e) { + final String message = + INVALID_BUFFER_MESSAGE.formatted("PublishStreamRequest", publishStreamRequest); + LOGGER.log(ERROR, message); + throw new RuntimeException(message, e); + } + } + + /** + * Converts a {@link SubscribeStreamResponse} to a {@link + * com.hedera.hapi.block.protoc.SubscribeStreamResponse}. + * + * @param subscribeStreamResponse the {@link SubscribeStreamResponse} to convert + * @return the converted {@link com.hedera.hapi.block.protoc.SubscribeStreamResponse} + */ + @NonNull + public static com.hedera.hapi.block.protoc.SubscribeStreamResponse fromPbj( + @NonNull final SubscribeStreamResponse subscribeStreamResponse) { + try { + final byte[] pbjBytes = + asBytes(SubscribeStreamResponse.PROTOBUF, subscribeStreamResponse); + return com.hedera.hapi.block.protoc.SubscribeStreamResponse.parseFrom(pbjBytes); + } catch (InvalidProtocolBufferException e) { + final String message = + INVALID_BUFFER_MESSAGE.formatted( + "SubscribeStreamResponse", subscribeStreamResponse); + LOGGER.log(ERROR, message); + throw new RuntimeException(message, e); + } + } + + /** + * Converts a {@link SubscribeStreamRequest} to a {@link + * com.hedera.hapi.block.protoc.SubscribeStreamRequest}. + * + * @param subscribeStreamRequest the {@link SubscribeStreamRequest} to convert + * @return the converted {@link com.hedera.hapi.block.protoc.SubscribeStreamRequest} + */ + @NonNull + public static com.hedera.hapi.block.protoc.SubscribeStreamRequest fromPbj( + @NonNull final SubscribeStreamRequest subscribeStreamRequest) { + try { + final byte[] pbjBytes = + asBytes(SubscribeStreamRequest.PROTOBUF, subscribeStreamRequest); + return com.hedera.hapi.block.protoc.SubscribeStreamRequest.parseFrom(pbjBytes); + } catch (InvalidProtocolBufferException e) { + final String message = + INVALID_BUFFER_MESSAGE.formatted( + "SubscribeStreamRequest", subscribeStreamRequest); + LOGGER.log(ERROR, message); + throw new RuntimeException(message, e); + } + } + + /** + * Converts protoc bytes to a PBJ record of the same type. + * + * @param the type of PBJ record to convert to + * @param codec the record codec to convert the bytes to a PBJ record + * @param bytes the protoc bytes to convert to a PBJ record + * @return the converted PBJ record + * @throws ParseException if the conversion between the protoc bytes and PBJ objects fails + */ + @NonNull + public static T toPbj( + @NonNull final Codec codec, @NonNull final byte[] bytes) throws ParseException { + return codec.parse(Bytes.wrap(bytes)); + } + + @NonNull + private static byte[] asBytes(@NonNull Codec codec, @NonNull T tx) { + requireNonNull(codec); + requireNonNull(tx); + try { + final var bytes = new ByteArrayOutputStream(); + codec.write(tx, new WritableStreamingData(bytes)); + return bytes.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Unable to convert from PBJ to bytes", e); + } + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java index 5ee82acd6..299ec0b5c 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java @@ -28,11 +28,18 @@ * * @param generationMode the mode of generation for the block stream * @param folderRootPath the root path of the folder containing the block stream + * @param delayBetweenBlockItems the delay between block items + * @param managerImplementation the implementation of the block stream manager + * @param maxBlockItemsToStream the maximum number of block items to stream */ @ConfigData("blockStream") public record BlockStreamConfig( @ConfigProperty(defaultValue = "DIR") GenerationMode generationMode, - @ConfigProperty(defaultValue = "") String folderRootPath) { + @ConfigProperty(defaultValue = "") String folderRootPath, + @ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems, + @ConfigProperty(defaultValue = "BlockAsFileBlockStreamManager") + String managerImplementation, + @ConfigProperty(defaultValue = "10_000") int maxBlockItemsToStream) { /** * Constructor to set the default root path if not provided, it will be set to the data diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java new file mode 100644 index 000000000..fcf7c9349 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.generator; + +import static java.lang.System.Logger.Level.DEBUG; +import static java.lang.System.Logger.Level.ERROR; +import static java.lang.System.Logger.Level.INFO; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.hapi.block.stream.Block; +import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.ParseException; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Stream; +import javax.inject.Inject; + +/** + * The BlockAsDirBlockStreamManager class implements the BlockStreamManager interface to manage the + * block stream from a directory. + */ +public class BlockAsDirBlockStreamManager implements BlockStreamManager { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + final String rootFolder; + + final List blocks = new ArrayList<>(); + + int currentBlockIndex = 0; + int currentBlockItemIndex = 0; + int lastGivenBlockNumber = 0; + + /** + * Constructor to initialize the BlockAsDirBlockStreamManager with the block stream + * configuration. + * + * @param blockStreamConfig the block stream configuration + */ + @Inject + public BlockAsDirBlockStreamManager(@NonNull BlockStreamConfig blockStreamConfig) { + this.rootFolder = blockStreamConfig.folderRootPath(); + try { + this.loadBlocks(); + } catch (IOException | ParseException | IllegalArgumentException e) { + LOGGER.log(ERROR, "Error loading blocks", e); + throw new RuntimeException(e); + } + + LOGGER.log(INFO, "Loaded " + blocks.size() + " blocks into memory"); + } + + /** Generation Mode of the implementation */ + @Override + public GenerationMode getGenerationMode() { + return GenerationMode.DIR; + } + + /** gets the next block item from the manager */ + @Override + public BlockItem getNextBlockItem() { + BlockItem nextBlockItem = blocks.get(currentBlockIndex).items().get(currentBlockItemIndex); + currentBlockItemIndex++; + if (currentBlockItemIndex >= blocks.get(currentBlockIndex).items().size()) { + currentBlockItemIndex = 0; + currentBlockIndex++; + if (currentBlockIndex >= blocks.size()) { + currentBlockIndex = 0; + } + } + return nextBlockItem; + } + + /** gets the next block from the manager */ + @Override + public Block getNextBlock() { + Block nextBlock = blocks.get(currentBlockIndex); + currentBlockIndex++; + lastGivenBlockNumber++; + if (currentBlockIndex >= blocks.size()) { + currentBlockIndex = 0; + } + return nextBlock; + } + + private void loadBlocks() throws IOException, ParseException { + Path rootPath = Path.of(rootFolder); + + try (Stream blockDirs = Files.list(rootPath).filter(Files::isDirectory)) { + List sortedBlockDirs = + blockDirs.sorted(Comparator.comparing(Path::getFileName)).toList(); + + for (Path blockDirPath : sortedBlockDirs) { + List parsedBlockItems = new ArrayList<>(); + + try (Stream blockItems = + Files.list(blockDirPath).filter(Files::isRegularFile)) { + List sortedBlockItems = + blockItems + .sorted( + Comparator.comparing( + BlockAsDirBlockStreamManager + ::extractNumberFromPath)) + .toList(); + + for (Path pathBlockItem : sortedBlockItems) { + byte[] blockItemBytes = readBlockItemBytes(pathBlockItem); + // if null means the file is not a block item and we can skip the file. + if (blockItemBytes == null) { + continue; + } + BlockItem blockItem = BlockItem.PROTOBUF.parse(Bytes.wrap(blockItemBytes)); + parsedBlockItems.add(blockItem); + } + } + + blocks.add(Block.newBuilder().items(parsedBlockItems).build()); + LOGGER.log(DEBUG, "Loaded block: " + blockDirPath); + } + } + } + + private byte[] readBlockItemBytes(Path pathBlockItem) throws IOException { + if (pathBlockItem.toString().endsWith(".gz")) { + return Utils.readGzFile(pathBlockItem); + } else if (pathBlockItem.toString().endsWith(".blk")) { + return Files.readAllBytes(pathBlockItem); + } + return null; + } + + // Method to extract the numeric part of the filename from a Path object + // Returns -1 if the filename is not a valid number + private static int extractNumberFromPath(Path path) { + String filename = path.getFileName().toString(); + String numPart = filename.split("\\.")[0]; // Get the part before the first dot + try { + return Integer.parseInt(numPart); + } catch (NumberFormatException e) { + return -1; // Return -1 if parsing fails + } + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java index d4ae102b2..d973b71ba 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java +++ b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java @@ -79,6 +79,9 @@ public BlockItem getNextBlockItem() { if (currentBlockItemIndex >= blocks.get(currentBlockIndex).items().size()) { currentBlockItemIndex = 0; currentBlockIndex++; + if (currentBlockIndex >= blocks.size()) { + currentBlockIndex = 0; + } } return nextBlockItem; } diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/GeneratorInjectionModule.java b/simulator/src/main/java/com/hedera/block/simulator/generator/GeneratorInjectionModule.java index 9bf0c319f..62b7451e4 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/generator/GeneratorInjectionModule.java +++ b/simulator/src/main/java/com/hedera/block/simulator/generator/GeneratorInjectionModule.java @@ -16,8 +16,9 @@ package com.hedera.block.simulator.generator; -import dagger.Binds; +import com.hedera.block.simulator.config.data.BlockStreamConfig; import dagger.Module; +import dagger.Provides; import javax.inject.Singleton; /** The module used to inject the block stream manager. */ @@ -25,13 +26,21 @@ public interface GeneratorInjectionModule { /** - * Provides the block stream manager. + * Provides the block stream manager based on the configuration, either + * BlockAsFileBlockStreamManager or BlockAsDirBlockStreamManager. by default, it will be + * BlockAsFileBlockStreamManager. * - * @param blockAsFileBlockStreamManager the block as file block stream manager + * @param config the block stream configuration * @return the block stream manager */ @Singleton - @Binds - BlockStreamManager provideBlockStreamManager( - BlockAsFileBlockStreamManager blockAsFileBlockStreamManager); + @Provides + static BlockStreamManager providesBlockStreamManager(BlockStreamConfig config) { + + if ("BlockAsDirBlockStreamManager".equalsIgnoreCase(config.managerImplementation())) { + return new BlockAsDirBlockStreamManager(config); + } + + return new BlockAsFileBlockStreamManager(config); + } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java new file mode 100644 index 000000000..dd058323e --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.grpc; + +import dagger.Binds; +import dagger.Module; +import javax.inject.Singleton; + +/** The module used to inject the gRPC client. */ +@Module +public interface GrpcInjectionModule { + + /** + * Binds the PublishStreamGrpcClient to the PublishStreamGrpcClientImpl. + * + * @param publishStreamGrpcClient the PublishStreamGrpcClientImpl + * @return the PublishStreamGrpcClient + */ + @Singleton + @Binds + PublishStreamGrpcClient bindPublishStreamGrpcClient( + PublishStreamGrpcClientImpl publishStreamGrpcClient); +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java new file mode 100644 index 000000000..51cead506 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.grpc; + +import com.hedera.hapi.block.stream.Block; +import com.hedera.hapi.block.stream.BlockItem; + +/** + * The PublishStreamGrpcClient interface provides the methods to stream the block and block item. + */ +public interface PublishStreamGrpcClient { + /** + * Streams the block item. + * + * @param blockItem the block item to be streamed + * @return true if the block item is streamed successfully, false otherwise + */ + boolean streamBlockItem(BlockItem blockItem); + + /** + * Streams the block. + * + * @param block the block to be streamed + * @return true if the block is streamed successfully, false otherwise + */ + boolean streamBlock(Block block); +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java new file mode 100644 index 000000000..d8d85803d --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.grpc; + +import com.hedera.block.simulator.Translator; +import com.hedera.block.simulator.config.data.GrpcConfig; +import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc; +import com.hedera.hapi.block.protoc.PublishStreamRequest; +import com.hedera.hapi.block.stream.Block; +import com.hedera.hapi.block.stream.BlockItem; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import javax.inject.Inject; + +/** + * The PublishStreamGrpcClientImpl class provides the methods to stream the block and block item. + */ +public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { + + private final BlockStreamServiceGrpc.BlockStreamServiceStub stub; + private final StreamObserver requestStreamObserver; + + /** + * Creates a new PublishStreamGrpcClientImpl instance. + * + * @param grpcConfig the gRPC configuration + */ + @Inject + public PublishStreamGrpcClientImpl(@NonNull GrpcConfig grpcConfig) { + ManagedChannel channel = + ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port()) + .usePlaintext() + .build(); + stub = BlockStreamServiceGrpc.newStub(channel); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(); + requestStreamObserver = stub.publishBlockStream(publishStreamObserver); + } + + /** + * The PublishStreamObserver class implements the StreamObserver interface to observe the + * stream. + */ + @Override + public boolean streamBlockItem(BlockItem blockItem) { + requestStreamObserver.onNext( + PublishStreamRequest.newBuilder() + .setBlockItem(Translator.fromPbj(blockItem)) + .build()); + + return true; + } + + /** + * The PublishStreamObserver class implements the StreamObserver interface to observe the + * stream. + */ + @Override + public boolean streamBlock(Block block) { + for (BlockItem blockItem : block.items()) { + streamBlockItem(blockItem); + } + + // wait for ack on the block + // if and when the ack is received return true + + return true; + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java new file mode 100644 index 000000000..0a48f0846 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.grpc; + +import com.hedera.hapi.block.protoc.PublishStreamResponse; +import io.grpc.stub.StreamObserver; +import java.lang.System.Logger; + +/** + * The PublishStreamObserver class provides the methods to observe the stream of the published + * stream. + */ +public class PublishStreamObserver implements StreamObserver { + + private final Logger logger = System.getLogger(getClass().getName()); + + /** Creates a new PublishStreamObserver instance. */ + public PublishStreamObserver() {} + + /** what will the stream observer do with the response from the server */ + @Override + public void onNext(PublishStreamResponse publishStreamResponse) { + logger.log(Logger.Level.INFO, "Received Response: " + publishStreamResponse.toString()); + } + + /** what will the stream observer do when an error occurs */ + @Override + public void onError(Throwable throwable) { + logger.log(Logger.Level.ERROR, "Error: " + throwable.toString()); + } + + /** what will the stream observer do when the stream is completed */ + @Override + public void onCompleted() { + logger.log(Logger.Level.DEBUG, "Completed"); + } +} diff --git a/simulator/src/main/java/module-info.java b/simulator/src/main/java/module-info.java index 39021c53d..9750258d5 100644 --- a/simulator/src/main/java/module-info.java +++ b/simulator/src/main/java/module-info.java @@ -7,10 +7,13 @@ requires static com.github.spotbugs.annotations; requires static com.google.auto.service; requires com.hedera.block.stream; + requires com.google.protobuf; requires com.hedera.pbj.runtime; requires com.swirlds.config.api; requires com.swirlds.config.extensions; requires dagger; + requires io.grpc.stub; + requires io.grpc; requires javax.inject; provides com.swirlds.config.api.ConfigurationExtension with diff --git a/simulator/src/main/resources/app.properties b/simulator/src/main/resources/app.properties index 8b1378917..4cff5ac6d 100644 --- a/simulator/src/main/resources/app.properties +++ b/simulator/src/main/resources/app.properties @@ -1 +1,3 @@ - +#blockStream.delayBetweenBlockItems=2000000 +#blockStream.folderRootPath=/Users/user/Projects/hedera-block-node/server/FakeProducedBlock +#blockStream.managerImplementation=BlockAsDirBlockStreamManager diff --git a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java index 2572d6b02..bd646cf53 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java @@ -19,27 +19,37 @@ import static org.junit.jupiter.api.Assertions.*; import com.hedera.block.simulator.generator.BlockStreamManager; +import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; import com.swirlds.config.api.Configuration; +import java.io.IOException; +import java.util.Map; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -class BlockStreamSimulatorAppTest { +class BlockStreamSimulatorTest { - @Mock private Configuration configuration; + private Configuration configuration; @Mock private BlockStreamManager blockStreamManager; - @InjectMocks private BlockStreamSimulatorApp blockStreamSimulator; + @Mock private PublishStreamGrpcClient publishStreamGrpcClient; + + private BlockStreamSimulatorApp blockStreamSimulator; @BeforeEach - void setUp() { - blockStreamSimulator = new BlockStreamSimulatorApp(configuration, blockStreamManager); + void setUp() throws IOException { + + configuration = + TestUtils.getTestConfiguration(Map.of("blockStream.maxBlockItemsToStream", "100")); + + blockStreamSimulator = + new BlockStreamSimulatorApp( + configuration, blockStreamManager, publishStreamGrpcClient); } @AfterEach @@ -48,7 +58,7 @@ void tearDown() { } @Test - void start_logsStartedMessage() { + void start_logsStartedMessage() throws InterruptedException { blockStreamSimulator.start(); assertTrue(blockStreamSimulator.isRunning()); } diff --git a/simulator/src/test/java/com/hedera/block/simulator/TestUtils.java b/simulator/src/test/java/com/hedera/block/simulator/TestUtils.java new file mode 100644 index 000000000..952c0be49 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/TestUtils.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator; + +import com.hedera.block.simulator.config.TestConfigBuilder; +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.data.GrpcConfig; +import com.swirlds.config.api.Configuration; +import com.swirlds.config.extensions.sources.ClasspathFileConfigSource; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Map; + +public class TestUtils { + + private static final String TEST_APP_PROPERTIES_FILE = "app.properties"; + + public static Configuration getTestConfiguration(@NonNull Map customProperties) + throws IOException { + // create test configuration + TestConfigBuilder testConfigBuilder = + new TestConfigBuilder(true) + .withSource( + new ClasspathFileConfigSource(Path.of(TEST_APP_PROPERTIES_FILE))); + + for (Map.Entry entry : customProperties.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + testConfigBuilder = testConfigBuilder.withValue(key, value); + } + + testConfigBuilder = testConfigBuilder.withConfigDataType(BlockStreamConfig.class); + testConfigBuilder = testConfigBuilder.withConfigDataType(GrpcConfig.class); + + return testConfigBuilder.getOrCreateConfig(); + } + + public static Configuration getTestConfiguration() throws IOException { + return getTestConfiguration(Map.of()); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/TestConfigBuilder.java b/simulator/src/test/java/com/hedera/block/simulator/config/TestConfigBuilder.java new file mode 100644 index 000000000..fad02a684 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/config/TestConfigBuilder.java @@ -0,0 +1,242 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.config; + +import com.swirlds.common.config.singleton.ConfigurationHolder; +import com.swirlds.common.threading.locks.AutoClosableLock; +import com.swirlds.common.threading.locks.Locks; +import com.swirlds.common.threading.locks.locked.Locked; +import com.swirlds.config.api.ConfigData; +import com.swirlds.config.api.Configuration; +import com.swirlds.config.api.ConfigurationBuilder; +import com.swirlds.config.api.converter.ConfigConverter; +import com.swirlds.config.api.source.ConfigSource; +import com.swirlds.config.api.validation.ConfigValidator; +import com.swirlds.config.extensions.sources.SimpleConfigSource; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import java.util.Objects; + +/** + * Helper for use the config in test and change the config for specific tests. Instance can be used + * per class or per test. + */ +public class TestConfigBuilder { + + private final AutoClosableLock configLock = Locks.createAutoLock(); + + private Configuration configuration; + + private final ConfigurationBuilder builder; + + /** + * Creates a new instance and automatically registers all config data records (see {@link + * ConfigData}) on classpath / modulepath that are part of the packages {@code com.swirlds} and + * {@code com.hedera}. + */ + public TestConfigBuilder() { + this(true); + } + + /** + * Creates a new instance and automatically registers all given config data records. This call + * will not do a class scan for config data records on classpath / modulepath like some of the + * other constructors do. + * + * @param dataTypes + */ + public TestConfigBuilder(@Nullable final Class dataTypes) { + this(false); + if (dataTypes != null) { + this.builder.withConfigDataTypes(dataTypes); + } + } + + /** + * Creates a new instance and automatically registers all config data records (see {@link + * ConfigData}) on classpath / modulepath that are part of the packages {@code com.swirlds} and + * {@code com.hedera}. if the {@code registerAllTypes} param is true. + * + * @param registerAllTypes if true all config data records on classpath will automatically be + * registered + */ + public TestConfigBuilder(final boolean registerAllTypes) { + if (registerAllTypes) { + this.builder = ConfigurationBuilder.create().autoDiscoverExtensions(); + } else { + this.builder = ConfigurationBuilder.create(); + } + } + + /** + * Sets the value for the config. + * + * @param propertyName name of the property + * @param value the value + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withValue( + @NonNull final String propertyName, @Nullable final String value) { + return withSource(new SimpleConfigSource(propertyName, value)); + } + + /** + * Sets the value for the config. + * + * @param propertyName name of the property + * @param value the value + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withValue(@NonNull final String propertyName, final int value) { + return withSource(new SimpleConfigSource(propertyName, value)); + } + + /** + * Sets the value for the config. + * + * @param propertyName name of the property + * @param value the value + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withValue(@NonNull final String propertyName, final double value) { + return withSource(new SimpleConfigSource(propertyName, value)); + } + + /** + * Sets the value for the config. + * + * @param propertyName name of the property + * @param value the value + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withValue(@NonNull final String propertyName, final long value) { + return withSource(new SimpleConfigSource(propertyName, value)); + } + + /** + * Sets the value for the config. + * + * @param propertyName name of the property + * @param value the value + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withValue(@NonNull final String propertyName, final boolean value) { + return withSource(new SimpleConfigSource(propertyName, value)); + } + + /** + * Sets the value for the config. + * + * @param propertyName name of the property + * @param value the value + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withValue( + @NonNull final String propertyName, @NonNull final Object value) { + Objects.requireNonNull(value, "value must not be null"); + return withSource(new SimpleConfigSource(propertyName, value.toString())); + } + + /** + * This method returns the {@link Configuration} instance. If the method is called for the first + * time the {@link Configuration} instance will be created. All values that have been set (see + * {@link #withValue(String, int)}) methods will be part of the config. Next to this the config + * will support all config data record types (see {@link ConfigData}) that are on the classpath. + * + * @return the created configuration + */ + @NonNull + @SuppressWarnings({"removal"}) + public Configuration getOrCreateConfig() { + try (final Locked ignore = configLock.lock()) { + if (configuration == null) { + configuration = builder.build(); + ConfigurationHolder.getInstance().setConfiguration(configuration); + } + return configuration; + } + } + + private void checkConfigState() { + try (final Locked ignore = configLock.lock()) { + if (configuration != null) { + throw new IllegalStateException("Configuration already created!"); + } + } + } + + /** + * Adds the given config source to the builder + * + * @param configSource the config source that will be added + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withSource(@NonNull final ConfigSource configSource) { + checkConfigState(); + builder.withSource(configSource); + return this; + } + + /** + * Adds the given config converter to the builder + * + * @param converterType the type of the config converter + * @param converter the config converter that will be added + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withConverter( + @NonNull final Class converterType, @NonNull final ConfigConverter converter) { + checkConfigState(); + builder.withConverter(converterType, converter); + return this; + } + + /** + * Adds the given config validator to the builder + * + * @param validator the config validator that will be added + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withValidator(@NonNull final ConfigValidator validator) { + checkConfigState(); + builder.withValidator(validator); + return this; + } + + /** + * Adds the given config data type to the builder + * + * @param type the config data type that will be added + * @param the type of the config data type + * @return the {@link TestConfigBuilder} instance (for fluent API) + */ + @NonNull + public TestConfigBuilder withConfigDataType(@NonNull final Class type) { + checkConfigState(); + builder.withConfigDataType(type); + return this; + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java index f270aa4cf..940c4edb5 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java @@ -26,6 +26,10 @@ class BlockStreamConfigTest { + private final int delayBetweenBlockItems = 1_500_000; + private final String blockStreamManagerImplementation = "BlockAsFileBlockStreamManager"; + private final int maxBlockItemsToStream = 10_000; + private String getAbsoluteFolder(String relativePath) { return Paths.get(relativePath).toAbsolutePath().toString(); } @@ -42,7 +46,13 @@ void testValidAbsolutePath() { assertTrue(Files.exists(path), "The folder must exist for this test."); // No exception should be thrown - BlockStreamConfig config = new BlockStreamConfig(generationMode, folderRootPath); + BlockStreamConfig config = + new BlockStreamConfig( + generationMode, + folderRootPath, + delayBetweenBlockItems, + blockStreamManagerImplementation, + maxBlockItemsToStream); assertEquals(folderRootPath, config.folderRootPath()); assertEquals(GenerationMode.DIR, config.generationMode()); @@ -55,7 +65,13 @@ void testEmptyFolderRootPath() { GenerationMode generationMode = GenerationMode.DIR; // No exception should be thrown, and the default folder should be used - BlockStreamConfig config = new BlockStreamConfig(generationMode, folderRootPath); + BlockStreamConfig config = + new BlockStreamConfig( + generationMode, + folderRootPath, + delayBetweenBlockItems, + blockStreamManagerImplementation, + maxBlockItemsToStream); // Verify that the path is set to the default Path expectedPath = Paths.get("src/main/resources/block-0.0.3/").toAbsolutePath(); @@ -73,7 +89,13 @@ void testRelativeFolderPathThrowsException() { IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, - () -> new BlockStreamConfig(generationMode, relativeFolderPath)); + () -> + new BlockStreamConfig( + generationMode, + relativeFolderPath, + delayBetweenBlockItems, + blockStreamManagerImplementation, + maxBlockItemsToStream)); // Verify the exception message assertEquals(relativeFolderPath + " Root path must be absolute", exception.getMessage()); @@ -93,7 +115,13 @@ void testNonExistentFolderThrowsException() { IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, - () -> new BlockStreamConfig(generationMode, folderRootPath)); + () -> + new BlockStreamConfig( + generationMode, + folderRootPath, + delayBetweenBlockItems, + blockStreamManagerImplementation, + maxBlockItemsToStream)); // Verify the exception message assertEquals("Folder does not exist: " + path, exception.getMessage()); @@ -106,7 +134,13 @@ void testGenerationModeNonDirDoesNotCheckFolderExistence() { GenerationMode generationMode = GenerationMode.ADHOC; // No exception should be thrown because generation mode is not DIR - BlockStreamConfig config = new BlockStreamConfig(generationMode, folderRootPath); + BlockStreamConfig config = + new BlockStreamConfig( + generationMode, + folderRootPath, + delayBetweenBlockItems, + blockStreamManagerImplementation, + maxBlockItemsToStream); // Verify that the configuration was created successfully assertEquals(folderRootPath, config.folderRootPath()); diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java new file mode 100644 index 000000000..1fb6d3b7a --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.generator; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.types.GenerationMode; +import java.nio.file.Paths; +import org.junit.jupiter.api.Test; + +class BlockAsDirBlockStreamManagerTest { + + private final String rootFolder = "src/test/resources/blockAsDirExample/"; + + private String getAbsoluteFolder(String relativePath) { + return Paths.get(relativePath).toAbsolutePath().toString(); + } + + @Test + void getGenerationMode() { + BlockStreamManager blockStreamManager = + getBlockAsDirBlockStreamManager(getAbsoluteFolder(rootFolder)); + assertEquals(GenerationMode.DIR, blockStreamManager.getGenerationMode()); + + assertEquals(GenerationMode.DIR, blockStreamManager.getGenerationMode()); + } + + @Test + void getNextBlockItem() { + BlockStreamManager blockStreamManager = + getBlockAsDirBlockStreamManager(getAbsoluteFolder(rootFolder)); + + for (int i = 0; i < 1000; i++) { + assertNotNull(blockStreamManager.getNextBlockItem()); + } + } + + @Test + void getNextBlock() { + BlockStreamManager blockStreamManager = + getBlockAsDirBlockStreamManager(getAbsoluteFolder(rootFolder)); + + for (int i = 0; i < 3000; i++) { + assertNotNull(blockStreamManager.getNextBlock()); + } + } + + @Test + void BlockAsFileBlockStreamManagerInvalidRootPath() { + assertThrows( + RuntimeException.class, + () -> + getBlockAsDirBlockStreamManager( + getAbsoluteFolder("src/test/resources/BlockAsDirException/"))); + } + + private BlockStreamManager getBlockAsDirBlockStreamManager(String rootFolder) { + BlockStreamConfig blockStreamConfig = + new BlockStreamConfig( + GenerationMode.DIR, + rootFolder, + 1_500_000, + "BlockAsDirBlockStreamManager", + 10_000); + return new BlockAsDirBlockStreamManager(blockStreamConfig); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java index d257a6345..d22ca15c4 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java @@ -38,11 +38,6 @@ void getGenerationMode() { assertEquals(GenerationMode.DIR, blockStreamManager.getGenerationMode()); } - @Test - void BlockAsFileBlockStreamManagerInvalidRootPath() { - assertThrows(RuntimeException.class, () -> getBlockAsFileBlockStreamManager("/etc")); - } - @Test void getNextBlock() { BlockStreamManager blockStreamManager = @@ -56,21 +51,32 @@ void getNextBlock() { void getNextBlockItem() { BlockStreamManager blockStreamManager = getBlockAsFileBlockStreamManager(getAbsoluteFolder(gzRootFolder)); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 35000; i++) { assertNotNull(blockStreamManager.getNextBlockItem()); } } @Test void loadBlockBlk() { - String blkRootFolder = "src/main/resources/block-0.0.3-blk/"; + String blkRootFolder = "src/test/resources/block-0.0.3-blk/"; BlockStreamManager blockStreamManager = getBlockAsFileBlockStreamManager(getAbsoluteFolder(blkRootFolder)); assertNotNull(blockStreamManager.getNextBlock()); } + @Test + void BlockAsFileBlockStreamManagerInvalidRootPath() { + assertThrows(RuntimeException.class, () -> getBlockAsFileBlockStreamManager("/etc")); + } + private BlockAsFileBlockStreamManager getBlockAsFileBlockStreamManager(String rootFolder) { - BlockStreamConfig blockStreamConfig = new BlockStreamConfig(GenerationMode.DIR, rootFolder); + BlockStreamConfig blockStreamConfig = + new BlockStreamConfig( + GenerationMode.DIR, + rootFolder, + 1_500_000, + "BlockAsFileBlockStreamManager", + 10_000); return new BlockAsFileBlockStreamManager(blockStreamConfig); } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/GeneratorInjectionModuleTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/GeneratorInjectionModuleTest.java new file mode 100644 index 000000000..f47296900 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/GeneratorInjectionModuleTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.generator; + +import static org.junit.jupiter.api.Assertions.*; + +import com.hedera.block.simulator.TestUtils; +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import java.io.IOException; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class GeneratorInjectionModuleTest { + + @Test + void providesBlockStreamManager_AsFile() throws IOException { + BlockStreamConfig blockStreamConfig = + TestUtils.getTestConfiguration().getConfigData(BlockStreamConfig.class); + + BlockStreamManager blockStreamManager = + GeneratorInjectionModule.providesBlockStreamManager(blockStreamConfig); + + assertEquals( + blockStreamManager.getClass().getName(), + BlockAsFileBlockStreamManager.class.getName()); + } + + @Test + void providesBlockStreamManager_AsDir() throws IOException { + BlockStreamConfig blockStreamConfig = + TestUtils.getTestConfiguration( + Map.of( + "blockStream.managerImplementation", + "BlockAsDirBlockStreamManager")) + .getConfigData(BlockStreamConfig.class); + + BlockStreamManager blockStreamManager = + GeneratorInjectionModule.providesBlockStreamManager(blockStreamConfig); + + assertEquals( + blockStreamManager.getClass().getName(), + BlockAsDirBlockStreamManager.class.getName()); + } + + @Test + void providesBlockStreamManager_default() throws IOException { + BlockStreamConfig blockStreamConfig = + TestUtils.getTestConfiguration(Map.of("blockStream.managerImplementation", "")) + .getConfigData(BlockStreamConfig.class); + + BlockStreamManager blockStreamManager = + GeneratorInjectionModule.providesBlockStreamManager(blockStreamConfig); + + assertEquals( + blockStreamManager.getClass().getName(), + BlockAsFileBlockStreamManager.class.getName()); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java new file mode 100644 index 000000000..526aaafce --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.grpc; + +import static org.junit.jupiter.api.Assertions.*; + +import com.hedera.block.simulator.TestUtils; +import com.hedera.block.simulator.config.data.GrpcConfig; +import com.hedera.hapi.block.stream.Block; +import com.hedera.hapi.block.stream.BlockItem; +import java.io.IOException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class PublishStreamGrpcClientImplTest { + + GrpcConfig grpcConfig; + + @BeforeEach + void setUp() throws IOException { + + grpcConfig = TestUtils.getTestConfiguration().getConfigData(GrpcConfig.class); + } + + @AfterEach + void tearDown() {} + + @Test + void streamBlockItem() { + BlockItem blockItem = BlockItem.newBuilder().build(); + PublishStreamGrpcClientImpl publishStreamGrpcClient = + new PublishStreamGrpcClientImpl(grpcConfig); + boolean result = publishStreamGrpcClient.streamBlockItem(blockItem); + assertTrue(result); + } + + @Test + void streamBlock() { + BlockItem blockItem = BlockItem.newBuilder().build(); + Block block = Block.newBuilder().items(blockItem).build(); + + PublishStreamGrpcClientImpl publishStreamGrpcClient = + new PublishStreamGrpcClientImpl(grpcConfig); + boolean result = publishStreamGrpcClient.streamBlock(block); + assertTrue(result); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java new file mode 100644 index 000000000..73c2c8222 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.grpc; + +import com.hedera.hapi.block.protoc.PublishStreamResponse; +import org.junit.jupiter.api.Test; + +class PublishStreamObserverTest { + + @Test + void onNext() { + PublishStreamResponse response = PublishStreamResponse.newBuilder().build(); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(); + publishStreamObserver.onNext(response); + } + + @Test + void onError() { + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(); + publishStreamObserver.onError(new Throwable()); + } + + @Test + void onCompleted() { + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(); + publishStreamObserver.onCompleted(); + } +} diff --git a/simulator/src/test/resources/BlockAsDirException/1/1.blk b/simulator/src/test/resources/BlockAsDirException/1/1.blk new file mode 100644 index 000000000..bf7e89904 --- /dev/null +++ b/simulator/src/test/resources/BlockAsDirException/1/1.blk @@ -0,0 +1 @@ +asdasd diff --git a/simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000001.blk b/simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000001.blk similarity index 100% rename from simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000001.blk rename to simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000001.blk diff --git a/simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000002.blk b/simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000002.blk similarity index 100% rename from simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000002.blk rename to simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000002.blk diff --git a/simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000003.blk b/simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000003.blk similarity index 100% rename from simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000003.blk rename to simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000003.blk diff --git a/simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000004.blk b/simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000004.blk similarity index 100% rename from simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000004.blk rename to simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000004.blk diff --git a/simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000005.blk b/simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000005.blk similarity index 100% rename from simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000005.blk rename to simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000005.blk diff --git a/simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000006.blk b/simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000006.blk similarity index 100% rename from simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000006.blk rename to simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000006.blk diff --git a/simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000007.blk b/simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000007.blk similarity index 100% rename from simulator/src/main/resources/block-0.0.3-blk/000000000000000000000000000000000007.blk rename to simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000007.blk diff --git a/simulator/src/test/resources/blockAsDirExample/1/1.blk.gz b/simulator/src/test/resources/blockAsDirExample/1/1.blk.gz new file mode 100644 index 0000000000000000000000000000000000000000..a87ab5adcdfe368f9fe6ef08b3ed6bcd09aa9a45 GIT binary patch literal 95 zcmV-l0HFULiwFqpM(kz)12HaQY-<4H^5oLyU{n$>EX`CXEl~hc#R`cjnFX1}naP=X z=~{d#`8oMT3i-*&iK)qnB|;J$OiIkD#U)z&#ie;A3dNZ~0<1)W5diY5UQ|l}005?2 BD3$;K literal 0 HcmV?d00001 diff --git a/simulator/src/test/resources/blockAsDirExample/1/10.blk b/simulator/src/test/resources/blockAsDirExample/1/10.blk new file mode 100644 index 000000000..6460e717a --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/10.blk @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/1/2.blk b/simulator/src/test/resources/blockAsDirExample/1/2.blk new file mode 100644 index 000000000..4ee31e2dd --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/2.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/1/3.blk b/simulator/src/test/resources/blockAsDirExample/1/3.blk new file mode 100644 index 000000000..e2851d8dc --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/3.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/1/4.blk b/simulator/src/test/resources/blockAsDirExample/1/4.blk new file mode 100644 index 000000000..7a94cf50b --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/4.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/1/5.blk b/simulator/src/test/resources/blockAsDirExample/1/5.blk new file mode 100644 index 000000000..f7c52fce5 --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/5.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/1/6.blk b/simulator/src/test/resources/blockAsDirExample/1/6.blk new file mode 100644 index 000000000..7b6b12e77 --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/6.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/1/7.blk b/simulator/src/test/resources/blockAsDirExample/1/7.blk new file mode 100644 index 000000000..d2b53440b --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/7.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/1/8.blk b/simulator/src/test/resources/blockAsDirExample/1/8.blk new file mode 100644 index 000000000..9f6e1af8b --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/8.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/1/9.blk b/simulator/src/test/resources/blockAsDirExample/1/9.blk new file mode 100644 index 000000000..3bfcd3800 --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/9.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/1/notABlockFile.txt b/simulator/src/test/resources/blockAsDirExample/1/notABlockFile.txt new file mode 100644 index 000000000..a79a5780b --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/1/notABlockFile.txt @@ -0,0 +1 @@ +not_empty diff --git a/simulator/src/test/resources/blockAsDirExample/2/1.blk b/simulator/src/test/resources/blockAsDirExample/2/1.blk new file mode 100644 index 000000000..3e55973d0 --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/1.blk @@ -0,0 +1,3 @@ + +I ++"qui ut quis adipisicing*dolor occaecat"est*sunt sint dolor \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/2/10.blk b/simulator/src/test/resources/blockAsDirExample/2/10.blk new file mode 100644 index 000000000..e5b2881f1 --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/10.blk @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/2/2.blk b/simulator/src/test/resources/blockAsDirExample/2/2.blk new file mode 100644 index 000000000..4ee31e2dd --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/2.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/2/3.blk b/simulator/src/test/resources/blockAsDirExample/2/3.blk new file mode 100644 index 000000000..e2851d8dc --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/3.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/2/4.blk b/simulator/src/test/resources/blockAsDirExample/2/4.blk new file mode 100644 index 000000000..7a94cf50b --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/4.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/2/5.blk b/simulator/src/test/resources/blockAsDirExample/2/5.blk new file mode 100644 index 000000000..f7c52fce5 --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/5.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/2/6.blk b/simulator/src/test/resources/blockAsDirExample/2/6.blk new file mode 100644 index 000000000..7b6b12e77 --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/6.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/2/7.blk b/simulator/src/test/resources/blockAsDirExample/2/7.blk new file mode 100644 index 000000000..d2b53440b --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/7.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/2/8.blk b/simulator/src/test/resources/blockAsDirExample/2/8.blk new file mode 100644 index 000000000..9f6e1af8b --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/8.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/simulator/src/test/resources/blockAsDirExample/2/9.blk b/simulator/src/test/resources/blockAsDirExample/2/9.blk new file mode 100644 index 000000000..3bfcd3800 --- /dev/null +++ b/simulator/src/test/resources/blockAsDirExample/2/9.blk @@ -0,0 +1,2 @@ + + \ No newline at end of file