Skip to content

Commit

Permalink
refactor: added system-wide exception handling. IOExceptions will not…
Browse files Browse the repository at this point in the history
…ify the producer, all consumers and unsubscribe all subscribers

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 25, 2024
1 parent c1a92fd commit a82c2e6
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class BlockStreamService implements GrpcService {

private final long timeoutThresholdMillis;
private final ItemAckBuilder itemAckBuilder;
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;

/**
* Constructor for the BlockStreamService class.
Expand All @@ -55,7 +55,7 @@ public class BlockStreamService implements GrpcService {
public BlockStreamService(
final long timeoutThresholdMillis,
final ItemAckBuilder itemAckBuilder,
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator) {
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.itemAckBuilder = itemAckBuilder;
this.streamMediator = streamMediator;
Expand Down
108 changes: 55 additions & 53 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcRouting;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/** Main class for the block node server */
public class Server {
Expand All @@ -54,60 +55,61 @@ private Server() {}
*/
public static void main(final String[] args) {

try {
// Set the global configuration
final Config config = Config.create();
Config.global(config);

// Build the gRPC service
final GrpcRouting.Builder grpcRouting = buildGrpcRouting(config);

// Start the web server
WebServer webServer = WebServer.builder().port(8080).addRouting(grpcRouting).build();

webServer.start();
// .start();

}

private static GrpcRouting.Builder buildGrpcRouting(final Config config) {

// Set the global configuration
final Config config = Config.create();
Config.global(config);

// Get Timeout threshold from configuration
final long consumerTimeoutThreshold =
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY)
.asLong()
.orElse(1500L);

// Initialize the reader and writer for the block storage
final BlockWriter<BlockItem> blockWriter =
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockReader<Block> blockReader =
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);

final BlockStreamService blockStreamService =
new BlockStreamService(
consumerTimeoutThreshold,
new ItemAckBuilder(),
new LiveStreamMediatorImpl(
new FileSystemPersistenceHandler(blockReader, blockWriter),
(streamMediator) -> {
LOGGER.log(
System.Logger.Level.ERROR,
"Shutting down the server due to an error.");
}));

// Start the web server
WebServer.builder()
.port(8080)
.addRouting(
GrpcRouting.builder()
.service(blockStreamService)
.bidi(
com.hedera.block.protos.BlockStreamService
.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.serverStream(
com.hedera.block.protos.BlockStreamService
.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverStreamingMethod))
.build()
.start();

} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e);
throw new RuntimeException(e);
try {
final BlockStreamService blockStreamService = buildBlockStreamService(config);
return GrpcRouting.builder()
.service(blockStreamService)
.bidi(
com.hedera.block.protos.BlockStreamService.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.serverStream(
com.hedera.block.protos.BlockStreamService.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverStreamingMethod);
} catch (IOException io) {
LOGGER.log(
System.Logger.Level.ERROR, "An exception was thrown starting the server", io);
throw new RuntimeException(io);
}
}

private static BlockStreamService buildBlockStreamService(final Config config)
throws IOException {
// Get Timeout threshold from configuration
final long consumerTimeoutThreshold =
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);

// Initialize the reader and writer for the block storage
final BlockWriter<BlockItem> blockWriter =
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockReader<Block> blockReader =
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);

return new BlockStreamService(
consumerTimeoutThreshold,
new ItemAckBuilder(),
new LiveStreamMediatorImpl(
new ConcurrentHashMap<>(32),
new FileSystemPersistenceHandler(blockReader, blockWriter)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.time.InstantSource;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to
* the downstream consumer via the notify method and manage the bidirectional stream to the consumer
* via the onNext, onError, and onCompleted methods.
*/
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockItem>> {
public class ConsumerBlockItemObserver
implements BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

Expand All @@ -40,9 +42,10 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
private final InstantSource producerLivenessClock;
private long producerLivenessMillis;

private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;

private boolean streamStarted;
private AtomicBoolean isEventTransitioning = new AtomicBoolean(false);

/**
* Constructor for the LiveStreamObserverImpl class.
Expand All @@ -52,7 +55,7 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
public ConsumerBlockItemObserver(
final long timeoutThresholdMillis,
final InstantSource producerLivenessClock,
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator,
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
Expand Down Expand Up @@ -84,7 +87,10 @@ public ConsumerBlockItemObserver(

/** Pass the block to the observer provided by Helidon */
@Override
public void onEvent(final ObjectEvent<BlockItem> event, final long l, final boolean b) {
public void onEvent(
final ObjectEvent<SubscribeStreamResponse> event, final long l, final boolean b) {

isEventTransitioning.set(true);

final long currentMillis = producerLivenessClock.millis();
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
Expand All @@ -97,22 +103,31 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool

// Only start sending BlockItems after we've reached
// the beginning of a block.
final BlockItem blockItem = event.get();
final SubscribeStreamResponse subscribeStreamResponse = event.get();
final BlockItem blockItem = subscribeStreamResponse.getBlockItem();
if (!streamStarted && blockItem.hasHeader()) {
streamStarted = true;
}

if (streamStarted) {
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();

LOGGER.log(System.Logger.Level.DEBUG, "Send BlockItem downstream: {0} ", blockItem);

subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
}
}

isEventTransitioning.set(false);
}

@Override
public void awaitShutdown() throws InterruptedException {}
public void awaitShutdown() throws InterruptedException {
while (isEventTransitioning.get()) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
LOGGER.log(
System.Logger.Level.ERROR,
"Interrupted while waiting for event to complete");
}
}
}
}
Loading

0 comments on commit a82c2e6

Please sign in to comment.