Skip to content

Commit

Permalink
Live Stream processing support (#23)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs authored Jun 28, 2024
1 parent e1eeea3 commit 8e98b5b
Show file tree
Hide file tree
Showing 32 changed files with 1,791 additions and 83 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ gradle-app.setting
.project

# JDT-specific (Eclipse Java Development Tools)
.classpath
.classpath

10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@ to [[email protected]](mailto:[email protected]).

Please do not file a public ticket mentioning the vulnerability. Refer to the security policy defined in the [SECURITY.md](https://github.com/hashgraph/hedera-sourcify/blob/main/SECURITY.md).

---

# Running Locally

1) Create a local temp directory. For example, use `mktemp -d -t block-stream-temp-dir` to create a directory
2) export BLOCKNODE_STORAGE_ROOT_PATH=<path to the temp directory> # You can add this to your .zshrc, etc
3) ./gradlew run # ./gradlew run --debug-jvm to run in debug mode

# Running Tests
1) ./gradlew build
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ extraJavaModuleInfo {
}
module("io.grpc:grpc-util", "io.grpc.util")
module("io.perfmark:perfmark-api", "io.perfmark")
}

module("junit:junit", "junit")
module("org.mockito:mockito-core", "org.mockito")
module("org.mockito:mockito-junit-jupiter", "org.mockito.junit.jupiter")
}
1 change: 1 addition & 0 deletions gradle/modules.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ io.helidon.webserver=io.helidon.webserver:helidon-webserver
io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc
io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5
io.grpc=io.grpc:grpc-stub
grpc.protobuf=io.grpc:grpc-protobuf
81 changes: 81 additions & 0 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
syntax = "proto3";

/*-
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

option java_package = "com.hedera.block.protos";
option java_outer_classname = "BlockStreamServiceGrpcProto";

/**
* The BlockStreamGrpc service definition provides 2 bidirectional streaming methods for
* exchanging blocks with the Block Node server.
*
* A producer (e.g. Consensus Node) can use the StreamSink method to stream blocks to the
* Block Node server. The Block Node server will respond with a BlockResponse message for
* each block received.
*
* A consumer (e.g. Mirror Node) can use the StreamSource method to request a stream of
* blocks from the server. The consumer is expected to respond with a BlockResponse message
* with the id of each block received.
*/
service BlockStreamGrpc {

/**
* StreamSink is a bidirectional streaming method that allows a producer to stream blocks
* to the Block Node server. The server will respond with a BlockResponse message for each
* block received.
*/
rpc StreamSink(stream Block) returns (stream BlockResponse) {}

/**
* StreamSource is a bidirectional streaming method that allows a consumer to request a
* stream of blocks from the server. The consumer is expected to respond with a BlockResponse
* message with the id of each block received.
*/
rpc StreamSource(stream BlockResponse) returns (stream Block) {}
}

/**
* A block is a simple message that contains an id and a value.
* This specification is a simple example meant to expedite development.
* It will be replaced with a PBJ implementation in the future.
*/
message Block {
/**
* The id of the block. Each block id should be unique.
*/
int64 id = 1;

/**
* The value of the block. The value can be any string.
*/
string value = 2;
}

/**
* A block response is a simple message that contains an id.
* The block response message is simply meant to disambiguate it
* from the original request. This specification is a simple
* example meant to expedite development. It will be replaced with
* a PBJ implementation in the future.
*/
message BlockResponse {
/**
* The id of the block which was received. Each block id should
* correlate with the id of a Block message id.
*/
int64 id = 1;
}
15 changes: 0 additions & 15 deletions protos/src/main/protobuf/echo.proto

This file was deleted.

7 changes: 3 additions & 4 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ application {
}

testModuleInfo {
// requires("org.assertj.core")
// requires("net.bytebuddy")
// requires("org.junit.jupiter.api")
// requires("org.junit.jupiter.params")
requires("org.junit.jupiter.api")
requires("org.mockito")
requires("org.mockito.junit.jupiter")
}
134 changes: 134 additions & 0 deletions server/src/main/java/com/hedera/block/server/BlockStreamService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

import com.google.protobuf.Descriptors;
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.consumer.LiveStreamObserver;
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;

import java.time.Clock;

import static com.hedera.block.server.Constants.*;

/**
* This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
* It sets up the bidirectional streaming methods for the service and handles the routing for these methods.
* It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation.
*
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and server streaming
* respectively. These methods return custom StreamObservers which are used to observe and respond to the streams.
*/
public class BlockStreamService implements GrpcService {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;

/**
* Constructor for the BlockStreamService class.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds
* @param streamMediator the stream mediator
*/
public BlockStreamService(final long timeoutThresholdMillis,
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
}

/**
* Returns the FileDescriptor for the BlockStreamServiceGrpcProto.
*
* @return the FileDescriptor for the BlockStreamServiceGrpcProto
*/
@Override
public Descriptors.FileDescriptor proto() {
return BlockStreamServiceGrpcProto.getDescriptor();
}

/**
* Returns the service name for the BlockStreamService. This service name corresponds to the service name in
* the proto file.
*
* @return the service name corresponding to the service name in the proto file
*/
@Override
public String serviceName() {
return SERVICE_NAME;
}

/**
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service.
*
* @param routing the routing for the BlockStreamService
*/
@Override
public void update(final Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
}

/**
* The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer
* via the streamMediator as well as sending responses back to the producer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");

return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver);
}

/**
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer
* back to the server.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
* as handling responses from the consumer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamObserver = new LiveStreamObserverImpl(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);

// Subscribe the observer to the mediator
streamMediator.subscribe(streamObserver);

return streamObserver;
}
}


32 changes: 32 additions & 0 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

/**
* Constants used in the BlockNode service.
*/
public final class Constants {
private Constants() {}

// Config Constants
public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";

// Constants specified in the service definition of the .proto file
public static final String SERVICE_NAME = "BlockStreamGrpc";
public static final String CLIENT_STREAMING_METHOD_NAME = "StreamSink";
public static final String SERVER_STREAMING_METHOD_NAME = "StreamSource";
}
38 changes: 0 additions & 38 deletions server/src/main/java/com/hedera/block/server/EchoService.java

This file was deleted.

Loading

0 comments on commit 8e98b5b

Please sign in to comment.