Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live Stream processing support #23

Merged
merged 34 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0608e65
Issues 16, 17, 18 and 21
mattp-swirldslabs Jun 6, 2024
1916732
fix: logger to LOGGER per code review
mattp-swirldslabs Jun 25, 2024
c24dd2b
fix: missed a Logger entry
mattp-swirldslabs Jun 25, 2024
07df87d
fix: removed extra javadoc line
mattp-swirldslabs Jun 25, 2024
cd7ad67
fix: added final keyword per code review requests
mattp-swirldslabs Jun 25, 2024
f7f037b
fix: trimmed Empty protobuf definition
mattp-swirldslabs Jun 25, 2024
3e73a5d
fix: trimmed imports
mattp-swirldslabs Jun 25, 2024
d60a907
fix: changed from JUL to System.Logger
mattp-swirldslabs Jun 25, 2024
2f51579
fix: removing this prefixes when they are not needed
mattp-swirldslabs Jun 25, 2024
678d527
fixed: java file license headers
mattp-swirldslabs Jun 26, 2024
34d6acc
fix: removing hyphens and javadoc message grooming
mattp-swirldslabs Jun 26, 2024
95e4da4
fix: adjusted error logging to log a throwable as a separate parameter
mattp-swirldslabs Jun 26, 2024
48c8d90
fix: enabled mockito. changed LiveStreamObserverImpl to use Java Cloc…
mattp-swirldslabs Jun 26, 2024
606757c
fix: removed unnecessary static requires
mattp-swirldslabs Jun 26, 2024
b4b8245
fix: refactored code and tests to use InstantSource
mattp-swirldslabs Jun 26, 2024
f087319
fix: removed existing property until we can make use of application.y…
mattp-swirldslabs Jun 27, 2024
f25f2b2
fix: removed the manual scripts which write data.txt and the .gitigno…
mattp-swirldslabs Jun 27, 2024
def0452
fix: added API specification documentation for blockstream.proto
mattp-swirldslabs Jun 27, 2024
babcb8a
fix: added array wrap-around functionality for the TestClock
mattp-swirldslabs Jun 27, 2024
6d3e617
fix: added underscores to the TEST_TIME to improve readability
mattp-swirldslabs Jun 27, 2024
0418475
fix: removed unsubscribeall. we do not want a producer to evict all …
mattp-swirldslabs Jun 27, 2024
2f348e5
fix: replaced type with var to improve readability
mattp-swirldslabs Jun 27, 2024
4c46aa3
fix: allow the map item to be updated
mattp-swirldslabs Jun 27, 2024
6ff6a8d
fix: remove caching
mattp-swirldslabs Jun 27, 2024
1e3386e
fixed: switched queue implementation details
mattp-swirldslabs Jun 27, 2024
c623b48
fixed: added error message
mattp-swirldslabs Jun 27, 2024
59e60fc
fix: added additional documentation regarding the service definition …
mattp-swirldslabs Jun 27, 2024
a03863f
fix: added additional method level documentation
mattp-swirldslabs Jun 27, 2024
59bd8e3
fix: adjusted logging to emit a message and the stacktrace
mattp-swirldslabs Jun 28, 2024
95eb138
upgrade: upgraded to Helidon 4.0.10
mattp-swirldslabs Jun 28, 2024
8d22594
fix: removed helidon greet http routing
mattp-swirldslabs Jun 28, 2024
6d3bfe6
fix: added end of file newline
mattp-swirldslabs Jun 28, 2024
845c4bd
fix: added TODO to make hardcoded value configurable
mattp-swirldslabs Jun 28, 2024
e87e233
fix: added newline
mattp-swirldslabs Jun 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ gradle-app.setting
.project

# JDT-specific (Eclipse Java Development Tools)
.classpath
.classpath

10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@ to [[email protected]](mailto:[email protected]).

Please do not file a public ticket mentioning the vulnerability. Refer to the security policy defined in the [SECURITY.md](https://github.com/hashgraph/hedera-sourcify/blob/main/SECURITY.md).

---

# Running Locally

1) Create a local temp directory. For example, use `mktemp -d -t block-stream-temp-dir` to create a directory
2) export BLOCKNODE_STORAGE_ROOT_PATH=<path to the temp directory> # You can add this to your .zshrc, etc
3) ./gradlew run # ./gradlew run --debug-jvm to run in debug mode

# Running Tests
1) ./gradlew build
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ extraJavaModuleInfo {
}
module("io.grpc:grpc-util", "io.grpc.util")
module("io.perfmark:perfmark-api", "io.perfmark")
}

module("junit:junit", "junit")
module("org.mockito:mockito-core", "org.mockito")
module("org.mockito:mockito-junit-jupiter", "org.mockito.junit.jupiter")
}
1 change: 1 addition & 0 deletions gradle/modules.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ io.helidon.webserver=io.helidon.webserver:helidon-webserver
io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc
io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5
io.grpc=io.grpc:grpc-stub
grpc.protobuf=io.grpc:grpc-protobuf
81 changes: 81 additions & 0 deletions protos/src/main/protobuf/blockstream.proto
rbair23 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
syntax = "proto3";

