Skip to content

Commit

Permalink
fix: added annotations
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 7, 2024
1 parent 4cbbe68 commit 8f52ae3
Show file tree
Hide file tree
Showing 19 changed files with 116 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ void singleBlock(
if (serviceStatus.isRunning()) {
final long blockNumber = singleBlockRequest.getBlockNumber();
try {
@NonNull
final Optional<Block> blockOpt = blockReader.read(blockNumber);
@NonNull final Optional<Block> blockOpt = blockReader.read(blockNumber);
if (blockOpt.isPresent()) {
LOGGER.log(
System.Logger.Level.DEBUG,
Expand Down
14 changes: 9 additions & 5 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,33 @@

package com.hedera.block.server;

import edu.umd.cs.findbugs.annotations.NonNull;

/** Constants used in the BlockNode service. */
public final class Constants {
private Constants() {}

/** Constant mapped to the root path config key where the block files are stored */
@NonNull
public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";

/** Constant mapped to the timeout for stream consumers in milliseconds */
@NonNull
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY =
"blocknode.server.consumer.timeout.threshold";

/** Constant mapped to the name of the service in the .proto file */
public static final String SERVICE_NAME = "BlockStreamGrpcService";
@NonNull public static final String SERVICE_NAME = "BlockStreamGrpcService";

/** Constant mapped to the publishBlockStream service method name in the .proto file */
public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream";
@NonNull public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream";

/** Constant mapped to the subscribeBlockStream service method name in the .proto file */
public static final String SERVER_STREAMING_METHOD_NAME = "subscribeBlockStream";
@NonNull public static final String SERVER_STREAMING_METHOD_NAME = "subscribeBlockStream";

/** Constant mapped to the singleBlock service method name in the .proto file */
public static final String SINGLE_BLOCK_METHOD_NAME = "singleBlock";
@NonNull public static final String SINGLE_BLOCK_METHOD_NAME = "singleBlock";

/** Constant defining the block file extension */
public static final String BLOCK_FILE_EXTENSION = ".blk";
@NonNull public static final String BLOCK_FILE_EXTENSION = ".blk";
}
2 changes: 1 addition & 1 deletion server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static void main(final String[] args) {
LOGGER.log(System.Logger.Level.INFO, "Starting BlockNode Server");

Check warning on line 66 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L66

Added line #L66 was not covered by tests

// Set the global configuration
final Config config = Config.create();
@NonNull final Config config = Config.create();
Config.global(config);

Check warning on line 70 in server/src/main/java/com/hedera/block/server/Server.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/Server.java#L69-L70

Added lines #L69 - L70 were not covered by tests

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.block.server;

import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.webserver.WebServer;

/**
Expand Down Expand Up @@ -43,7 +44,7 @@ public interface ServiceStatus {
*
* @param webServer the web server instance
*/
void setWebServer(final WebServer webServer);
void setWebServer(@NonNull final WebServer webServer);

/**
* Stops the service and web server. This method is called to shut down the service and the web
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.block.server;

import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.webserver.WebServer;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -54,7 +55,7 @@ public void setRunning(final boolean running) {
*
* @param webServer the web server instance
*/
public void setWebServer(final WebServer webServer) {
public void setWebServer(@NonNull final WebServer webServer) {
this.webServer = webServer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.lmax.disruptor.EventHandler;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.time.InstantSource;
Expand Down Expand Up @@ -74,9 +75,12 @@ public class ConsumerStreamResponseObserver
*/
public ConsumerStreamResponseObserver(
final long timeoutThresholdMillis,
final InstantSource producerLivenessClock,
final SubscriptionHandler<ObjectEvent<SubscribeStreamResponse>> subscriptionHandler,
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
@NonNull final InstantSource producerLivenessClock,
@NonNull
final SubscriptionHandler<ObjectEvent<SubscribeStreamResponse>>
subscriptionHandler,
@NonNull
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.subscriptionHandler = subscriptionHandler;
Expand Down Expand Up @@ -122,7 +126,9 @@ public ConsumerStreamResponseObserver(
/** Pass the block to the observer provided by Helidon */
@Override
public void onEvent(
final ObjectEvent<SubscribeStreamResponse> event, final long l, final boolean b) {
@NonNull final ObjectEvent<SubscribeStreamResponse> event,
final long l,
final boolean b) {

// Only send the response if the consumer has not cancelled
// or closed the stream.
Expand All @@ -139,8 +145,8 @@ public void onEvent(

// Only start sending BlockItems after we've reached
// the beginning of a block.
final SubscribeStreamResponse subscribeStreamResponse = event.get();
final BlockItem blockItem = subscribeStreamResponse.getBlockItem();
@NonNull final SubscribeStreamResponse subscribeStreamResponse = event.get();
@NonNull final BlockItem blockItem = subscribeStreamResponse.getBlockItem();
if (!streamStarted && blockItem.hasHeader()) {
streamStarted = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.hedera.block.server.data;

import edu.umd.cs.findbugs.annotations.NonNull;

/**
* The ObjectEvent class defines a simple object event used to publish data to downstream
* subscribers through the LMAX Disruptor RingBuffer.
Expand All @@ -34,7 +36,7 @@ public ObjectEvent() {}
*
* @param val the value to set
*/
public void set(final T val) {
public void set(@NonNull final T val) {
this.val = val;
}

Expand All @@ -43,6 +45,7 @@ public void set(final T val) {
*
* @return the value of the event
*/
@NonNull
public T get() {
return val;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -67,17 +68,19 @@ public class LiveStreamMediatorImpl
* occurs while persisting a block item, stop the web server for maintenance, etc
*/
public LiveStreamMediatorImpl(
final Map<
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
subscribers,
final BlockWriter<BlockItem> blockWriter,
final ServiceStatus serviceStatus) {
@NonNull
final Map<
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
subscribers,
@NonNull final BlockWriter<BlockItem> blockWriter,
@NonNull final ServiceStatus serviceStatus) {

this.subscribers = subscribers;
this.blockWriter = blockWriter;

// Initialize and start the disruptor
@NonNull
final Disruptor<ObjectEvent<SubscribeStreamResponse>> disruptor =
new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE);
this.ringBuffer = disruptor.start();
Expand All @@ -90,7 +93,7 @@ public LiveStreamMediatorImpl(
*
* @param blockWriter the block writer to persist block items
*/
public LiveStreamMediatorImpl(final BlockWriter<BlockItem> blockWriter) {
public LiveStreamMediatorImpl(@NonNull final BlockWriter<BlockItem> blockWriter) {
this(new ConcurrentHashMap<>(), blockWriter, new ServiceStatusImpl());
}

Expand All @@ -103,17 +106,19 @@ public LiveStreamMediatorImpl(final BlockWriter<BlockItem> blockWriter) {
* occurs while persisting a block item, stop the web server for maintenance, etc
*/
public LiveStreamMediatorImpl(
final BlockWriter<BlockItem> blockWriter, final ServiceStatus serviceStatus) {
@NonNull final BlockWriter<BlockItem> blockWriter,
@NonNull final ServiceStatus serviceStatus) {
this(new ConcurrentHashMap<>(), blockWriter, serviceStatus);
}

@Override
public void publish(final BlockItem blockItem) throws IOException {
public void publish(@NonNull final BlockItem blockItem) throws IOException {

if (serviceStatus.isRunning()) {

// Publish the block for all subscribers to receive
LOGGER.log(System.Logger.Level.DEBUG, "Publishing BlockItem: {0}", blockItem);
@NonNull
final var subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse));
Expand All @@ -133,11 +138,11 @@ public void publish(final BlockItem blockItem) throws IOException {
LOGGER.log(System.Logger.Level.DEBUG, "Send a response to end the stream");

// Publish the block for all subscribers to receive
final SubscribeStreamResponse endStreamResponse = buildEndStreamResponse();
@NonNull final SubscribeStreamResponse endStreamResponse = buildEndStreamResponse();
ringBuffer.publishEvent((event, sequence) -> event.set(endStreamResponse));

// Unsubscribe all downstream consumers
for (final var subscriber : subscribers.keySet()) {
for (@NonNull final var subscriber : subscribers.keySet()) {
LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribing: {0}", subscriber);
unsubscribe(subscriber);
}
Expand All @@ -150,9 +155,11 @@ public void publish(final BlockItem blockItem) throws IOException {
}

@Override
public void subscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
public void subscribe(
@NonNull final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {

// Initialize the batch event processor and set it on the ring buffer
@NonNull
final var batchEventProcessor =
new BatchEventProcessorBuilder()
.build(ringBuffer, ringBuffer.newBarrier(), handler);
Expand All @@ -165,10 +172,11 @@ public void subscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>> h
}

@Override
public void unsubscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
public void unsubscribe(
@NonNull final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {

// Remove the subscriber
final var batchEventProcessor = subscribers.remove(handler);
@NonNull final var batchEventProcessor = subscribers.remove(handler);
if (batchEventProcessor == null) {
LOGGER.log(System.Logger.Level.ERROR, "Subscriber not found: {0}", handler);

Expand All @@ -183,10 +191,12 @@ public void unsubscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>>
}

@Override
public boolean isSubscribed(EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
public boolean isSubscribed(
@NonNull EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
return subscribers.containsKey(handler);
}

@NonNull
private static SubscribeStreamResponse buildEndStreamResponse() {
// The current spec does not contain a generic error code for
// SubscribeStreamResponseCode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.block.server.mediator;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;

/**
Expand All @@ -33,5 +34,5 @@ public interface Publisher<U> {
* @throws IOException thrown if an I/O error occurs while publishing the item to the
* subscribers.
*/
void publish(final U data) throws IOException;
void publish(@NonNull final U data) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hedera.block.server.mediator;

import com.lmax.disruptor.EventHandler;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
* The SubscriptionHandler interface defines the contract for subscribing and unsubscribing
Expand All @@ -31,20 +32,20 @@ public interface SubscriptionHandler<V> {
*
* @param handler the handler to subscribe
*/
void subscribe(final EventHandler<V> handler);
void subscribe(@NonNull final EventHandler<V> handler);

/**
* Unsubscribes the given handler from the stream of events.
*
* @param handler the handler to unsubscribe
*/
void unsubscribe(final EventHandler<V> handler);
void unsubscribe(@NonNull final EventHandler<V> handler);

/**
* Checks if the given handler is subscribed to the stream of events.
*
* @param handler the handler to check
* @return true if the handler is subscribed, false otherwise
*/
boolean isSubscribed(final EventHandler<V> handler);
boolean isSubscribed(@NonNull final EventHandler<V> handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ public BlockAsDirReader(

LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockReader");

@NonNull
final Path blockNodeRootPath = Path.of(config.get(key).asString().get());
@NonNull final Path blockNodeRootPath = Path.of(config.get(key).asString().get());

LOGGER.log(System.Logger.Level.INFO, config.toString());
LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath);
Expand Down Expand Up @@ -100,8 +99,7 @@ public Optional<Block> read(final long blockNumber) throws IOException {

// Verify path attributes of the block directory within the
// block node root path
@NonNull
final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber));
@NonNull final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber));
if (isPathDisqualified(blockPath)) {
return Optional.empty();
}
Expand All @@ -117,11 +115,9 @@ public Optional<Block> read(final long blockNumber) throws IOException {
// 10.blk), the loop will directly fetch the BlockItems in order based on
// their file names. The loop will exit when it attempts to read a
// BlockItem file that does not exist (e.g., 11.blk).
@NonNull
final Builder builder = Block.newBuilder();
@NonNull final Builder builder = Block.newBuilder();
for (int i = 1; ; i++) {
@NonNull
final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION);
@NonNull final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION);
@NonNull
final Optional<BlockItem> blockItemOpt = readBlockItem(blockItemPath.toString());
if (blockItemOpt.isPresent()) {
Expand All @@ -142,7 +138,8 @@ public Optional<Block> read(final long blockNumber) throws IOException {
}

@NonNull
private Optional<BlockItem> readBlockItem(@NonNull final String blockItemPath) throws IOException {
private Optional<BlockItem> readBlockItem(@NonNull final String blockItemPath)
throws IOException {

try (FileInputStream fis = new FileInputStream(blockItemPath)) {
return Optional.of(BlockItem.parseFrom(fis));
Expand Down
Loading

0 comments on commit 8f52ae3

Please sign in to comment.