diff --git a/.gitignore b/.gitignore index 25b06fa21..3d41d1db2 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ # Mobile Tools for Java (J2ME) .mtj.tmp/ +bin/ # Package Files # *.jar diff --git a/common/src/main/java/com/hedera/block/common/utils/Preconditions.java b/common/src/main/java/com/hedera/block/common/utils/Preconditions.java index 13910978d..08be1e517 100644 --- a/common/src/main/java/com/hedera/block/common/utils/Preconditions.java +++ b/common/src/main/java/com/hedera/block/common/utils/Preconditions.java @@ -75,6 +75,42 @@ public static int requirePositive(final int toCheck) { return requirePositive(toCheck, null); } + /** + * This method asserts a given long is a whole number. A long is whole + * if it is greater or equal to zero. + * + * @param toCheck the long to check if it is a whole number + * @return the number to check if it is whole number + * @throws IllegalArgumentException if the input number to check is not + * positive + */ + public static long requireWhole(final long toCheck) { + return requireWhole(toCheck, null); + } + + /** + * This method asserts a given long is a whole number. A long is whole + * if it is greater or equal to zero. + * + * @param toCheck the long to check if it is a whole number + * @param errorMessage the error message to be used in the exception if the + * input long to check is not a whole number, if null, a default message will + * be used + * @return the number to check if it is whole number + * @throws IllegalArgumentException if the input number to check is not + * positive + */ + public static long requireWhole(final long toCheck, final String errorMessage) { + if (toCheck >= 0) { + return toCheck; + } + + final String message = Objects.isNull(errorMessage) + ? "The input integer [%d] is required be whole.".formatted(toCheck) + : errorMessage; + throw new IllegalArgumentException(message); + } + /** * This method asserts a given integer is a positive. An integer is positive * if it is NOT equal to zero and is greater than zero. diff --git a/common/src/test/java/com/hedera/block/common/CommonsTestUtility.java b/common/src/test/java/com/hedera/block/common/CommonsTestUtility.java index 00c113be3..1146ca3a1 100644 --- a/common/src/test/java/com/hedera/block/common/CommonsTestUtility.java +++ b/common/src/test/java/com/hedera/block/common/CommonsTestUtility.java @@ -183,11 +183,17 @@ public static Stream positiveIntegers() { } /** - * Zero and some negative integers. + * Some whole numbers. */ - public static Stream zeroAndNegativeIntegers() { + public static Stream wholeNumbers() { + return Stream.concat(Stream.of(Arguments.of(0)), positiveIntegers()); + } + + /** + * Some negative integers. + */ + public static Stream negativeIntegers() { return Stream.of( - Arguments.of(0), Arguments.of(-1), Arguments.of(-2), Arguments.of(-3), @@ -201,5 +207,12 @@ public static Stream zeroAndNegativeIntegers() { Arguments.of(-10_000_000)); } + /** + * Zero and some negative integers. + */ + public static Stream zeroAndNegativeIntegers() { + return Stream.concat(Stream.of(Arguments.of(0)), negativeIntegers()); + } + private CommonsTestUtility() {} } diff --git a/common/src/test/java/com/hedera/block/common/utils/PreconditionsTest.java b/common/src/test/java/com/hedera/block/common/utils/PreconditionsTest.java index cbdd17c9f..b6c2dd8e2 100644 --- a/common/src/test/java/com/hedera/block/common/utils/PreconditionsTest.java +++ b/common/src/test/java/com/hedera/block/common/utils/PreconditionsTest.java @@ -67,6 +67,45 @@ void testRequireNotBlankFail(final String toTest) { .withMessage(testErrorMessage); } + /** + * This test aims to verify that the + * {@link Preconditions#requireWhole(long)} will return the input 'toTest' + * parameter if the positive check passes. Test includes overloads. + * + * @param toTest parameterized, the number to test + */ + @ParameterizedTest + @MethodSource("com.hedera.block.common.CommonsTestUtility#wholeNumbers") + void testRequireWholePass(final int toTest) { + final Consumer asserts = + actual -> assertThat(actual).isGreaterThanOrEqualTo(0).isEqualTo(toTest); + + final int actual = (int) Preconditions.requireWhole(toTest); + assertThat(actual).satisfies(asserts); + + final int actualOverload = (int) Preconditions.requireWhole(toTest, "test error message"); + assertThat(actualOverload).satisfies(asserts); + } + + /** + * This test aims to verify that the + * {@link Preconditions#requireWhole(long)} will throw an + * {@link IllegalArgumentException} if the positive check fails. Test + * includes overloads. + * + * @param toTest parameterized, the number to test + */ + @ParameterizedTest + @MethodSource("com.hedera.block.common.CommonsTestUtility#negativeIntegers") + void testRequireWholeFail(final int toTest) { + assertThatIllegalArgumentException().isThrownBy(() -> Preconditions.requireWhole(toTest)); + + final String testErrorMessage = "test error message"; + assertThatIllegalArgumentException() + .isThrownBy(() -> Preconditions.requireWhole(toTest, testErrorMessage)) + .withMessage(testErrorMessage); + } + /** * This test aims to verify that the * {@link Preconditions#requirePositive(int)} will return the input 'toTest' diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java index f40431bdc..16dbc7767 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java @@ -20,6 +20,7 @@ import static java.util.Objects.requireNonNull; import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.data.StreamStatus; import com.hedera.block.simulator.config.types.SimulatorMode; import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import com.hedera.block.simulator.generator.BlockStreamManager; @@ -101,10 +102,30 @@ public boolean isRunning() { return isRunning.get(); } - /** Stops the Block Stream Simulator and closes off all grpc channels. */ - public void stop() { + /** + * Stops the Block Stream Simulator and closes off all grpc channels. + * + * @throws InterruptedException if the thread is interrupted + */ + public void stop() throws InterruptedException { + simulatorModeHandler.stop(); + publishStreamGrpcClient.completeStreaming(); + publishStreamGrpcClient.shutdown(); isRunning.set(false); + LOGGER.log(INFO, "Block Stream Simulator has stopped"); } + + /** + * Gets the stream status from both the publisher and the consumer. + * + * @return the stream status + */ + public StreamStatus getStreamStatus() { + return StreamStatus.builder() + .publishedBlocks(publishStreamGrpcClient.getPublishedBlocks()) + .lastKnownPublisherStatuses(publishStreamGrpcClient.getLastKnownStatuses()) + .build(); + } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java new file mode 100644 index 000000000..68abf9ec4 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.config.data; + +import static com.hedera.block.common.utils.Preconditions.requireWhole; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.List; + +/** + * Represents the status of the stream. + * + * @param publishedBlocks the number of published blocks + * @param consumedBlocks the number of consumed blocks + * @param lastKnownPublisherStatuses the last known publisher statuses + * @param lastKnownConsumersStatuses the last known consumers statuses + */ +public record StreamStatus( + long publishedBlocks, + long consumedBlocks, + List lastKnownPublisherStatuses, + List lastKnownConsumersStatuses) { + + /** + * Creates a new {@link Builder} instance for constructing a {@code StreamStatus}. + * + * @return a new {@code Builder} + */ + public static Builder builder() { + return new Builder(); + } + + /** + * A builder for creating instances of {@link StreamStatus}. + */ + public static class Builder { + private long publishedBlocks = 0; + private long consumedBlocks = 0; + private List lastKnownPublisherStatuses = new ArrayList<>(); + private List lastKnownConsumersStatuses = new ArrayList<>(); + + /** + * Creates a new instance of the {@code Builder} class with default configuration values. + */ + public Builder() { + // Default constructor + } + + /** + * Sets the number of published blocks. + * + * @param publishedBlocks the number of published blocks + * @return the builder instance + */ + public Builder publishedBlocks(long publishedBlocks) { + requireWhole(publishedBlocks); + this.publishedBlocks = publishedBlocks; + return this; + } + + /** + * Sets the number of consumed blocks. + * + * @param consumedBlocks the number of consumed blocks + * @return the builder instance + */ + public Builder consumedBlocks(long consumedBlocks) { + requireWhole(consumedBlocks); + this.consumedBlocks = consumedBlocks; + return this; + } + + /** + * Sets the last known publisher statuses. + * + * @param lastKnownPublisherStatuses the last known publisher statuses + * @return the builder instance + */ + public Builder lastKnownPublisherStatuses(List lastKnownPublisherStatuses) { + requireNonNull(lastKnownPublisherStatuses); + this.lastKnownPublisherStatuses = new ArrayList<>(lastKnownPublisherStatuses); + return this; + } + + /** + * Sets the last known consumers statuses. + * + * @param lastKnownConsumersStatuses the last known consumers statuses + * @return the builder instance + */ + public Builder lastKnownConsumersStatuses(List lastKnownConsumersStatuses) { + requireNonNull(lastKnownConsumersStatuses); + this.lastKnownConsumersStatuses = new ArrayList<>(lastKnownConsumersStatuses); + return this; + } + + /** + * Builds a new {@link StreamStatus} instance. + * + * @return a new {@link StreamStatus} instance + */ + public StreamStatus build() { + return new StreamStatus( + publishedBlocks, consumedBlocks, lastKnownPublisherStatuses, lastKnownConsumersStatuses); + } + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java index 27901561d..0f07cca60 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java @@ -45,6 +45,27 @@ public interface PublishStreamGrpcClient { */ boolean streamBlock(Block block); + /** + * Sends a onCompleted message to the server and waits for a short period of time to ensure the message is sent. + * + * @throws InterruptedException if the thread is interrupted + */ + void completeStreaming() throws InterruptedException; + + /** + * Gets the number of published blocks. + * + * @return the number of published blocks + */ + long getPublishedBlocks(); + + /** + * Gets the last known statuses. + * + * @return the last known statuses + */ + List getLastKnownStatuses(); + /** * Shutdowns the channel. */ diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java index afc85ae61..e11b76b5a 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java @@ -17,6 +17,7 @@ package com.hedera.block.simulator.grpc; import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlockItemsSent; +import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlocksSent; import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; import static java.lang.System.Logger.Level.INFO; @@ -35,12 +36,14 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; /** - * The PublishStreamGrpcClientImpl class provides the methods to stream the block and block item. + * The PublishStreamGrpcClientImpl class provides the methods to stream the + * block and block item. */ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { @@ -52,14 +55,16 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { private final AtomicBoolean streamEnabled; private ManagedChannel channel; private final MetricsService metricsService; + private final List lastKnownStatuses = new ArrayList<>(); /** * Creates a new PublishStreamGrpcClientImpl instance. * - * @param grpcConfig the gRPC configuration + * @param grpcConfig the gRPC configuration * @param blockStreamConfig the block stream configuration - * @param metricsService the metrics service - * @param streamEnabled the flag responsible for enabling and disabling of the streaming + * @param metricsService the metrics service + * @param streamEnabled the flag responsible for enabling and disabling of + * the streaming */ @Inject public PublishStreamGrpcClientImpl( @@ -74,7 +79,8 @@ public PublishStreamGrpcClientImpl( } /** - * Initialize the channel and stub for publishBlockStream with the desired configuration. + * Initialize the channel and stub for publishBlockStream with the desired + * configuration. */ @Override public void init() { @@ -82,12 +88,14 @@ public void init() { .usePlaintext() .build(); BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses); requestStreamObserver = stub.publishBlockStream(publishStreamObserver); + lastKnownStatuses.clear(); } /** - * The PublishStreamObserver class implements the StreamObserver interface to observe the + * The PublishStreamObserver class implements the StreamObserver interface to + * observe the * stream. */ @Override @@ -113,7 +121,8 @@ public boolean streamBlockItem(List blockItems) { } /** - * The PublishStreamObserver class implements the StreamObserver interface to observe the + * The PublishStreamObserver class implements the StreamObserver interface to + * observe the * stream. */ @Override @@ -138,10 +147,46 @@ public boolean streamBlock(Block block) { break; } } - + metricsService.get(LiveBlocksSent).increment(); return streamEnabled.get(); } + /** + * Sends a onCompleted message to the server and waits for a short period of + * time to ensure the message is sent. + * + * @throws InterruptedException if the thread is interrupted + */ + @Override + public void completeStreaming() throws InterruptedException { + requestStreamObserver.onCompleted(); + // todo(352) Find a suitable solution for removing the sleep + Thread.sleep(100); + } + + /** + * Gets the number of published blocks. + * + * @return the number of published blocks + */ + @Override + public long getPublishedBlocks() { + return metricsService.get(LiveBlocksSent).get(); + } + + /** + * Gets the last known statuses. + * + * @return the last known statuses + */ + @Override + public List getLastKnownStatuses() { + return List.copyOf(lastKnownStatuses); + } + + /** + * Shutdowns the channel. + */ @Override public void shutdown() { channel.shutdown(); diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java index 8daddfae1..a843a16c1 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java @@ -24,36 +24,50 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.lang.System.Logger; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; /** - * The PublishStreamObserver class provides the methods to observe the stream of the published + * The PublishStreamObserver class provides the methods to observe the stream of + * the published * stream. */ public class PublishStreamObserver implements StreamObserver { private final Logger logger = System.getLogger(getClass().getName()); private final AtomicBoolean streamEnabled; + private final List lastKnownStatuses; - /** Creates a new PublishStreamObserver instance. + /** + * Creates a new PublishStreamObserver instance. * - * @param streamEnabled is responsible for signaling, whether streaming should continue + * @param streamEnabled is responsible for signaling, whether streaming + * should continue + * @param lastKnownStatuses the last known statuses */ - public PublishStreamObserver(@NonNull final AtomicBoolean streamEnabled) { + public PublishStreamObserver( + @NonNull final AtomicBoolean streamEnabled, @NonNull final List lastKnownStatuses) { this.streamEnabled = requireNonNull(streamEnabled); + this.lastKnownStatuses = requireNonNull(lastKnownStatuses); } /** what will the stream observer do with the response from the server */ @Override public void onNext(PublishStreamResponse publishStreamResponse) { + lastKnownStatuses.add(publishStreamResponse.toString()); logger.log(INFO, "Received Response: " + publishStreamResponse.toString()); } - /** Responsible for stream observer behaviour, in case of error. For now, we will stop the stream for every error. In the future we'd want to have a retry mechanism depending on the error. */ + /** + * Responsible for stream observer behaviour, in case of error. For now, we will + * stop the stream for every error. In the future we'd want to have a retry + * mechanism depending on the error. + */ @Override public void onError(@NonNull final Throwable streamError) { streamEnabled.set(false); Status status = Status.fromThrowable(streamError); + lastKnownStatuses.add(status.toString()); logger.log(Logger.Level.ERROR, "Error %s with status %s.".formatted(streamError, status), streamError); } diff --git a/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java b/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java index 3a0c5c1e8..3f6f0c164 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java +++ b/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java @@ -36,7 +36,9 @@ private SimulatorMetricTypes() {} public enum Counter implements SimulatorMetricMetadata { // Standard counters /** The number of live block items sent by the simulator . */ - LiveBlockItemsSent("live_block_items_sent", "Live Block Items Sent"); + LiveBlockItemsSent("live_block_items_sent", "Live Block Items Sent"), + /** The number of live blocks sent by the simulator */ + LiveBlocksSent("live_blocks_sent", "Live Blocks Sent"); private final String grafanaLabel; private final String description; diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java index 4a4353e1c..a4b092e0e 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java @@ -56,4 +56,12 @@ public CombinedModeHandler(@NonNull final BlockStreamConfig blockStreamConfig) { public void start() { throw new UnsupportedOperationException(); } + + /** + * Stops the handler and manager from streaming. + */ + @Override + public void stop() { + throw new UnsupportedOperationException(); + } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java index 50a66ad27..2b593867a 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java @@ -53,4 +53,12 @@ public ConsumerModeHandler(@NonNull final BlockStreamConfig blockStreamConfig) { public void start() { throw new UnsupportedOperationException(); } + + /** + * Stops the handler and manager from streaming. + */ + @Override + public void stop() { + throw new UnsupportedOperationException(); + } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java index 83cbb1d29..ff89b1a33 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java @@ -30,14 +30,19 @@ import com.hedera.hapi.block.stream.protoc.Block; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; /** - * The {@code PublisherModeHandler} class implements the {@link SimulatorModeHandler} interface + * The {@code PublisherModeHandler} class implements the + * {@link SimulatorModeHandler} interface * and provides the behavior for a mode where only publishing of block data * occurs. * - *

