Skip to content

Commit

Permalink
fix: using additional hedera-protobuf types
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 12, 2024
1 parent 133167a commit 73b0ce0
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 29 deletions.
11 changes: 7 additions & 4 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ message PublishStreamRequest {
}

message PublishStreamResponse {
uint64 block_number = 1;
ItemAcknowledgement acknowledgement = 1;
}

message ItemAcknowledgement {
bytes item_ack = 1;
}

message SubscribeStreamRequest {
Expand All @@ -40,6 +44,7 @@ message SubscribeStreamRequest {

message SubscribeStreamResponse {
int32 status = 1;
BlockItem block_item = 2;
}

/**
Expand All @@ -51,7 +56,5 @@ message BlockItem {

int64 block_header = 1;

uint64 id = 2;

string value = 3;
string value = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
import static com.hedera.block.server.Constants.*;

import com.google.protobuf.Descriptors;
Expand Down Expand Up @@ -101,7 +102,7 @@ private StreamObserver<PublishStreamRequest> publishBlockStream(
}

private StreamObserver<SubscribeStreamRequest> subscribeBlockStream(
final StreamObserver<BlockItem> subscribeStreamRequestObserver) {
final StreamObserver<SubscribeStreamResponse> subscribeStreamRequestObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;

import com.google.protobuf.Descriptors;
import com.hedera.block.protos.BlockStreamService;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.ServerCallStreamObserver;
Expand All @@ -37,7 +40,7 @@ public class ConsumerBlockItemObserver

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final StreamObserver<BlockItem> subscribeStreamResponseObserver;
private final StreamObserver<BlockStreamService.SubscribeStreamResponse> subscribeStreamResponseObserver;

private final long timeoutThresholdMillis;
private final InstantSource producerLivenessClock;
Expand All @@ -58,7 +61,7 @@ public ConsumerBlockItemObserver(
final long timeoutThresholdMillis,
final InstantSource producerLivenessClock,
final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator,
final StreamObserver<BlockItem> subscribeStreamResponseObserver) {
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.producerLivenessClock = producerLivenessClock;
Expand All @@ -72,7 +75,7 @@ public ConsumerBlockItemObserver(
// Unfortunately we have to cast the responseStreamObserver to a
// ServerCallStreamObserver
// to register the onCancelHandler.
((ServerCallStreamObserver<BlockItem>) subscribeStreamResponseObserver)
((ServerCallStreamObserver<SubscribeStreamResponse>) subscribeStreamResponseObserver)
.setOnCancelHandler(
() -> {
LOGGER.log(
Expand Down Expand Up @@ -102,7 +105,12 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
}

if (isReachedFirstBlock) {
subscribeStreamResponseObserver.onNext(blockItem);
SubscribeStreamResponse subscribeStreamResponse = SubscribeStreamResponse
.newBuilder()
.setBlockItem(blockItem)
.build();

subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public LiveStreamMediatorImpl(
public void publishEvent(BlockItem blockItem) {

// Publish the block for all subscribers to receive
LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", blockItem);
LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem);
ringBuffer.publishEvent((event, sequence) -> event.set(blockItem));

// Block persistence
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Long persist(final BlockItem blockItem) {

// Write-Through cache
blockStorage.write(blockItem);
return blockItem.getId();
return blockItem.getBlockHeader();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public FileSystemBlockStorage(final String key, final Config config) throws IOEx
*/
@Override
public Optional<Long> write(final BlockItem blockItem) {
Long id = blockItem.getId();
Long id = blockItem.getBlockHeader();
final String fullPath = resolvePath(id);

try (FileOutputStream fos = new FileOutputStream(fullPath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@

package com.hedera.block.server.producer;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;

import com.google.protobuf.ByteString;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import static com.hedera.block.protos.BlockStreamService.*;

/**
* The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC
* service implementation. Helidon calls methods on this class as networking events occur with the
Expand Down Expand Up @@ -57,13 +61,24 @@ public ProducerBlockItemObserver(
@Override
public void onNext(final PublishStreamRequest publishStreamRequest) {

BlockItem blockItem = publishStreamRequest.getBlockItem();
final BlockItem blockItem = publishStreamRequest.getBlockItem();
streamMediator.publishEvent(blockItem);

// Send a response back to the upstream producer
final PublishStreamResponse publishStreamResponse =
PublishStreamResponse.newBuilder().setBlockNumber(blockItem.getId()).build();
publishStreamResponseObserver.onNext(publishStreamResponse);
try {
// Send a response back to the upstream producer
// TODO: Use real hash
final ItemAcknowledgement itemAck =
ItemAcknowledgement.newBuilder()
.setItemAck(ByteString.copyFrom(getFakeHash(blockItem)))
.build();
final PublishStreamResponse publishStreamResponse =
PublishStreamResponse.newBuilder()
.setAcknowledgement(itemAck)
.build();
publishStreamResponseObserver.onNext(publishStreamResponse);
} catch (IOException | NoSuchAlgorithmException e) {
LOGGER.log(System.Logger.Level.ERROR, "Error calculating hash", e);
}
}

/**
Expand All @@ -87,4 +102,18 @@ public void onCompleted() {
LOGGER.log(System.Logger.Level.DEBUG, "ProducerBlockStreamObserver completed");
publishStreamResponseObserver.onCompleted();
}

private static byte[] getFakeHash(BlockItem blockItem) throws IOException, NoSuchAlgorithmException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(blockItem);
}

// Get the serialized bytes
byte[] serializedObject = byteArrayOutputStream.toByteArray();

// Calculate the SHA-256 hash
MessageDigest digest = MessageDigest.getInstance("SHA-384");
return digest.digest(serializedObject);
}
}
12 changes: 5 additions & 7 deletions server/src/test/resources/producer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,17 @@ trap cleanup SIGINT
do

if [[ i -eq 1 ]]; then
echo "{\"block_item\": {\"block_header\": $iter,\"id\": $i,\"value\": \"Lorem ipsum dolor sit amet\"}}"
echo "{\"block_item\": {\"block_header\": $iter,\"value\": \"Payload[...]\"}}"
else
echo "{\"block_item\": {\"block_header\": -1,\"id\": $i,\"value\": \"est dolor nulla\"}}"
fi


if [ $iter -eq $2 ]; then
exit 0
echo "{\"block_item\": {\"block_header\": 0,\"value\": \"Payload[...]\"}}"
fi

sleep 0.5
done

if [ $iter -eq $2 ]; then
exit 0
fi
((iter++))

done
Expand Down

0 comments on commit 73b0ce0

Please sign in to comment.