From 71ed006d243c30100ca525162c61a23662eceb76 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Sun, 4 Aug 2024 16:20:04 -0600 Subject: [PATCH] fix: added javadoc Signed-off-by: Matt Peterson --- .../block/server/BlockStreamService.java | 35 ++++++++++++------- .../hedera/block/server/data/ObjectEvent.java | 16 +++++++++ .../mediator/LiveStreamMediatorImpl.java | 28 +++++++++++---- .../block/server/mediator/Publisher.java | 10 +++--- .../block/server/mediator/StreamMediator.java | 6 ++-- .../server/mediator/SubscriptionHandler.java | 2 +- .../persistence/storage/BlockAsDirWriter.java | 23 ++++++++++++ .../persistence/storage/BlockReader.java | 13 +++++++ .../persistence/storage/BlockRemover.java | 8 +++++ .../persistence/storage/BlockWriter.java | 12 +++++++ .../server/persistence/storage/Util.java | 3 +- .../block/server/producer/ItemAckBuilder.java | 7 +++- .../producer/ProducerBlockItemObserver.java | 21 ++++++++--- .../hedera/block/server/producer/Util.java | 11 +++++- 14 files changed, 160 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index bf13cbff8..5a05f03c7 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -33,14 +33,9 @@ import java.util.Optional; /** - * This class implements the GrpcService interface and provides the functionality for the - * BlockStreamService. It sets up the bidirectional streaming methods for the service and handles - * the routing for these methods. It also initializes the StreamMediator, BlockStorage, and - * BlockCache upon creation. - * - *

The class provides two main methods, streamSink and streamSource, which handle the client and - * server streaming respectively. These methods return custom StreamObservers which are used to - * observe and respond to the streams. + * The BlockStreamService class defines the gRPC service for the block stream service. It provides + * the implementation for the bidirectional streaming, server streaming, and unary methods defined + * in the proto file. */ public class BlockStreamService implements GrpcService { @@ -52,6 +47,20 @@ public class BlockStreamService implements GrpcService { private final ServiceStatus serviceStatus; private final BlockReader blockReader; + /** + * Constructor for the BlockStreamService class. It initializes the BlockStreamService with the + * given parameters. + * + * @param timeoutThresholdMillis the timeout threshold in milliseconds for the producer to + * publish block items + * @param itemAckBuilder the item acknowledgement builder to send responses back to the producer + * @param streamMediator the stream mediator to proxy block items from the producer to the + * subscribers and manage the subscription lifecycle for subscribers + * @param blockReader the block reader to fetch blocks from storage for unary singleBlock + * service calls + * @param serviceStatus the service status provides methods to check service availability and to + * stop the service and web server in the event of an unrecoverable exception + */ BlockStreamService( final long timeoutThresholdMillis, final ItemAckBuilder itemAckBuilder, @@ -66,9 +75,10 @@ public class BlockStreamService implements GrpcService { } /** - * Returns the FileDescriptor for the BlockStreamServiceGrpcProto. + * Returns the proto descriptor for the BlockStreamService. This descriptor corresponds to the + * proto file for the BlockStreamService. * - * @return the FileDescriptor for the BlockStreamServiceGrpcProto + * @return the proto descriptor for the BlockStreamService */ @Override public Descriptors.FileDescriptor proto() { @@ -87,8 +97,9 @@ public String serviceName() { } /** - * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming - * methods for the service. + * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming method + * for publishBlockStream, server streaming method for subscribeBlockStream and a unary method + * for singleBlock. * * @param routing the routing for the BlockStreamService */ diff --git a/server/src/main/java/com/hedera/block/server/data/ObjectEvent.java b/server/src/main/java/com/hedera/block/server/data/ObjectEvent.java index fa141547c..47e4a1c28 100644 --- a/server/src/main/java/com/hedera/block/server/data/ObjectEvent.java +++ b/server/src/main/java/com/hedera/block/server/data/ObjectEvent.java @@ -16,13 +16,29 @@ package com.hedera.block.server.data; +/** + * The ObjectEvent class defines a simple object event used to publish data to downstream + * subscribers through the LMAX Disruptor RingBuffer. + * + * @param the type of the data to publish + */ public class ObjectEvent { T val; + /** + * Sets the given value to the event. + * + * @param val the value to set + */ public void set(final T val) { this.val = val; } + /** + * Gets the value of the event. + * + * @return the value of the event + */ public T get() { return val; } diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index bfbaa3310..81822078a 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -36,9 +36,9 @@ import java.util.concurrent.Executors; /** - * LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible + * LiveStreamMediatorImpl is an implementation of the StreamMediator interface. It is responsible * for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies - * block items to the subscribers as they arrive and persists the blocks to a block persistence + * block items to the subscribers as they arrive via a RingBuffer and persists the block items to a * store. */ public class LiveStreamMediatorImpl @@ -58,12 +58,13 @@ public class LiveStreamMediatorImpl private final ServiceStatus serviceStatus; /** - * Constructs a new LiveStreamMediatorImpl instance with the given subscribers, block - * persistence handler, and service status. + * Constructs a new LiveStreamMediatorImpl instance with the given subscribers, block writer, + * and service status. * - * @param subscribers the map of subscribers to their corresponding representation batch event - * processor in the LMAX Disruptor data structure. - * @param serviceStatus the service status singleton instance + * @param subscribers the map of subscribers to batch event processors + * @param blockWriter the block writer to persist block items + * @param serviceStatus the service status to stop the service and web server if an exception + * occurs while persisting a block item, stop the web server for maintenance, etc */ public LiveStreamMediatorImpl( final Map< @@ -84,10 +85,23 @@ public LiveStreamMediatorImpl( this.serviceStatus = serviceStatus; } + /** + * Constructs a new LiveStreamMediatorImpl instance with the given block writer. + * + * @param blockWriter the block writer to persist block items + */ public LiveStreamMediatorImpl(final BlockWriter blockWriter) { this(new ConcurrentHashMap<>(), blockWriter, new ServiceStatusImpl()); } + /** + * Constructs a new LiveStreamMediatorImpl instance with the given block writer and service + * status. + * + * @param blockWriter the block writer to persist block items + * @param serviceStatus the service status to stop the service and web server if an exception + * occurs while persisting a block item, stop the web server for maintenance, etc + */ public LiveStreamMediatorImpl( final BlockWriter blockWriter, final ServiceStatus serviceStatus) { this(new ConcurrentHashMap<>(), blockWriter, serviceStatus); diff --git a/server/src/main/java/com/hedera/block/server/mediator/Publisher.java b/server/src/main/java/com/hedera/block/server/mediator/Publisher.java index 7609f43f2..87edf987d 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/Publisher.java +++ b/server/src/main/java/com/hedera/block/server/mediator/Publisher.java @@ -19,19 +19,19 @@ import java.io.IOException; /** - * The Publisher interface defines the contract for publishing items emitted by the producer to + * The Publisher interface defines the contract for publishing data emitted by the producer to * downstream subscribers. * - * @param the type of the item to publish + * @param the type of data to publish */ public interface Publisher { /** - * Publishes the given item to the downstream subscribers. + * Publishes the given data to the downstream subscribers. * - * @param item the item emitted by an upstream producer to publish to downstream subscribers. + * @param data the data emitted by an upstream producer to publish to downstream subscribers. * @throws IOException thrown if an I/O error occurs while publishing the item to the * subscribers. */ - void publish(final U item) throws IOException; + void publish(final U data) throws IOException; } diff --git a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java index f1696fa8a..87e2d4030 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java +++ b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java @@ -19,10 +19,10 @@ /** * The StreamMediator marker interface defines the combination of Publisher and SubscriptionHandler * contracts. It defines multiple views of the underlying implementation, allowing producers to - * publish items while the service and downstream subscribers can manage which consumers are + * publish data while the service and downstream subscribers can manage which consumers are * subscribed to the stream of events. * - * @param the type of the item to publish - * @param the type of the event + * @param the type of the data to publish + * @param the type of the events the SubscriptionHandler processes */ public interface StreamMediator extends Publisher, SubscriptionHandler {} diff --git a/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandler.java b/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandler.java index 04ae1b80a..be03f7075 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandler.java +++ b/server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandler.java @@ -27,7 +27,7 @@ public interface SubscriptionHandler { /** - * Subscribes the given handler to the stream of items. + * Subscribes the given handler to the stream of events. * * @param handler the handler to subscribe */ diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java index a3a6f7118..e986aeedf 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java @@ -29,6 +29,15 @@ import java.nio.file.attribute.PosixFilePermission; import java.util.Set; +/** + * The BlockAsDirWriter class writes a block to the filesystem block item by block item. In this + * implementation, a block is represented as a directory of BlockItems. BlockAsDirWriter is stateful + * and uses the known, deterministic block item attributes to create new "blocks" (directories) and + * write block items to them. If an unexpected exception occurs during the write operation, the + * BlockAsDirWriter will first try to correct file permissions if appropriate. It will then attempt + * to remove the current, incomplete block (directory) before re-throwing the exception to the + * caller. + */ public class BlockAsDirWriter implements BlockWriter { private final System.Logger LOGGER = System.getLogger(getClass().getName()); @@ -66,6 +75,14 @@ public BlockAsDirWriter( createPath(blockNodeRootPath, System.Logger.Level.INFO); } + /** + * Constructor for the BlockAsDirWriter class. It initializes the BlockAsDirWriter with the + * given key and config. + * + * @param key the key to use to retrieve the block node root path from the config + * @param config the config to use to retrieve the block node root path + * @throws IOException if an error occurs while initializing the BlockAsDirWriter + */ public BlockAsDirWriter(final String key, final Config config) throws IOException { this( key, @@ -74,6 +91,12 @@ public BlockAsDirWriter(final String key, final Config config) throws IOExceptio Util.defaultPerms); } + /** + * Writes the given block item to the filesystem. + * + * @param blockItem the block item to write + * @throws IOException if an error occurs while writing the block item + */ @Override public void write(final BlockItem blockItem) throws IOException { diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockReader.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockReader.java index c5e5104ff..f55b0ce95 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockReader.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockReader.java @@ -19,6 +19,19 @@ import java.io.IOException; import java.util.Optional; +/** + * The BlockReader interface defines the contract for reading a block from the storage. + * + * @param the type of the block to read + */ public interface BlockReader { + + /** + * Reads the block with the given block number. + * + * @param blockNumber the block number to read + * @return the block with the given block number + * @throws IOException if an I/O error occurs fetching the block + */ Optional read(final long blockNumber) throws IOException; } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockRemover.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockRemover.java index e2dd649af..1ea2d5e03 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockRemover.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockRemover.java @@ -18,6 +18,14 @@ import java.io.IOException; +/** The BlockRemover interface defines the contract for removing a block from the storage. */ public interface BlockRemover { + + /** + * Removes the block with the given block number. + * + * @param id the block number to remove + * @throws IOException if an I/O error occurs removing the block + */ void remove(final long id) throws IOException; } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockWriter.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockWriter.java index 531c6cfcc..b7289decc 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockWriter.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockWriter.java @@ -18,6 +18,18 @@ import java.io.IOException; +/** + * The BlockWriter interface defines the contract for writing a block to the storage. + * + * @param the type of the block item to write + */ public interface BlockWriter { + + /** + * Writes the block item to the storage. + * + * @param blockItem the block item to write + * @throws IOException if an I/O error occurs writing the block item + */ void write(final V blockItem) throws IOException; } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/Util.java b/server/src/main/java/com/hedera/block/server/persistence/storage/Util.java index db4f840cb..79cdceb02 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/Util.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/Util.java @@ -21,10 +21,11 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.Set; +/** Utility class for storage. */ public final class Util { private Util() {} - // Default permissions: rwxr-xr-x + /** Default file permissions. Default permissions: rwxr-xr-x */ public static final FileAttribute> defaultPerms = PosixFilePermissions.asFileAttribute( Set.of( diff --git a/server/src/main/java/com/hedera/block/server/producer/ItemAckBuilder.java b/server/src/main/java/com/hedera/block/server/producer/ItemAckBuilder.java index f4791f147..738d50db4 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ItemAckBuilder.java +++ b/server/src/main/java/com/hedera/block/server/producer/ItemAckBuilder.java @@ -24,10 +24,15 @@ import java.io.IOException; import java.security.NoSuchAlgorithmException; +/** + * The ItemAckBuilder class defines a simple item acknowledgement builder used to create an + * acknowledgement type response. This is a placeholder and should be replaced with real hash + * functionality once the hedera-protobufs types are integrated. + */ public class ItemAckBuilder { public ItemAcknowledgement buildAck(final BlockItem blockItem) throws IOException, NoSuchAlgorithmException { - // TODO: Use real hash + // TODO: Use real hash and real hedera-protobufs types return ItemAcknowledgement.newBuilder() .setItemAck(ByteString.copyFrom(getFakeHash(blockItem))) .build(); diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index 4b906d2a9..162825e48 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -28,7 +28,8 @@ /** * The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC * service implementation. Helidon calls methods on this class as networking events occur with the - * connection to the upstream producer (e.g. blocks streamed from the Consensus Node to the server). + * connection to the upstream producer (e.g. block items streamed from the Consensus Node to the + * server). */ public class ProducerBlockItemObserver implements StreamObserver { @@ -43,6 +44,16 @@ public class ProducerBlockItemObserver implements StreamObserver publisher, @@ -57,9 +68,11 @@ public ProducerBlockItemObserver( } /** - * Helidon triggers this method when it receives a new block from the upstream producer. The - * method notifies all the mediator subscribers and sends a response back to the upstream - * producer. + * Helidon triggers this method when it receives a new PublishStreamRequest from the upstream + * producer. The method publish the block item data to all subscribers via the Publisher and + * sends a response back to the upstream producer. + * + * @param publishStreamRequest the PublishStreamRequest received from the upstream producer */ @Override public void onNext(final PublishStreamRequest publishStreamRequest) { diff --git a/server/src/main/java/com/hedera/block/server/producer/Util.java b/server/src/main/java/com/hedera/block/server/producer/Util.java index 7b5230b88..5764098ad 100644 --- a/server/src/main/java/com/hedera/block/server/producer/Util.java +++ b/server/src/main/java/com/hedera/block/server/producer/Util.java @@ -27,6 +27,15 @@ public final class Util { private Util() {} + /** + * Gets a fake hash for the given block item. This is a placeholder and should be replaced with + * real hash functionality once the hedera-protobufs types are integrated. + * + * @param blockItem the block item to get the fake hash for + * @return the fake hash for the given block item + * @throws IOException thrown if an I/O error occurs while getting the fake hash + * @throws NoSuchAlgorithmException thrown if the SHA-384 algorithm is not available + */ public static byte[] getFakeHash(BlockItem blockItem) throws IOException, NoSuchAlgorithmException { @@ -38,7 +47,7 @@ public static byte[] getFakeHash(BlockItem blockItem) // Get the serialized bytes byte[] serializedObject = byteArrayOutputStream.toByteArray(); - // Calculate the SHA-256 hash + // Calculate the SHA-384 hash MessageDigest digest = MessageDigest.getInstance("SHA-384"); return digest.digest(serializedObject); }