Skip to content

Commit

Permalink
fix: repaired type errors after proto change
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 9, 2024
1 parent 582cfae commit d5b1a9f
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public class BlockStreamService implements GrpcService {
private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator;

/**
* Constructor for the BlockStreamService class.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds
*/
public BlockStreamService(final long timeoutThresholdMillis,
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
}
Expand Down Expand Up @@ -92,11 +92,11 @@ public void update(final Routing routing) {
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer
* @return a custom StreamObserver to handle streaming blockItems from the producer to all subscribed consumer
* via the streamMediator as well as sending responses back to the producer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
private StreamObserver<BlockStreamServiceGrpcProto.BlockItem> streamSink(
final StreamObserver<BlockStreamServiceGrpcProto.BlockItemResponse> responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");

return new ProducerBlockItemObserver(streamMediator, responseStreamObserver);
Expand All @@ -111,7 +111,7 @@ private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
* as handling responses from the consumer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
private StreamObserver<BlockStreamServiceGrpcProto.BlockItemResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
Expand Down
6 changes: 3 additions & 3 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
public class Server {

// Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Block>> clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockResponse>, StreamObserver<BlockStreamServiceGrpcProto.Block>> serverBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockItem>, StreamObserver<BlockStreamServiceGrpcProto.BlockItem>> clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockItemResponse>, StreamObserver<BlockStreamServiceGrpcProto.BlockItem>> serverBidiStreamingMethod;

private static final System.Logger LOGGER = System.getLogger(Server.class.getName());

Expand All @@ -62,7 +62,7 @@ public static void main(final String[] args) {
final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);

// Initialize the block storage, cache, and service
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockStorage<BlockStreamServiceGrpcProto.BlockItem> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
*/
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> {
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;
private final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver;

private final long timeoutThresholdMillis;

Expand All @@ -45,7 +45,7 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv

private final CountDownLatch shutdownLatch = new CountDownLatch(1);

private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator;

/**
* Constructor for the LiveStreamObserverImpl class.
Expand All @@ -56,8 +56,8 @@ public ConsumerBlockItemObserver(
final long timeoutThresholdMillis,
final InstantSource producerLivenessClock,
final InstantSource consumerLivenessClock,
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator,
final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator,
final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.producerLivenessClock = producerLivenessClock;
Expand All @@ -75,7 +75,7 @@ public ConsumerBlockItemObserver(
*
*/
@Override
public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.Block> event, final long l, final boolean b) throws Exception {
public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.BlockItem> event, final long l, final boolean b) throws Exception {

// Check if the consumer has timed out. If so, unsubscribe the observer from the mediator.
if (isThresholdExceeded(consumerLivenessMillis)) {
Expand All @@ -91,18 +91,18 @@ public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.Block> event,
/**
* The onNext() method is triggered by Helidon when a consumer sends a blockResponse via the bidirectional stream.
*
* @param blockResponse the BlockResponse passed back to the server via the bidirectional stream to the downstream consumer.
* @param blockItemResponse the BlockItemResponse passed back to the server via the bidirectional stream to the downstream consumer.
*/
@Override
public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) {
public void onNext(final BlockStreamServiceGrpcProto.BlockItemResponse blockItemResponse) {

// Check if the producer has timed out. If so, unsubscribe the observer from the mediator.
if (isThresholdExceeded(producerLivenessMillis)) {
LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer.");
streamMediator.unsubscribe(this);
} else {
// Refresh the consumer liveness
LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse);
LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockItemResponse);
consumerLivenessMillis = consumerLivenessClock.millis();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,45 +36,45 @@
* managing the subscribe and unsubscribe operations of downstream consumers. It also proxies live
* blocks to the subscribers as they arrive and persists the blocks to the block persistence store.
*/
public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> {
public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final RingBuffer<ObjectEvent<BlockStreamServiceGrpcProto.Block>> ringBuffer;
private final RingBuffer<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>> ringBuffer;
private final ExecutorService executor;

private final Map<BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse>,
BatchEventProcessor<ObjectEvent<BlockStreamServiceGrpcProto.Block>>> subscribers = new HashMap<>();
private final Map<BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse>,
BatchEventProcessor<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>>> subscribers = new HashMap<>();

private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.BlockItem> blockPersistenceHandler;

/**
* Constructor for the LiveStreamMediatorImpl class.
*
* @param blockPersistenceHandler the block persistence handler
*/
public LiveStreamMediatorImpl(final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {
public LiveStreamMediatorImpl(final BlockPersistenceHandler<BlockStreamServiceGrpcProto.BlockItem> blockPersistenceHandler) {
this.blockPersistenceHandler = blockPersistenceHandler;

// Initialize and start the disruptor
final Disruptor<ObjectEvent<BlockStreamServiceGrpcProto.Block>> disruptor = new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE);
final Disruptor<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>> disruptor = new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE);
this.ringBuffer = disruptor.start();
this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
}

@Override
public void publishEvent(BlockStreamServiceGrpcProto.Block block) {
public void publishEvent(BlockStreamServiceGrpcProto.BlockItem blockItem) {

// Publish the block for all subscribers to receive
LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", block);
ringBuffer.publishEvent((event, sequence) -> event.set(block));
LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", blockItem);
ringBuffer.publishEvent((event, sequence) -> event.set(blockItem));

// Block persistence
blockPersistenceHandler.persist(block);
blockPersistenceHandler.persist(blockItem);
}

@Override
public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> handler) {
public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> handler) {

// Initialize the batch event processor and set it on the ring buffer
final var batchEventProcessor = new BatchEventProcessorBuilder()
Expand All @@ -87,7 +87,7 @@ public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamService
}

@Override
public void unsubscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> handler) {
public void unsubscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> handler) {
final var batchEventProcessor = subscribers.remove(handler);

// Stop the processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/
public interface StreamMediator<U, V> {

void publishEvent(final BlockStreamServiceGrpcProto.Block block);
void publishEvent(final BlockStreamServiceGrpcProto.BlockItem blockItem);

void subscribe(final BlockItemEventHandler<U, V> handler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,31 @@
* Write-Through cache handler coordinates between the block storage and the block cache to ensure the block
* is persisted to the storage before being cached.
*/
public class WriteThroughCacheHandler implements BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> {
public class WriteThroughCacheHandler implements BlockPersistenceHandler<BlockStreamServiceGrpcProto.BlockItem> {

private final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage;
private final BlockStorage<BlockStreamServiceGrpcProto.BlockItem> blockStorage;

/**
* Constructor for the WriteThroughCacheHandler class.
*
* @param blockStorage the block storage
*/
public WriteThroughCacheHandler(final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage) {
public WriteThroughCacheHandler(final BlockStorage<BlockStreamServiceGrpcProto.BlockItem> blockStorage) {
this.blockStorage = blockStorage;
}

/**
* Persists the block to the block storage and cache the block.
*
* @param block the block to persist
* @param blockItem the block to persist
* @return the block id
*/
@Override
public Long persist(final BlockStreamServiceGrpcProto.Block block) {
public Long persist(final BlockStreamServiceGrpcProto.BlockItem blockItem) {

// Write-Through cache
blockStorage.write(block);
return block.getId();
blockStorage.write(blockItem);
return blockItem.getId();
}

/**
Expand All @@ -63,10 +63,10 @@ public Long persist(final BlockStreamServiceGrpcProto.Block block) {
* @return a queue of blocks
*/
@Override
public Queue<BlockStreamServiceGrpcProto.Block> readRange(final long startBlockId, final long endBlockId) {
final Queue<BlockStreamServiceGrpcProto.Block> blocks = new LinkedList<>();
public Queue<BlockStreamServiceGrpcProto.BlockItem> readRange(final long startBlockId, final long endBlockId) {
final Queue<BlockStreamServiceGrpcProto.BlockItem> blocks = new LinkedList<>();
for (long count = startBlockId; count <= endBlockId; count++) {
final Optional<BlockStreamServiceGrpcProto.Block> blockOpt = read(count);
final Optional<BlockStreamServiceGrpcProto.BlockItem> blockOpt = read(count);
blockOpt.ifPresent(blocks::add);
}

Expand All @@ -82,7 +82,7 @@ public Queue<BlockStreamServiceGrpcProto.Block> readRange(final long startBlockI
* @return an Optional with the block
*/
@Override
public Optional<BlockStreamServiceGrpcProto.Block> read(final long id) {
public Optional<BlockStreamServiceGrpcProto.BlockItem> read(final long id) {
return blockStorage.read(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public interface BlockStorage<V> {
/**
* Writes a block to storage.
*
* @param block the block to write
* @param blockItem the block to write
* @return the id of the block
*/
Optional<Long> write(final V block);
Optional<Long> write(final V blockItem);

/**
* Reads a block from storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/**
* The FileSystemBlockStorage class implements the BlockStorage interface to store blocks to the filesystem.
*/
public class FileSystemBlockStorage implements BlockStorage<BlockStreamServiceGrpcProto.Block> {
public class FileSystemBlockStorage implements BlockStorage<BlockStreamServiceGrpcProto.BlockItem> {

public static final String BLOCK_FILE_EXTENSION = ".blk";

Expand Down Expand Up @@ -74,16 +74,16 @@ public FileSystemBlockStorage(final String key, final Config config) throws IOEx
/**
* Writes a block to the filesystem.
*
* @param block the block to write
* @param blockItem the block to write
* @return the id of the block
*/
@Override
public Optional<Long> write(final BlockStreamServiceGrpcProto.Block block) {
Long id = block.getId();
public Optional<Long> write(final BlockStreamServiceGrpcProto.BlockItem blockItem) {
Long id = blockItem.getId();
final String fullPath = resolvePath(id);

try (FileOutputStream fos = new FileOutputStream(fullPath)) {
block.writeTo(fos);
blockItem.writeTo(fos);
LOGGER.log(System.Logger.Level.DEBUG, "Successfully wrote the block file: " + fullPath);

return Optional.of(id);
Expand All @@ -100,14 +100,14 @@ public Optional<Long> write(final BlockStreamServiceGrpcProto.Block block) {
* @return the block
*/
@Override
public Optional<BlockStreamServiceGrpcProto.Block> read(final Long id) {
public Optional<BlockStreamServiceGrpcProto.BlockItem> read(final Long id) {
return read(resolvePath(id));
}

private Optional<BlockStreamServiceGrpcProto.Block> read(final String filePath) {
private Optional<BlockStreamServiceGrpcProto.BlockItem> read(final String filePath) {

try (FileInputStream fis = new FileInputStream(filePath)) {
return Optional.of(BlockStreamServiceGrpcProto.Block.parseFrom(fis));
return Optional.of(BlockStreamServiceGrpcProto.BlockItem.parseFrom(fis));
} catch (FileNotFoundException io) {
LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + filePath, io);
return Optional.empty();
Expand Down
Loading

0 comments on commit d5b1a9f

Please sign in to comment.