Skip to content

Commit

Permalink
Issues 16, 17, 18 and 21
Browse files Browse the repository at this point in the history
- added license headers to java files
- added bidirectional gRPC live streaming blocks implementation
  with 1 producer and N consumers

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jun 25, 2024
1 parent e1eeea3 commit 0608e65
Show file tree
Hide file tree
Showing 33 changed files with 1,789 additions and 78 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,7 @@ gradle-app.setting
.project

# JDT-specific (Eclipse Java Development Tools)
.classpath
.classpath

data.txt

10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@ to [[email protected]](mailto:[email protected]).

Please do not file a public ticket mentioning the vulnerability. Refer to the security policy defined in the [SECURITY.md](https://github.com/hashgraph/hedera-sourcify/blob/main/SECURITY.md).

---

# Running Locally

1) Create a local temp directory. For example, use `mktemp -d -t block-stream-temp-dir` to create a directory
2) export BLOCKNODE_STORAGE_ROOT_PATH=<path to the temp directory> # You can add this to your .zshrc, etc
3) ./gradlew run # ./gradlew run --debug-jvm to run in debug mode

# Running Tests
1) ./gradlew build
1 change: 1 addition & 0 deletions gradle/modules.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ io.helidon.webserver=io.helidon.webserver:helidon-webserver
io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc
io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5
io.grpc=io.grpc:grpc-stub
grpc.protobuf=io.grpc:grpc-protobuf:1.20.0
21 changes: 21 additions & 0 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
syntax = "proto3";

option java_package = "com.hedera.block.protos";
option java_outer_classname = "BlockStreamServiceGrpcProto";

service BlockStreamGrpc {
rpc StreamSink(stream Block) returns (stream BlockResponse) {}
rpc StreamSource(stream BlockResponse) returns (stream Block) {}
}

message Block {
int64 id = 1;
string value = 2;
}

message BlockResponse {
int64 id = 1;
}

message Empty {
}
15 changes: 0 additions & 15 deletions protos/src/main/protobuf/echo.proto

This file was deleted.

7 changes: 2 additions & 5 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,5 @@ application {
}

testModuleInfo {
// requires("org.assertj.core")
// requires("net.bytebuddy")
// requires("org.junit.jupiter.api")
// requires("org.junit.jupiter.params")
}
requires("org.junit.jupiter.api")
}
131 changes: 131 additions & 0 deletions server/src/main/java/com/hedera/block/server/BlockStreamService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Hedera Block Node
*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

import com.google.protobuf.Descriptors;
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
import com.hedera.block.server.consumer.LiveStreamObserver;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;

import java.util.logging.Logger;

import static com.hedera.block.server.Constants.*;

/**
* This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
* It sets up the bidirectional streaming methods for the service and handles the routing for these methods.
* It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation.
*
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and server streaming
* respectively. These methods return custom StreamObservers which are used to observe and respond to the streams.
*
*/
public class BlockStreamService implements GrpcService {

private final Logger logger = Logger.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;

/**
* Constructor for the BlockStreamService class.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds
* @param streamMediator the stream mediator
*/
public BlockStreamService(long timeoutThresholdMillis,
StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
}

/**
* Returns the FileDescriptor for the BlockStreamServiceGrpcProto.
*
* @return the FileDescriptor for the BlockStreamServiceGrpcProto
*/
@Override
public Descriptors.FileDescriptor proto() {
return BlockStreamServiceGrpcProto.getDescriptor();
}

/**
* Returns the service name for the BlockStreamService. This service name corresponds to the service name in the proto file.
*
* @return the service name
*/
@Override
public String serviceName() {
return SERVICE_NAME;
}

/**
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service.
*
* @param routing the routing for the BlockStreamService
*/
@Override
public void update(Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
}

/**
* The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
*
* @param responseStreamObserver - Helidon provides a StreamObserver to handle responses back to the producer.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumers
* via the streamMediator as well as sending responses back to the producer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
logger.finer("Executing bidirectional streamSink method");

return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver);
}

/**
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
*
* @param responseStreamObserver - Helidon provides a StreamObserver to handle responses from the consumer
* back to the server.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well as
* handling responses from the consumer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
logger.finer("Executing bidirectional streamSource method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamObserver = new LiveStreamObserverImpl(
timeoutThresholdMillis,
streamMediator,
responseStreamObserver);
streamMediator.subscribe(streamObserver);

return streamObserver;
}
}


31 changes: 31 additions & 0 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Hedera Block Node
*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

public final class Constants {
private Constants() {}

// Config Constants
public static String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";

// Constants specified in the service definition of the .proto file
public static String SERVICE_NAME = "BlockStreamGrpc";
public static String CLIENT_STREAMING_METHOD_NAME = "StreamSink";
public static String SERVER_STREAMING_METHOD_NAME = "StreamSource";
}
38 changes: 0 additions & 38 deletions server/src/main/java/com/hedera/block/server/EchoService.java

This file was deleted.

94 changes: 76 additions & 18 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
@@ -1,38 +1,96 @@
/*
* Hedera Block Node
*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

import com.hedera.block.protos.EchoServiceGrpcProto;
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.cache.BlockCache;
import com.hedera.block.server.persistence.cache.LRUCache;
import com.hedera.block.server.persistence.storage.BlockStorage;
import com.hedera.block.server.persistence.storage.FileSystemBlockStorage;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.helidon.config.Config;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcRouting;
import io.helidon.webserver.http.HttpRouting;

import java.io.IOException;
import java.util.logging.Logger;
import java.util.stream.Stream;

import static com.hedera.block.server.Constants.*;

/**
* Main class for the block node server
*/
public class Server {
private Server() {
// Not meant to be instantiated
}

// Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Block>> clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockResponse>, StreamObserver<BlockStreamServiceGrpcProto.Block>> serverBidiStreamingMethod;

private static final Logger logger = Logger.getLogger(Server.class.getName());

private Server() {}

/**
* Main entrypoint for the block node server
*
* @param args Command line arguments. Not used at present,
*/
public static void main(String[] args) {
WebServer.builder()
.port(8080)
.addRouting(HttpRouting.builder()
.get("/greet", (req, res) -> res.send("Hello World!")))
.addRouting(GrpcRouting.builder()
.service(new EchoService())
.unary(EchoServiceGrpcProto.getDescriptor(),
"EchoService",
"Echo",
Server::grpcEcho))
.build()
.start();
}

static void grpcEcho(EchoServiceGrpcProto.EchoRequest request, StreamObserver<EchoServiceGrpcProto.EchoResponse> responseObserver) {}
try {

// Set the global configuration
Config config = Config.create();
Config.global(config);

// Initialize the block storage, cache, and service
BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
BlockCache<BlockStreamServiceGrpcProto.Block> blockCache = new LRUCache(1000);
BlockStreamService blockStreamService = new BlockStreamService(1500,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache)));

// Start the web server
WebServer.builder()
.port(8080)
.addRouting(HttpRouting.builder()
.get("/greet", (req, res) -> res.send("Hello World!")))
.addRouting(GrpcRouting.builder()
.service(blockStreamService)
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverBidiStreamingMethod))
.build()
.start();

} catch (IOException e) {
logger.severe("There was an exception starting the server: " + e.getMessage());
throw new RuntimeException(e);
}
}
}
Loading

0 comments on commit 0608e65

Please sign in to comment.