diff --git a/protos/src/main/protobuf/blockstream.proto b/protos/src/main/protobuf/blockstream.proto index 942be6f5..e1207e38 100644 --- a/protos/src/main/protobuf/blockstream.proto +++ b/protos/src/main/protobuf/blockstream.proto @@ -31,7 +31,11 @@ message PublishStreamRequest { } message PublishStreamResponse { - uint64 block_number = 1; + ItemAcknowledgement acknowledgement = 1; +} + +message ItemAcknowledgement { + bytes item_ack = 1; } message SubscribeStreamRequest { @@ -40,6 +44,7 @@ message SubscribeStreamRequest { message SubscribeStreamResponse { int32 status = 1; + BlockItem block_item = 2; } /** @@ -51,7 +56,5 @@ message BlockItem { int64 block_header = 1; - uint64 id = 2; - - string value = 3; + string value = 2; } diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index cf1798ab..8603ce64 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -20,6 +20,7 @@ import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest; import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse; import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest; +import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse; import static com.hedera.block.server.Constants.*; import com.google.protobuf.Descriptors; @@ -101,7 +102,7 @@ private StreamObserver publishBlockStream( } private StreamObserver subscribeBlockStream( - final StreamObserver subscribeStreamRequestObserver) { + final StreamObserver subscribeStreamRequestObserver) { LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method"); // Return a custom StreamObserver to handle streaming blocks from the producer. diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index d32824ca..26317258 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -18,7 +18,10 @@ import static com.hedera.block.protos.BlockStreamService.BlockItem; import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest; +import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse; +import com.google.protobuf.Descriptors; +import com.hedera.block.protos.BlockStreamService; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; import io.grpc.stub.ServerCallStreamObserver; @@ -37,7 +40,7 @@ public class ConsumerBlockItemObserver private final System.Logger LOGGER = System.getLogger(getClass().getName()); - private final StreamObserver subscribeStreamResponseObserver; + private final StreamObserver subscribeStreamResponseObserver; private final long timeoutThresholdMillis; private final InstantSource producerLivenessClock; @@ -58,7 +61,7 @@ public ConsumerBlockItemObserver( final long timeoutThresholdMillis, final InstantSource producerLivenessClock, final StreamMediator, SubscribeStreamRequest> streamMediator, - final StreamObserver subscribeStreamResponseObserver) { + final StreamObserver subscribeStreamResponseObserver) { this.timeoutThresholdMillis = timeoutThresholdMillis; this.producerLivenessClock = producerLivenessClock; @@ -72,7 +75,7 @@ public ConsumerBlockItemObserver( // Unfortunately we have to cast the responseStreamObserver to a // ServerCallStreamObserver // to register the onCancelHandler. - ((ServerCallStreamObserver) subscribeStreamResponseObserver) + ((ServerCallStreamObserver) subscribeStreamResponseObserver) .setOnCancelHandler( () -> { LOGGER.log( @@ -102,7 +105,12 @@ public void onEvent(final ObjectEvent event, final long l, final bool } if (isReachedFirstBlock) { - subscribeStreamResponseObserver.onNext(blockItem); + SubscribeStreamResponse subscribeStreamResponse = SubscribeStreamResponse + .newBuilder() + .setBlockItem(blockItem) + .build(); + + subscribeStreamResponseObserver.onNext(subscribeStreamResponse); } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 08d7ae90..4484b203 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -73,7 +73,7 @@ public LiveStreamMediatorImpl( public void publishEvent(BlockItem blockItem) { // Publish the block for all subscribers to receive - LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", blockItem); + LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem); ringBuffer.publishEvent((event, sequence) -> event.set(blockItem)); // Block persistence diff --git a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java index 0298431b..5f661b68 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java +++ b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java @@ -51,7 +51,7 @@ public Long persist(final BlockItem blockItem) { // Write-Through cache blockStorage.write(blockItem); - return blockItem.getId(); + return blockItem.getBlockHeader(); } /** diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java index e07f789c..a07fe79a 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java @@ -81,7 +81,7 @@ public FileSystemBlockStorage(final String key, final Config config) throws IOEx */ @Override public Optional write(final BlockItem blockItem) { - Long id = blockItem.getId(); + Long id = blockItem.getBlockHeader(); final String fullPath = resolvePath(id); try (FileOutputStream fos = new FileOutputStream(fullPath)) { diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index 8460dcb6..51dddc25 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -16,15 +16,19 @@ package com.hedera.block.server.producer; -import static com.hedera.block.protos.BlockStreamService.BlockItem; -import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest; -import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse; -import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest; - +import com.google.protobuf.ByteString; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; import io.grpc.stub.StreamObserver; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +import static com.hedera.block.protos.BlockStreamService.*; + /** * The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC * service implementation. Helidon calls methods on this class as networking events occur with the @@ -57,13 +61,24 @@ public ProducerBlockItemObserver( @Override public void onNext(final PublishStreamRequest publishStreamRequest) { - BlockItem blockItem = publishStreamRequest.getBlockItem(); + final BlockItem blockItem = publishStreamRequest.getBlockItem(); streamMediator.publishEvent(blockItem); - // Send a response back to the upstream producer - final PublishStreamResponse publishStreamResponse = - PublishStreamResponse.newBuilder().setBlockNumber(blockItem.getId()).build(); - publishStreamResponseObserver.onNext(publishStreamResponse); + try { + // Send a response back to the upstream producer + // TODO: Use real hash + final ItemAcknowledgement itemAck = + ItemAcknowledgement.newBuilder() + .setItemAck(ByteString.copyFrom(getFakeHash(blockItem))) + .build(); + final PublishStreamResponse publishStreamResponse = + PublishStreamResponse.newBuilder() + .setAcknowledgement(itemAck) + .build(); + publishStreamResponseObserver.onNext(publishStreamResponse); + } catch (IOException | NoSuchAlgorithmException e) { + LOGGER.log(System.Logger.Level.ERROR, "Error calculating hash", e); + } } /** @@ -87,4 +102,18 @@ public void onCompleted() { LOGGER.log(System.Logger.Level.DEBUG, "ProducerBlockStreamObserver completed"); publishStreamResponseObserver.onCompleted(); } + + private static byte[] getFakeHash(BlockItem blockItem) throws IOException, NoSuchAlgorithmException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) { + objectOutputStream.writeObject(blockItem); + } + + // Get the serialized bytes + byte[] serializedObject = byteArrayOutputStream.toByteArray(); + + // Calculate the SHA-256 hash + MessageDigest digest = MessageDigest.getInstance("SHA-384"); + return digest.digest(serializedObject); + } } diff --git a/server/src/test/resources/producer.sh b/server/src/test/resources/producer.sh index 8076ef94..a74621a5 100755 --- a/server/src/test/resources/producer.sh +++ b/server/src/test/resources/producer.sh @@ -49,19 +49,17 @@ trap cleanup SIGINT do if [[ i -eq 1 ]]; then - echo "{\"block_item\": {\"block_header\": $iter,\"id\": $i,\"value\": \"Lorem ipsum dolor sit amet\"}}" + echo "{\"block_item\": {\"block_header\": $iter,\"value\": \"Payload[...]\"}}" else - echo "{\"block_item\": {\"block_header\": -1,\"id\": $i,\"value\": \"est dolor nulla\"}}" - fi - - - if [ $iter -eq $2 ]; then - exit 0 + echo "{\"block_item\": {\"block_header\": 0,\"value\": \"Payload[...]\"}}" fi sleep 0.5 done + if [ $iter -eq $2 ]; then + exit 0 + fi ((iter++)) done