This mode handles single operation in the block streaming process, utilizing the - * {@link BlockStreamConfig} for configuration settings. It is designed for scenarios where + *

+ * This mode handles single operation in the block streaming process, utilizing + * the + * {@link BlockStreamConfig} for configuration settings. It is designed for + * scenarios where * the simulator needs to handle publication of blocks. */ public class PublisherModeHandler implements SimulatorModeHandler { @@ -49,14 +54,19 @@ public class PublisherModeHandler implements SimulatorModeHandler { private final int delayBetweenBlockItems; private final int millisecondsPerBlock; private final MetricsService metricsService; + private final AtomicBoolean shouldPublish = new AtomicBoolean(true); /** - * Constructs a new {@code PublisherModeHandler} with the specified block stream configuration and publisher client. + * Constructs a new {@code PublisherModeHandler} with the specified block stream + * configuration and publisher client. * - * @param blockStreamConfig the configuration data for managing block streams + * @param blockStreamConfig the configuration data for managing block + * streams * @param publishStreamGrpcClient the grpc client used for streaming blocks - * @param blockStreamManager the block stream manager, responsible for generating blocks - * @param metricsService the metrics service to record and report usage statistics + * @param blockStreamManager the block stream manager, responsible for + * generating blocks + * @param metricsService the metrics service to record and report usage + * statistics */ public PublisherModeHandler( @NonNull final BlockStreamConfig blockStreamConfig, @@ -75,9 +85,17 @@ public PublisherModeHandler( /** * Starts the simulator and initiate streaming, depending on the working mode. + * + * @throws BlockSimulatorParsingException if an error occurs while parsing + * blocks + * @throws IOException if an I/O error occurs during block + * streaming + * @throws InterruptedException if the thread running the simulator is + * interrupted */ @Override public void start() throws BlockSimulatorParsingException, IOException, InterruptedException { + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started streaming."); if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) { millisPerBlockStreaming(); } else { @@ -90,7 +108,7 @@ private void millisPerBlockStreaming() throws IOException, InterruptedException, final long secondsPerBlockNanos = (long) millisecondsPerBlock * NANOS_PER_MILLI; Block nextBlock = blockStreamManager.getNextBlock(); - while (nextBlock != null) { + while (nextBlock != null && shouldPublish.get()) { long startTime = System.nanoTime(); if (!publishStreamGrpcClient.streamBlock(nextBlock)) { LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator stopped streaming due to errors."); @@ -122,10 +140,9 @@ private void millisPerBlockStreaming() throws IOException, InterruptedException, private void constantRateStreaming() throws InterruptedException, IOException, BlockSimulatorParsingException { int delayMSBetweenBlockItems = delayBetweenBlockItems / NANOS_PER_MILLI; int delayNSBetweenBlockItems = delayBetweenBlockItems % NANOS_PER_MILLI; - boolean streamBlockItem = true; int blockItemsStreamed = 0; - while (streamBlockItem) { + while (shouldPublish.get()) { // get block Block block = blockStreamManager.getNextBlock(); @@ -144,8 +161,16 @@ private void constantRateStreaming() throws InterruptedException, IOException, B if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) { LOGGER.log(INFO, "Block Stream Simulator has reached the maximum number of block items to" + " stream"); - streamBlockItem = false; + shouldPublish.set(false); } } } + + /** + * Stops the handler and manager from streaming. + */ + @Override + public void stop() { + shouldPublish.set(false); + } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java index c80685600..273025e95 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java @@ -48,4 +48,9 @@ public interface SimulatorModeHandler { * @throws InterruptedException if the thread running the simulator is interrupted */ void start() throws BlockSimulatorParsingException, IOException, InterruptedException; + + /** + * Stops the handler and manager from streaming. + */ + void stop(); } diff --git a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java index 67eff4690..bb217c89d 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java @@ -18,14 +18,19 @@ import static com.hedera.block.simulator.TestUtils.getTestMetrics; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.data.StreamStatus; import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; @@ -75,8 +80,12 @@ void setUp() throws IOException { } @AfterEach - void tearDown() { - blockStreamSimulator.stop(); + void tearDown() throws InterruptedException { + try { + blockStreamSimulator.stop(); + } catch (UnsupportedOperationException e) { + // @todo (121) Implement consumer logic in the Simulator, which will fix this + } } @Test @@ -126,9 +135,10 @@ private String getAbsoluteFolder(String relativePath) { } @Test - void stop_doesNotThrowException() { + void stop_doesNotThrowException() throws InterruptedException { assertDoesNotThrow(() -> blockStreamSimulator.stop()); assertFalse(blockStreamSimulator.isRunning()); + verify(publishStreamGrpcClient, atLeast(1)).completeStreaming(); } @Test @@ -234,6 +244,29 @@ void constructor_throwsExceptionForNullSimulatorMode() { }); } + @Test + void testGetStreamStatus() { + long expectedPublishedBlocks = 5; + List expectedLastKnownStatuses = List.of("Status1", "Status2"); + + when(publishStreamGrpcClient.getPublishedBlocks()).thenReturn(expectedPublishedBlocks); + when(publishStreamGrpcClient.getLastKnownStatuses()).thenReturn(expectedLastKnownStatuses); + + StreamStatus streamStatus = blockStreamSimulator.getStreamStatus(); + + assertNotNull(streamStatus, "StreamStatus should not be null"); + assertEquals(expectedPublishedBlocks, streamStatus.publishedBlocks(), "Published blocks should match"); + assertEquals( + expectedLastKnownStatuses, + streamStatus.lastKnownPublisherStatuses(), + "Last known statuses should match"); + assertEquals(0, streamStatus.consumedBlocks(), "Consumed blocks should be 0 by default"); + assertEquals( + 0, + streamStatus.lastKnownConsumersStatuses().size(), + "Last known consumers statuses should be empty by default"); + } + private List captureLogs() { // Capture logs Logger logger = Logger.getLogger(PublisherModeHandler.class.getName()); diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java new file mode 100644 index 000000000..6bc772188 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java @@ -0,0 +1,221 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.config.data; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +class StreamStatusTest { + + @Test + void testBuilderDefaultValues() { + StreamStatus streamStatus = StreamStatus.builder().build(); + + assertEquals(0, streamStatus.publishedBlocks(), "Default publishedBlocks should be 0"); + assertEquals(0, streamStatus.consumedBlocks(), "Default consumedBlocks should be 0"); + assertNotNull(streamStatus.lastKnownPublisherStatuses(), "lastKnownPublisherStatuses should not be null"); + assertTrue(streamStatus.lastKnownPublisherStatuses().isEmpty(), "lastKnownPublisherStatuses should be empty"); + assertNotNull(streamStatus.lastKnownConsumersStatuses(), "lastKnownConsumersStatuses should not be null"); + assertTrue(streamStatus.lastKnownConsumersStatuses().isEmpty(), "lastKnownConsumersStatuses should be empty"); + } + + @Test + void testBuilderWithValues() { + List publisherStatuses = List.of("Publisher1", "Publisher2"); + List consumerStatuses = List.of("Consumer1"); + + StreamStatus streamStatus = StreamStatus.builder() + .publishedBlocks(10) + .consumedBlocks(8) + .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownConsumersStatuses(consumerStatuses) + .build(); + + assertEquals(10, streamStatus.publishedBlocks(), "publishedBlocks should be 10"); + assertEquals(8, streamStatus.consumedBlocks(), "consumedBlocks should be 8"); + assertEquals( + publisherStatuses, + streamStatus.lastKnownPublisherStatuses(), + "lastKnownPublisherStatuses should match"); + assertEquals( + consumerStatuses, streamStatus.lastKnownConsumersStatuses(), "lastKnownConsumersStatuses should match"); + } + + @Test + void testBuilderSetters() { + StreamStatus.Builder builder = StreamStatus.builder(); + + builder.publishedBlocks(5); + builder.consumedBlocks(3); + builder.lastKnownPublisherStatuses(List.of("PubStatus")); + builder.lastKnownConsumersStatuses(List.of("ConStatus")); + + StreamStatus streamStatus = builder.build(); + + assertEquals(5, streamStatus.publishedBlocks(), "publishedBlocks should be 5"); + assertEquals(3, streamStatus.consumedBlocks(), "consumedBlocks should be 3"); + assertEquals( + List.of("PubStatus"), + streamStatus.lastKnownPublisherStatuses(), + "lastKnownPublisherStatuses should match"); + assertEquals( + List.of("ConStatus"), + streamStatus.lastKnownConsumersStatuses(), + "lastKnownConsumersStatuses should match"); + } + + @Test + void testBuilderDefaultConstructor() { + StreamStatus.Builder builder = new StreamStatus.Builder(); + + assertNotNull(builder, "Builder should not be null"); + + StreamStatus streamStatus = builder.build(); + + assertEquals(0, streamStatus.publishedBlocks(), "Default publishedBlocks should be 0"); + assertEquals(0, streamStatus.consumedBlocks(), "Default consumedBlocks should be 0"); + assertNotNull(streamStatus.lastKnownPublisherStatuses(), "lastKnownPublisherStatuses should not be null"); + assertTrue(streamStatus.lastKnownPublisherStatuses().isEmpty(), "lastKnownPublisherStatuses should be empty"); + assertNotNull(streamStatus.lastKnownConsumersStatuses(), "lastKnownConsumersStatuses should not be null"); + assertTrue(streamStatus.lastKnownConsumersStatuses().isEmpty(), "lastKnownConsumersStatuses should be empty"); + } + + @Test + void testEqualsAndHashCode() { + List publisherStatuses = List.of("Publisher1"); + List consumerStatuses = List.of("Consumer1"); + + StreamStatus streamStatus1 = StreamStatus.builder() + .publishedBlocks(5) + .consumedBlocks(3) + .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownConsumersStatuses(consumerStatuses) + .build(); + + StreamStatus streamStatus2 = StreamStatus.builder() + .publishedBlocks(5) + .consumedBlocks(3) + .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownConsumersStatuses(consumerStatuses) + .build(); + + assertEquals(streamStatus1, streamStatus2, "StreamStatus instances should be equal"); + assertEquals(streamStatus1.hashCode(), streamStatus2.hashCode(), "Hash codes should be equal"); + } + + @Test + void testNotEquals() { + StreamStatus streamStatus1 = + StreamStatus.builder().publishedBlocks(5).consumedBlocks(3).build(); + + StreamStatus streamStatus2 = + StreamStatus.builder().publishedBlocks(6).consumedBlocks(3).build(); + + assertNotEquals(streamStatus1, streamStatus2, "StreamStatus instances should not be equal"); + } + + @Test + void testToString() { + List publisherStatuses = List.of("Pub1"); + List consumerStatuses = List.of("Con1"); + + StreamStatus streamStatus = StreamStatus.builder() + .publishedBlocks(5) + .consumedBlocks(3) + .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownConsumersStatuses(consumerStatuses) + .build(); + + String toString = streamStatus.toString(); + + assertNotNull(toString, "toString() should not return null"); + assertTrue(toString.contains("publishedBlocks=5"), "toString() should contain 'publishedBlocks=5'"); + assertTrue(toString.contains("consumedBlocks=3"), "toString() should contain 'consumedBlocks=3'"); + assertTrue( + toString.contains("lastKnownPublisherStatuses=[Pub1]"), + "toString() should contain 'lastKnownPublisherStatuses=[Pub1]'"); + assertTrue( + toString.contains("lastKnownConsumersStatuses=[Con1]"), + "toString() should contain 'lastKnownConsumersStatuses=[Con1]'"); + } + + @Test + void testStatusesLists() { + List publisherStatuses = new ArrayList<>(); + List consumerStatuses = new ArrayList<>(); + + publisherStatuses.add("Publisher1"); + consumerStatuses.add("Consumer1"); + + StreamStatus streamStatus = StreamStatus.builder() + .publishedBlocks(1) + .consumedBlocks(1) + .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownConsumersStatuses(consumerStatuses) + .build(); + + publisherStatuses.add("Publisher2"); + consumerStatuses.add("Consumer2"); + + assertNotEquals( + List.of("Publisher1", "Publisher2"), + streamStatus.lastKnownPublisherStatuses(), + "lastKnownPublisherStatuses should be immutable"); + assertNotEquals( + List.of("Consumer1", "Consumer2"), + streamStatus.lastKnownConsumersStatuses(), + "lastKnownConsumersStatuses should be immutable"); + } + + @Test + void testNullLists() { + assertThrows(NullPointerException.class, () -> StreamStatus.builder() + .publishedBlocks(0) + .consumedBlocks(0) + .lastKnownPublisherStatuses(null) + .lastKnownConsumersStatuses(null) + .build()); + } + + @Test + void testBuilderChaining() { + StreamStatus streamStatus = StreamStatus.builder() + .publishedBlocks(2) + .consumedBlocks(2) + .lastKnownPublisherStatuses(List.of("PubStatus")) + .lastKnownConsumersStatuses(List.of("ConStatus")) + .build(); + + assertEquals(2, streamStatus.publishedBlocks(), "publishedBlocks should be 2"); + assertEquals(2, streamStatus.consumedBlocks(), "consumedBlocks should be 2"); + assertEquals( + List.of("PubStatus"), + streamStatus.lastKnownPublisherStatuses(), + "lastKnownPublisherStatuses should match"); + assertEquals( + List.of("ConStatus"), + streamStatus.lastKnownConsumersStatuses(), + "lastKnownConsumersStatuses should match"); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java index 2d1a196cc..64ec0f7d3 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java @@ -17,7 +17,9 @@ package com.hedera.block.simulator.grpc; import static com.hedera.block.simulator.TestUtils.getTestMetrics; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -91,11 +93,31 @@ void streamBlock() { new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled); publishStreamGrpcClient.init(); + assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty()); + boolean result = publishStreamGrpcClient.streamBlock(block); assertTrue(result); boolean result1 = publishStreamGrpcClient.streamBlock(block1); assertTrue(result1); + + assertEquals(2, publishStreamGrpcClient.getPublishedBlocks()); + } + + @Test + void streamBlockFailsBecauseOfCompletedStreaming() throws InterruptedException { + BlockItem blockItem = BlockItem.newBuilder().build(); + Block block = Block.newBuilder().addItems(blockItem).build(); + + PublishStreamGrpcClientImpl publishStreamGrpcClient = + new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled); + + publishStreamGrpcClient.init(); + assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty()); + + publishStreamGrpcClient.completeStreaming(); + + assertThrows(IllegalStateException.class, () -> publishStreamGrpcClient.streamBlock(block)); } @Test diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java index 178178b4c..967c3c3eb 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java @@ -16,10 +16,13 @@ package com.hedera.block.simulator.grpc; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.hedera.hapi.block.protoc.PublishStreamResponse; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; @@ -29,27 +32,33 @@ class PublishStreamObserverTest { void onNext() { PublishStreamResponse response = PublishStreamResponse.newBuilder().build(); AtomicBoolean streamEnabled = new AtomicBoolean(true); + List lastKnownStatuses = new ArrayList<>(); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled); publishStreamObserver.onNext(response); assertTrue(streamEnabled.get(), "streamEnabled should remain true after onCompleted"); + assertEquals(1, lastKnownStatuses.size(), "lastKnownStatuses should have one element after onNext"); } @Test void onError() { AtomicBoolean streamEnabled = new AtomicBoolean(true); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled); + List lastKnownStatuses = new ArrayList<>(); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses); publishStreamObserver.onError(new Throwable()); assertFalse(streamEnabled.get(), "streamEnabled should be set to false after onError"); + assertEquals(1, lastKnownStatuses.size(), "lastKnownStatuses should have one element after onError"); } @Test void onCompleted() { AtomicBoolean streamEnabled = new AtomicBoolean(true); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled); + List lastKnownStatuses = new ArrayList<>(); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses); publishStreamObserver.onCompleted(); assertTrue(streamEnabled.get(), "streamEnabled should remain true after onCompleted"); + assertEquals(0, lastKnownStatuses.size(), "lastKnownStatuses should not have elements after onCompleted"); } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/metrics/MetricsServiceTest.java b/simulator/src/test/java/com/hedera/block/simulator/metrics/MetricsServiceTest.java index e77451465..a14804e5b 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/metrics/MetricsServiceTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/metrics/MetricsServiceTest.java @@ -18,6 +18,7 @@ import static com.hedera.block.simulator.TestUtils.getTestMetrics; import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlockItemsSent; +import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlocksSent; import static org.junit.jupiter.api.Assertions.assertEquals; import com.hedera.block.simulator.TestUtils; @@ -51,4 +52,19 @@ void MetricsService_verifyLiveBlockItemsSentCounter() { metricsService.get(LiveBlockItemsSent).getDescription()); assertEquals(10, metricsService.get(LiveBlockItemsSent).get()); } + + @Test + void MetricsService_verifyLiveBlocksSentCounter() { + + for (int i = 0; i < 10; i++) { + metricsService.get(LiveBlocksSent).increment(); + } + + assertEquals( + LiveBlocksSent.grafanaLabel(), + metricsService.get(LiveBlocksSent).getName()); + assertEquals( + LiveBlocksSent.description(), metricsService.get(LiveBlocksSent).getDescription()); + assertEquals(10, metricsService.get(LiveBlocksSent).get()); + } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java index 78ee4b250..3c600535f 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java @@ -114,6 +114,48 @@ void testStartWithMillisPerBlockStreaming_NoBlocks() throws Exception { verify(blockStreamManager).getNextBlock(); } + @Test + void testStartWithMillisPerBlockStreaming_ShouldPublishFalse() throws Exception { + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); + + publisherModeHandler = new PublisherModeHandler( + blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); + + Block block1 = mock(Block.class); + Block block2 = mock(Block.class); + + when(blockStreamManager.getNextBlock()) + .thenReturn(block1) + .thenReturn(block2) + .thenReturn(null); + when(publishStreamGrpcClient.streamBlock(any(Block.class))).thenReturn(true); + + when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); + when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(true); + + publisherModeHandler.stop(); + publisherModeHandler.start(); + + verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); + verify(blockStreamManager).getNextBlock(); + } + + @Test + void testStartWithMillisPerBlockStreaming_NoBlocksAndShouldPublishFalse() throws Exception { + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); + + publisherModeHandler = new PublisherModeHandler( + blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); + + when(blockStreamManager.getNextBlock()).thenReturn(null); + + publisherModeHandler.stop(); + publisherModeHandler.start(); + + verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); + verify(blockStreamManager).getNextBlock(); + } + @Test void testStartWithConstantRateStreaming_WithinMaxItems() throws Exception { when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.CONSTANT_RATE); @@ -151,6 +193,51 @@ void testStartWithConstantRateStreaming_WithinMaxItems() throws Exception { verify(blockStreamManager, times(3)).getNextBlock(); } + @Test + void testStartWithConstantRateStreaming_ExceedingMaxItems() throws Exception { + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.CONSTANT_RATE); + when(blockStreamConfig.delayBetweenBlockItems()).thenReturn(0); + when(blockStreamConfig.maxBlockItemsToStream()).thenReturn(5); + + publisherModeHandler = new PublisherModeHandler( + blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); + when(publishStreamGrpcClient.streamBlock(any(Block.class))).thenReturn(true); + + Block block1 = mock(Block.class); + Block block2 = mock(Block.class); + Block block3 = mock(Block.class); + Block block4 = mock(Block.class); + + BlockItem blockItem1 = mock(BlockItem.class); + BlockItem blockItem2 = mock(BlockItem.class); + BlockItem blockItem3 = mock(BlockItem.class); + BlockItem blockItem4 = mock(BlockItem.class); + + when(block1.getItemsList()).thenReturn(Arrays.asList(blockItem1, blockItem2)); + when(block2.getItemsList()).thenReturn(Arrays.asList(blockItem3, blockItem4)); + when(block3.getItemsList()).thenReturn(Arrays.asList(blockItem1, blockItem2)); + when(block4.getItemsList()).thenReturn(Arrays.asList(blockItem3, blockItem4)); + + when(blockStreamManager.getNextBlock()) + .thenReturn(block1) + .thenReturn(block2) + .thenReturn(block3) + .thenReturn(block4); + + when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); + when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(true); + when(publishStreamGrpcClient.streamBlock(block3)).thenReturn(true); + when(publishStreamGrpcClient.streamBlock(block4)).thenReturn(true); + + publisherModeHandler.start(); + + verify(publishStreamGrpcClient).streamBlock(block1); + verify(publishStreamGrpcClient).streamBlock(block2); + verify(publishStreamGrpcClient).streamBlock(block3); + verifyNoMoreInteractions(publishStreamGrpcClient); + verify(blockStreamManager, times(3)).getNextBlock(); + } + @Test void testStartWithConstantRateStreaming_NoBlocks() throws Exception { when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.CONSTANT_RATE); diff --git a/suites/src/main/java/com/hedera/block/suites/BaseSuite.java b/suites/src/main/java/com/hedera/block/suites/BaseSuite.java index 96a31d1ea..54408c41d 100644 --- a/suites/src/main/java/com/hedera/block/suites/BaseSuite.java +++ b/suites/src/main/java/com/hedera/block/suites/BaseSuite.java @@ -17,6 +17,8 @@ package com.hedera.block.suites; import com.hedera.block.simulator.BlockStreamSimulatorApp; +import com.hedera.block.simulator.BlockStreamSimulatorInjectionComponent; +import com.hedera.block.simulator.DaggerBlockStreamSimulatorInjectionComponent; import com.swirlds.config.api.Configuration; import com.swirlds.config.api.ConfigurationBuilder; import com.swirlds.config.extensions.sources.ClasspathFileConfigSource; @@ -27,6 +29,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Future; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.testcontainers.containers.GenericContainer; @@ -54,8 +57,8 @@ public abstract class BaseSuite { /** Port that is used by the Block Node Application */ protected static int blockNodePort; - /** Block Simulator Application instance */ - protected static BlockStreamSimulatorApp blockStreamSimulatorApp; + /** Executor service for managing threads */ + protected static ErrorLoggingExecutor executorService; /** * Default constructor for the BaseSuite class. @@ -76,6 +79,7 @@ public BaseSuite() { public static void setup() { blockNodeContainer = createContainer(); blockNodeContainer.start(); + executorService = new ErrorLoggingExecutor(); } /** @@ -89,6 +93,9 @@ public static void teardown() { if (blockNodeContainer != null) { blockNodeContainer.stop(); } + if (executorService != null) { + executorService.shutdownNow(); + } } /** @@ -121,6 +128,34 @@ protected static GenericContainer createContainer() { return blockNodeContainer; } + /** + * Starts the block stream simulator in a separate thread. + * + * @param blockStreamSimulatorAppInstance the block stream simulator app instance + * @return a {@link Future} representing the asynchronous execution of the block stream simulator + */ + protected Future startSimulatorInThread(BlockStreamSimulatorApp blockStreamSimulatorAppInstance) { + return executorService.submit(() -> { + try { + blockStreamSimulatorAppInstance.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + /** + * Creates a new instance of the block stream simulator. + * + * @return a new instance of the block stream simulator + * @throws IOException if an I/O error occurs + */ + protected BlockStreamSimulatorApp createBlockSimulator() throws IOException { + BlockStreamSimulatorInjectionComponent DIComponent = + DaggerBlockStreamSimulatorInjectionComponent.factory().create(loadSimulatorDefaultConfiguration()); + return DIComponent.getBlockStreamSimulatorApp(); + } + /** * Builds the default block simulator configuration * diff --git a/suites/src/main/java/com/hedera/block/suites/ErrorLoggingExecutor.java b/suites/src/main/java/com/hedera/block/suites/ErrorLoggingExecutor.java new file mode 100644 index 000000000..e89527cae --- /dev/null +++ b/suites/src/main/java/com/hedera/block/suites/ErrorLoggingExecutor.java @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.suites; + +import static java.lang.System.Logger.Level.ERROR; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * An executor service that extends {@link ThreadPoolExecutor} to provide custom configurations + * and enhanced error logging for tasks executed within the thread pool. + * + *

This executor is configured with: + *

    + *
  • Core pool size: 8 threads + *
  • Maximum pool size: 8 threads + *
  • Keep-alive time: 10 seconds + *
  • Work queue: {@link LinkedBlockingQueue} with default capacity + *
+ * + *

The executor overrides the {@link #afterExecute(Runnable, Throwable)} method to capture and log + * any exceptions thrown during task execution, including exceptions thrown from {@code Runnable} tasks + * and uncaught exceptions from {@code Future} tasks. This aids in debugging and error tracking + * by ensuring that all exceptions are properly logged. + */ +public class ErrorLoggingExecutor extends ThreadPoolExecutor { + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + /** + * Constructs a new {@code ErrorLoggingExecutor} with a fixed thread pool configuration. + * + *

The executor is set up with: + *

    + *
  • Core pool size of 8 threads: the minimum number of threads to keep in the pool. + *
  • Maximum pool size of 8 threads: the maximum number of threads allowed in the pool. + *
  • Keep-alive time of 10 seconds: the maximum time that excess idle threads will wait for new tasks before terminating. + *
  • Work queue: a {@link LinkedBlockingQueue} to hold tasks before they are executed. + *
+ * + *

This configuration ensures that the thread pool maintains 8 threads and does not create additional threads beyond that number. + */ + public ErrorLoggingExecutor() { + super(8, 8, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + } + + /** + * Invoked after the execution of the given {@code Runnable} task. This method provides an opportunity to perform + * actions after each task execution, such as logging exceptions thrown by the task. + * + *

This implementation enhances error logging by capturing exceptions that may not have been + * detected during the execution of the task. It handles both uncaught exceptions from {@code Runnable} tasks + * and exceptions thrown during the computation of {@code Future} tasks. + * + *

Specifically, if the {@code Throwable} parameter {@code t} is {@code null}, indicating that no exception + * was thrown during execution, but the task is an instance of {@code Future} and is done, it attempts to retrieve + * the result or exception from the {@code Future}. Any exceptions encountered are then logged. + * + *

If the task execution resulted in an exception, this method logs the error message and stack trace using the {@code System.Logger}. + * + * @param r the runnable task that has completed execution + * @param t the exception that caused termination, or {@code null} if execution completed normally + */ + @Override + protected void afterExecute(Runnable r, Throwable t) { + Throwable localThrowable = t; + if (localThrowable == null && r instanceof Future && ((Future) r).isDone()) { + try { + final Object result = ((Future) r).get(); + } catch (final CancellationException ce) { + localThrowable = ce; + } catch (final ExecutionException ee) { + localThrowable = ee.getCause(); + } catch (final InterruptedException ie) { // ignore/reset + Thread.currentThread().interrupt(); + } + } + if (localThrowable != null) { + LOGGER.log(ERROR, "Task encountered an error: ", localThrowable); + } + } +} diff --git a/suites/src/main/java/com/hedera/block/suites/grpc/GrpcTestSuites.java b/suites/src/main/java/com/hedera/block/suites/grpc/GrpcTestSuites.java index 9210c8f9b..e7175edd8 100644 --- a/suites/src/main/java/com/hedera/block/suites/grpc/GrpcTestSuites.java +++ b/suites/src/main/java/com/hedera/block/suites/grpc/GrpcTestSuites.java @@ -17,6 +17,7 @@ package com.hedera.block.suites.grpc; import com.hedera.block.suites.grpc.negative.NegativeServerAvailabilityTests; +import com.hedera.block.suites.grpc.positive.PositiveEndpointBehaviourTests; import com.hedera.block.suites.grpc.positive.PositiveServerAvailabilityTests; import org.junit.platform.suite.api.SelectClasses; import org.junit.platform.suite.api.Suite; @@ -30,7 +31,11 @@ * classes in a single test run. */ @Suite -@SelectClasses({PositiveServerAvailabilityTests.class, NegativeServerAvailabilityTests.class}) +@SelectClasses({ + PositiveServerAvailabilityTests.class, + PositiveEndpointBehaviourTests.class, + NegativeServerAvailabilityTests.class +}) public class GrpcTestSuites { /** diff --git a/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java new file mode 100644 index 000000000..d51b7b592 --- /dev/null +++ b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.suites.grpc.positive; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.hedera.block.simulator.BlockStreamSimulatorApp; +import com.hedera.block.simulator.config.data.StreamStatus; +import com.hedera.block.suites.BaseSuite; +import java.io.IOException; +import java.util.concurrent.Future; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +/** + * Test class for verifying the positive scenarios for server availability, specifically related to + * the gRPC server. This class contains tests to check that the gRPC server and the exposed endpoints are working as expected + * and returning correct responses on valid requests. + * + *

Inherits from {@link BaseSuite} to reuse the container setup and teardown logic for the Block + * Node. + */ +@DisplayName("Positive Endpoint Behaviour Tests") +public class PositiveEndpointBehaviourTests extends BaseSuite { + + private BlockStreamSimulatorApp blockStreamSimulatorApp; + + private Future simulatorThread; + + @AfterEach + void teardownEnvironment() { + if (simulatorThread != null && !simulatorThread.isCancelled()) { + simulatorThread.cancel(true); + } + } + /** Default constructor for the {@link PositiveEndpointBehaviourTests} class. */ + public PositiveEndpointBehaviourTests() {} + + /** + * Tests the {@code PublishBlockStream} gRPC endpoint by starting the Block Stream Simulator, + * allowing it to publish blocks, and validating the following: + * + *

    + *
  • The number of published blocks is greater than zero. + *
  • The number of published blocks matches the size of the last known publisher statuses. + *
  • Each publisher status contains the word "acknowledgement" to confirm successful + * responses. + *
+ * + * @throws IOException if there is an error starting or stopping the Block Stream Simulator. + * @throws InterruptedException if the simulator thread is interrupted during execution. + */ + @Test + void verifyPublishBlockStreamEndpoint() throws IOException, InterruptedException { + blockStreamSimulatorApp = createBlockSimulator(); + simulatorThread = startSimulatorInThread(blockStreamSimulatorApp); + Thread.sleep(5000); + blockStreamSimulatorApp.stop(); + StreamStatus streamStatus = blockStreamSimulatorApp.getStreamStatus(); + assertTrue(streamStatus.publishedBlocks() > 0); + assertEquals( + streamStatus.publishedBlocks(), + streamStatus.lastKnownPublisherStatuses().size()); + + // Verify each status contains the word "acknowledgement" + streamStatus + .lastKnownPublisherStatuses() + .forEach(status -> assertTrue(status.toLowerCase().contains("acknowledgement"))); + } +} diff --git a/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveServerAvailabilityTests.java b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveServerAvailabilityTests.java index e675e3f56..480fc0807 100644 --- a/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveServerAvailabilityTests.java +++ b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveServerAvailabilityTests.java @@ -40,7 +40,7 @@ public PositiveServerAvailabilityTests() {} /** * Test to verify that the gRPC server starts successfully. * - *

The test checks if the Block Node container is running and marked as healthy. + *

The test checks if the Block Node container is running and marked as healthy. */ @Test public void verifyGrpcServerStartsSuccessfully() { @@ -57,10 +57,7 @@ public void verifyGrpcServerStartsSuccessfully() { @Test public void verifyGrpcServerListeningOnCorrectPort() { assertTrue(blockNodeContainer.isRunning(), "Block Node container should be running."); - assertEquals( - 1, - blockNodeContainer.getExposedPorts().size(), - "There should be exactly one exposed port."); + assertEquals(1, blockNodeContainer.getExposedPorts().size(), "There should be exactly one exposed port."); assertEquals( blockNodePort, blockNodeContainer.getExposedPorts().getFirst(), diff --git a/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java b/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java index 43cc984b6..26bf66fe5 100644 --- a/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java +++ b/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java @@ -20,14 +20,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.hedera.block.simulator.BlockStreamSimulatorApp; -import com.hedera.block.simulator.BlockStreamSimulatorInjectionComponent; -import com.hedera.block.simulator.DaggerBlockStreamSimulatorInjectionComponent; import com.hedera.block.suites.BaseSuite; import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; @@ -41,19 +37,19 @@ @DisplayName("Positive Data Persistence Tests") public class PositiveDataPersistenceTests extends BaseSuite { private final String[] GET_BLOCKS_COMMAND = new String[] {"ls", "data", "-1"}; - private ExecutorService executorService; + + private BlockStreamSimulatorApp blockStreamSimulatorApp; + + private Future simulatorThread; /** Default constructor for the {@link PositiveDataPersistenceTests} class. */ public PositiveDataPersistenceTests() {} - @BeforeEach - void setupEnvironment() { - executorService = Executors.newFixedThreadPool(2); - } - @AfterEach void teardownEnvironment() { - executorService.shutdownNow(); + if (simulatorThread != null && !simulatorThread.isCancelled()) { + simulatorThread.cancel(true); + } } /** @@ -70,8 +66,8 @@ public void verifyBlockDataSavedInCorrectDirectory() throws InterruptedException String savedBlocksFolderBefore = getContainerCommandResult(GET_BLOCKS_COMMAND); int savedBlocksCountBefore = getSavedBlocksCount(savedBlocksFolderBefore); - BlockStreamSimulatorApp blockStreamSimulatorApp = createBlockSimulator(); - startSimulatorThread(blockStreamSimulatorApp); + blockStreamSimulatorApp = createBlockSimulator(); + simulatorThread = startSimulatorInThread(blockStreamSimulatorApp); Thread.sleep(5000); blockStreamSimulatorApp.stop(); @@ -81,22 +77,7 @@ public void verifyBlockDataSavedInCorrectDirectory() throws InterruptedException assertTrue(savedBlocksFolderBefore.isEmpty()); assertFalse(savedBlocksFolderAfter.isEmpty()); assertTrue(savedBlocksCountAfter > savedBlocksCountBefore); - } - - private BlockStreamSimulatorApp createBlockSimulator() throws IOException { - BlockStreamSimulatorInjectionComponent DIComponent = - DaggerBlockStreamSimulatorInjectionComponent.factory().create(loadSimulatorDefaultConfiguration()); - return DIComponent.getBlockStreamSimulatorApp(); - } - - private void startSimulatorThread(BlockStreamSimulatorApp blockStreamSimulatorAppInstance) { - executorService.submit(() -> { - try { - blockStreamSimulatorAppInstance.start(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + assertTrue(blockStreamSimulatorApp.getStreamStatus().publishedBlocks() > 0); } private int getSavedBlocksCount(String blocksFolders) {