/*-
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

option java_package = "com.hedera.block.protos";
option java_outer_classname = "BlockStreamServiceGrpcProto";

/**
* The BlockStreamGrpc service definition provides 2 bidirectional streaming methods for
* exchanging blocks with the Block Node server.
*
* A producer (e.g. Consensus Node) can use the StreamSink method to stream blocks to the
* Block Node server. The Block Node server will respond with a BlockResponse message for
* each block received.
*
* A consumer (e.g. Mirror Node) can use the StreamSource method to request a stream of
* blocks from the server. The consumer is expected to respond with a BlockResponse message
* with the id of each block received.
*/
service BlockStreamGrpc {

/**
* StreamSink is a bidirectional streaming method that allows a producer to stream blocks
* to the Block Node server. The server will respond with a BlockResponse message for each
* block received.
*/
rpc StreamSink(stream Block) returns (stream BlockResponse) {}

/**
* StreamSource is a bidirectional streaming method that allows a consumer to request a
* stream of blocks from the server. The consumer is expected to respond with a BlockResponse
* message with the id of each block received.
*/
rpc StreamSource(stream BlockResponse) returns (stream Block) {}
}

/**
* A block is a simple message that contains an id and a value.
* This specification is a simple example meant to expedite development.
* It will be replaced with a PBJ implementation in the future.
*/
message Block {
/**
* The id of the block. Each block id should be unique.
*/
int64 id = 1;

/**
* The value of the block. The value can be any string.
*/
string value = 2;
}

/**
* A block response is a simple message that contains an id.
* The block response message is simply meant to disambiguate it
* from the original request. This specification is a simple
* example meant to expedite development. It will be replaced with
* a PBJ implementation in the future.
*/
message BlockResponse {
/**
* The id of the block which was received. Each block id should
* correlate with the id of a Block message id.
*/
int64 id = 1;
}
15 changes: 0 additions & 15 deletions protos/src/main/protobuf/echo.proto

This file was deleted.

7 changes: 3 additions & 4 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ application {
}

testModuleInfo {
// requires("org.assertj.core")
// requires("net.bytebuddy")
// requires("org.junit.jupiter.api")
// requires("org.junit.jupiter.params")
requires("org.junit.jupiter.api")
requires("org.mockito")
requires("org.mockito.junit.jupiter")
}
134 changes: 134 additions & 0 deletions server/src/main/java/com/hedera/block/server/BlockStreamService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

import com.google.protobuf.Descriptors;
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.consumer.LiveStreamObserver;
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;

import java.time.Clock;

import static com.hedera.block.server.Constants.*;

/**
* This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
* It sets up the bidirectional streaming methods for the service and handles the routing for these methods.
* It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation.
*
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and server streaming
* respectively. These methods return custom StreamObservers which are used to observe and respond to the streams.
*/
public class BlockStreamService implements GrpcService {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;

/**
* Constructor for the BlockStreamService class.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds
* @param streamMediator the stream mediator
*/
public BlockStreamService(final long timeoutThresholdMillis,
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
}

/**
* Returns the FileDescriptor for the BlockStreamServiceGrpcProto.
*
* @return the FileDescriptor for the BlockStreamServiceGrpcProto
*/
@Override
public Descriptors.FileDescriptor proto() {
return BlockStreamServiceGrpcProto.getDescriptor();
}

/**
* Returns the service name for the BlockStreamService. This service name corresponds to the service name in
* the proto file.
*
* @return the service name corresponding to the service name in the proto file
*/
@Override
public String serviceName() {
return SERVICE_NAME;
}

/**
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service.
*
* @param routing the routing for the BlockStreamService
*/
@Override
public void update(final Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
}

/**
* The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer
* via the streamMediator as well as sending responses back to the producer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");

return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver);
}

/**
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer
* back to the server.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
* as handling responses from the consumer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamObserver = new LiveStreamObserverImpl(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);

// Subscribe the observer to the mediator
streamMediator.subscribe(streamObserver);

return streamObserver;
}
}


32 changes: 32 additions & 0 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

/**
* Constants used in the BlockNode service.
*/
public final class Constants {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Would ServiceContracts be more appropriate.
I feel like this is one of those files that will get overloaded with many values over time

private Constants() {}

// Config Constants
public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";

// Constants specified in the service definition of the .proto file
public static final String SERVICE_NAME = "BlockStreamGrpc";
public static final String CLIENT_STREAMING_METHOD_NAME = "StreamSink";
public static final String SERVER_STREAMING_METHOD_NAME = "StreamSource";
}
38 changes: 0 additions & 38 deletions server/src/main/java/com/hedera/block/server/EchoService.java

This file was deleted.

Loading