Skip to content

Commit

Permalink
fix: added javadoc
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 4, 2024
1 parent d7825b8 commit 71ed006
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,9 @@
import java.util.Optional;

/**
* This class implements the GrpcService interface and provides the functionality for the
* BlockStreamService. It sets up the bidirectional streaming methods for the service and handles
* the routing for these methods. It also initializes the StreamMediator, BlockStorage, and
* BlockCache upon creation.
*
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and
* server streaming respectively. These methods return custom StreamObservers which are used to
* observe and respond to the streams.
* The BlockStreamService class defines the gRPC service for the block stream service. It provides
* the implementation for the bidirectional streaming, server streaming, and unary methods defined
* in the proto file.
*/
public class BlockStreamService implements GrpcService {

Expand All @@ -52,6 +47,20 @@ public class BlockStreamService implements GrpcService {
private final ServiceStatus serviceStatus;
private final BlockReader<Block> blockReader;

/**
* Constructor for the BlockStreamService class. It initializes the BlockStreamService with the
* given parameters.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds for the producer to
* publish block items
* @param itemAckBuilder the item acknowledgement builder to send responses back to the producer
* @param streamMediator the stream mediator to proxy block items from the producer to the
* subscribers and manage the subscription lifecycle for subscribers
* @param blockReader the block reader to fetch blocks from storage for unary singleBlock
* service calls
* @param serviceStatus the service status provides methods to check service availability and to
* stop the service and web server in the event of an unrecoverable exception
*/
BlockStreamService(
final long timeoutThresholdMillis,
final ItemAckBuilder itemAckBuilder,
Expand All @@ -66,9 +75,10 @@ public class BlockStreamService implements GrpcService {
}

/**
* Returns the FileDescriptor for the BlockStreamServiceGrpcProto.
* Returns the proto descriptor for the BlockStreamService. This descriptor corresponds to the
* proto file for the BlockStreamService.
*
* @return the FileDescriptor for the BlockStreamServiceGrpcProto
* @return the proto descriptor for the BlockStreamService
*/
@Override
public Descriptors.FileDescriptor proto() {
Expand All @@ -87,8 +97,9 @@ public String serviceName() {
}

/**
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming
* methods for the service.
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming method
* for publishBlockStream, server streaming method for subscribeBlockStream and a unary method
* for singleBlock.
*
* @param routing the routing for the BlockStreamService
*/
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/com/hedera/block/server/data/ObjectEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,29 @@

package com.hedera.block.server.data;

/**
* The ObjectEvent class defines a simple object event used to publish data to downstream
* subscribers through the LMAX Disruptor RingBuffer.
*
* @param <T> the type of the data to publish
*/
public class ObjectEvent<T> {
T val;

/**
* Sets the given value to the event.
*
* @param val the value to set
*/
public void set(final T val) {
this.val = val;
}

/**
* Gets the value of the event.
*
* @return the value of the event
*/
public T get() {
return val;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import java.util.concurrent.Executors;

/**
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible
* LiveStreamMediatorImpl is an implementation of the StreamMediator interface. It is responsible
* for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies
* block items to the subscribers as they arrive and persists the blocks to a block persistence
* block items to the subscribers as they arrive via a RingBuffer and persists the block items to a
* store.
*/
public class LiveStreamMediatorImpl
Expand All @@ -58,12 +58,13 @@ public class LiveStreamMediatorImpl
private final ServiceStatus serviceStatus;

/**
* Constructs a new LiveStreamMediatorImpl instance with the given subscribers, block
* persistence handler, and service status.
* Constructs a new LiveStreamMediatorImpl instance with the given subscribers, block writer,
* and service status.
*
* @param subscribers the map of subscribers to their corresponding representation batch event
* processor in the LMAX Disruptor data structure.
* @param serviceStatus the service status singleton instance
* @param subscribers the map of subscribers to batch event processors
* @param blockWriter the block writer to persist block items
* @param serviceStatus the service status to stop the service and web server if an exception
* occurs while persisting a block item, stop the web server for maintenance, etc
*/
public LiveStreamMediatorImpl(
final Map<
Expand All @@ -84,10 +85,23 @@ public LiveStreamMediatorImpl(
this.serviceStatus = serviceStatus;
}

/**
* Constructs a new LiveStreamMediatorImpl instance with the given block writer.
*
* @param blockWriter the block writer to persist block items
*/
public LiveStreamMediatorImpl(final BlockWriter<BlockItem> blockWriter) {
this(new ConcurrentHashMap<>(), blockWriter, new ServiceStatusImpl());
}

/**
* Constructs a new LiveStreamMediatorImpl instance with the given block writer and service
* status.
*
* @param blockWriter the block writer to persist block items
* @param serviceStatus the service status to stop the service and web server if an exception
* occurs while persisting a block item, stop the web server for maintenance, etc
*/
public LiveStreamMediatorImpl(
final BlockWriter<BlockItem> blockWriter, final ServiceStatus serviceStatus) {
this(new ConcurrentHashMap<>(), blockWriter, serviceStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@
import java.io.IOException;

/**
* The Publisher interface defines the contract for publishing items emitted by the producer to
* The Publisher interface defines the contract for publishing data emitted by the producer to
* downstream subscribers.
*
* @param <U> the type of the item to publish
* @param <U> the type of data to publish
*/
public interface Publisher<U> {

/**
* Publishes the given item to the downstream subscribers.
* Publishes the given data to the downstream subscribers.
*
* @param item the item emitted by an upstream producer to publish to downstream subscribers.
* @param data the data emitted by an upstream producer to publish to downstream subscribers.
* @throws IOException thrown if an I/O error occurs while publishing the item to the
* subscribers.
*/
void publish(final U item) throws IOException;
void publish(final U data) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
/**
* The StreamMediator marker interface defines the combination of Publisher and SubscriptionHandler
* contracts. It defines multiple views of the underlying implementation, allowing producers to
* publish items while the service and downstream subscribers can manage which consumers are
* publish data while the service and downstream subscribers can manage which consumers are
* subscribed to the stream of events.
*
* @param <U> the type of the item to publish
* @param <V> the type of the event
* @param <U> the type of the data to publish
* @param <V> the type of the events the SubscriptionHandler processes
*/
public interface StreamMediator<U, V> extends Publisher<U>, SubscriptionHandler<V> {}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public interface SubscriptionHandler<V> {

/**
* Subscribes the given handler to the stream of items.
* Subscribes the given handler to the stream of events.
*
* @param handler the handler to subscribe
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@
import java.nio.file.attribute.PosixFilePermission;
import java.util.Set;

/**
* The BlockAsDirWriter class writes a block to the filesystem block item by block item. In this
* implementation, a block is represented as a directory of BlockItems. BlockAsDirWriter is stateful
* and uses the known, deterministic block item attributes to create new "blocks" (directories) and
* write block items to them. If an unexpected exception occurs during the write operation, the
* BlockAsDirWriter will first try to correct file permissions if appropriate. It will then attempt
* to remove the current, incomplete block (directory) before re-throwing the exception to the
* caller.
*/
public class BlockAsDirWriter implements BlockWriter<BlockItem> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());
Expand Down Expand Up @@ -66,6 +75,14 @@ public BlockAsDirWriter(
createPath(blockNodeRootPath, System.Logger.Level.INFO);
}

/**
* Constructor for the BlockAsDirWriter class. It initializes the BlockAsDirWriter with the
* given key and config.
*
* @param key the key to use to retrieve the block node root path from the config
* @param config the config to use to retrieve the block node root path
* @throws IOException if an error occurs while initializing the BlockAsDirWriter
*/
public BlockAsDirWriter(final String key, final Config config) throws IOException {
this(
key,
Expand All @@ -74,6 +91,12 @@ public BlockAsDirWriter(final String key, final Config config) throws IOExceptio
Util.defaultPerms);
}

/**
* Writes the given block item to the filesystem.
*
* @param blockItem the block item to write
* @throws IOException if an error occurs while writing the block item
*/
@Override
public void write(final BlockItem blockItem) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@
import java.io.IOException;
import java.util.Optional;

/**
* The BlockReader interface defines the contract for reading a block from the storage.
*
* @param <V> the type of the block to read
*/
public interface BlockReader<V> {

/**
* Reads the block with the given block number.
*
* @param blockNumber the block number to read
* @return the block with the given block number
* @throws IOException if an I/O error occurs fetching the block
*/
Optional<V> read(final long blockNumber) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@

import java.io.IOException;

/** The BlockRemover interface defines the contract for removing a block from the storage. */
public interface BlockRemover {

/**
* Removes the block with the given block number.
*
* @param id the block number to remove
* @throws IOException if an I/O error occurs removing the block
*/
void remove(final long id) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@

import java.io.IOException;

/**
* The BlockWriter interface defines the contract for writing a block to the storage.
*
* @param <V> the type of the block item to write
*/
public interface BlockWriter<V> {

/**
* Writes the block item to the storage.
*
* @param blockItem the block item to write
* @throws IOException if an I/O error occurs writing the block item
*/
void write(final V blockItem) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

/** Utility class for storage. */
public final class Util {
private Util() {}

// Default permissions: rwxr-xr-x
/** Default file permissions. Default permissions: rwxr-xr-x */
public static final FileAttribute<Set<PosixFilePermission>> defaultPerms =
PosixFilePermissions.asFileAttribute(
Set.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
import java.io.IOException;
import java.security.NoSuchAlgorithmException;

/**
* The ItemAckBuilder class defines a simple item acknowledgement builder used to create an
* acknowledgement type response. This is a placeholder and should be replaced with real hash
* functionality once the hedera-protobufs types are integrated.
*/
public class ItemAckBuilder {
public ItemAcknowledgement buildAck(final BlockItem blockItem)
throws IOException, NoSuchAlgorithmException {
// TODO: Use real hash
// TODO: Use real hash and real hedera-protobufs types
return ItemAcknowledgement.newBuilder()
.setItemAck(ByteString.copyFrom(getFakeHash(blockItem)))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
/**
* The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC
* service implementation. Helidon calls methods on this class as networking events occur with the
* connection to the upstream producer (e.g. blocks streamed from the Consensus Node to the server).
* connection to the upstream producer (e.g. block items streamed from the Consensus Node to the
* server).
*/
public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRequest> {

Expand All @@ -43,6 +44,16 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
* Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the
* mediator with blocks as they arrive from the upstream producer. It also sends responses back
* to the upstream producer via the responseStreamObserver.
*
* @param publisher the block item publisher to used to pass block items to consumers as they
* arrive from the upstream producer
* @param publishStreamResponseObserver the response stream observer to send responses back to
* the upstream producer for each block item processed
* @param itemAckBuilder the item acknowledgement builder to use when sending responses back to
* the upstream producer for each block item processed
* @param serviceStatus the service status used to determine if the downstream service is
* accepting block items. In the event of an unrecoverable exception, it will be used to
* stop the web server.
*/
public ProducerBlockItemObserver(
final Publisher<BlockItem> publisher,
Expand All @@ -57,9 +68,11 @@ public ProducerBlockItemObserver(
}

/**
* Helidon triggers this method when it receives a new block from the upstream producer. The
* method notifies all the mediator subscribers and sends a response back to the upstream
* producer.
* Helidon triggers this method when it receives a new PublishStreamRequest from the upstream
* producer. The method publish the block item data to all subscribers via the Publisher and
* sends a response back to the upstream producer.
*
* @param publishStreamRequest the PublishStreamRequest received from the upstream producer
*/
@Override
public void onNext(final PublishStreamRequest publishStreamRequest) {
Expand Down
11 changes: 10 additions & 1 deletion server/src/main/java/com/hedera/block/server/producer/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
public final class Util {
private Util() {}

/**
* Gets a fake hash for the given block item. This is a placeholder and should be replaced with
* real hash functionality once the hedera-protobufs types are integrated.
*
* @param blockItem the block item to get the fake hash for
* @return the fake hash for the given block item
* @throws IOException thrown if an I/O error occurs while getting the fake hash
* @throws NoSuchAlgorithmException thrown if the SHA-384 algorithm is not available
*/
public static byte[] getFakeHash(BlockItem blockItem)
throws IOException, NoSuchAlgorithmException {

Expand All @@ -38,7 +47,7 @@ public static byte[] getFakeHash(BlockItem blockItem)
// Get the serialized bytes
byte[] serializedObject = byteArrayOutputStream.toByteArray();

// Calculate the SHA-256 hash
// Calculate the SHA-384 hash
MessageDigest digest = MessageDigest.getInstance("SHA-384");
return digest.digest(serializedObject);
}
Expand Down

0 comments on commit 71ed006

Please sign in to comment.