Skip to content

Commit

Permalink
Adding network latency simulator for block live stream consumers
Browse files Browse the repository at this point in the history
Signed-off-by: Alfredo Gutierrez <[email protected]>
  • Loading branch information
AlfredoG87 committed Jun 29, 2024
1 parent 8e98b5b commit 66db659
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 5 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@ Please do not file a public ticket mentioning the vulnerability. Refer to the se
# Running Locally

1) Create a local temp directory. For example, use `mktemp -d -t block-stream-temp-dir` to create a directory
2) export BLOCKNODE_STORAGE_ROOT_PATH=<path to the temp directory> # You can add this to your .zshrc, etc
2) Configuration variables
```
export BLOCKNODE_STORAGE_ROOT_PATH=<path to the temp directory> # You can add this to your .zshrc, etc
```
3) Optional Configuration variables
```
export BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD="<NumberInMiliseconds>" #Default is 1500
```

3) ./gradlew run # ./gradlew run --debug-jvm to run in debug mode

# Running Tests
1) ./gradlew build
1) ./gradlew build
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ private Constants() {}

// Config Constants
public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY = "blocknode.server.consumer.timeout.threshold";

// Constants specified in the service definition of the .proto file
public static final String SERVICE_NAME = "BlockStreamGrpc";
Expand Down
7 changes: 4 additions & 3 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ public static void main(final String[] args) {
final Config config = Config.create();
Config.global(config);

// Get Timeout threshold from configuration
final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);

// Initialize the block storage, cache, and service
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);

// TODO: Make timeoutThresholdMillis configurable
final BlockStreamService blockStreamService = new BlockStreamService(1500,
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));

// Start the web server
Expand Down
21 changes: 21 additions & 0 deletions server/src/test/network-latency-simulator/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Use an official Ubuntu base image
FROM ubuntu:latest

# Set environment variables
ENV DEBIAN_FRONTEND=noninteractive

# Install required packages
RUN apt-get update && \
apt-get install -y iproute2 iputils-ping curl net-tools iperf3 iptables kmod && \
curl -L https://github.com/fullstorydev/grpcurl/releases/download/v1.8.7/grpcurl_1.8.7_linux_x86_64.tar.gz -o grpcurl.tar.gz && \
tar -xvf grpcurl.tar.gz && mv grpcurl /usr/local/bin/grpcurl && rm grpcurl.tar.gz

# Copy scripts and protos folder into the container
COPY configure_latency.sh consumer.sh start.sh /usr/local/bin/
COPY protos /usr/local/protos

# Make the scripts executable
RUN chmod +x /usr/local/bin/configure_latency.sh /usr/local/bin/consumer.sh /usr/local/bin/start.sh

# Default command to run when starting the container
CMD ["bash", "-c", "start.sh"]
56 changes: 56 additions & 0 deletions server/src/test/network-latency-simulator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Network Latency Simulator

Due to the asynchronous nature of the Streaming Service, it is important to test the system under different network conditions, such as high latency, packet loss, and jitter. This tool allows you to simulate different network conditions by adding latency, packet loss, low bandwidth and jitter to the network traffic.

And making sure that a single `consumer` that is experiencing network issues does not affect the other `consumers` that are consuming the same stream from the BlockNode.

This test aims to make sure that the system is resilient to network issues and that a single consumer that is experiencing network issues does not affect the other consumers that are consuming the same stream from the BlockNode.

## Running Locally

1. Move to the `network-latency-simulator` directory.
2. Build the Docker Image with the network latency simulator.
```bash
cd server/src/test/network-latency-simulator
docker build -t network-latency-simulator .
```

3. Start the BlockNode Server. (Follow instructions on main README.md)
- Due to the Latency, the consumers might be disconnected from the BlockNode, since the current timeout is 1500 ms, you should increase the timeout to 100000ms to be able to correctly test the network issues. (see main README.md of the server for more details on how to change the timeout)
4. Start the producer and a single consumer (this consumer will be the control one without any network issues).
```bash
/server/src/test/resources/producer.sh 1 1000 # this will produce 1000 blocks
/server/src/test/resources/consumer.sh 1 1000 # this will consume 1000 blocks
```
5. Start the consumer inside the network latency simulator container, you can start as many as you want.
```bash
docker run -it --cap-add=NET_ADMIN latency-simulator
```

The consumer inside the container will start consuming the blocks from the BlockNode, and you can see the network issues being simulated.
The network latency simulator will simulate the following network issues:
- Latency, increases every 10 seconds (by default) by 1000ms
- Packet Loss (Drops 10% of the packets)
- Low Bandwidth, limits the bandwidth to 64kbps.
- Jitter, adds 500ms of jitter (latency variability) to the network.

There are some environment variables that you can set to change the behavior of the network latency simulator:

**configure_latency.sh:**
- `LATENCY_INCREASE_INTERVAL`: The interval in seconds to increase the latency, default is 10 seconds.
- `INITIAL_LATENCY`: The initial latency to start with, default is 500ms, once the MAX latency is reached, it will reset to the initial latency.
- `JITTER`: The jitter to add to the network, default is 500ms.
- `BANDWIDTH`: The bandwidth to limit the network to, default is 64kbps.
- `INCREASE_TIME`: The time in seconds to increase the latency, default is 10 (seconds).
- `MAX_LATENCY`: The maximum latency to reach, default is 12000 (ms).

**consumer.sh:**
- `GRPC_SERVER`: The gRPC server to connect to, default is `host.docker.internal:8080`, connects to the host BlockNode.
- `GRPC_METHOD`: The gRPC method to call, default is `BlockStreamGrpc/StreamSource`.
- `PATH_TO_PROTO`: The path to the proto file, default is `/usr/local/protos/blockstream.proto` (inside the container).
- `PROTO_IMPORT_PATH`: The import path of the proto file, default is `/usr/local/protos` (inside the container).

Example of how to set the environment variables when running the container:
```bash
docker run -it --cap-add=NET_ADMIN -e LATENCY_INCREASE_INTERVAL=5 -e INITIAL_LATENCY=1000 -e JITTER=1000 -e BANDWIDTH=128 -e INCREASE_TIME=5 -e MAX_LATENCY=10000
```
40 changes: 40 additions & 0 deletions server/src/test/network-latency-simulator/configure_latency.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/bin/bash

# Default values
DEFAULT_INITIAL_LATENCY=1000
DEFAULT_JITTER=500
DEFAULT_BANDWIDTH=64
DEFAULT_INCREASE_TIME=10
MAX_LATENCY=12000

# Parameters with default values
INITIAL_LATENCY=${INITIAL_LATENCY:-$DEFAULT_INITIAL_LATENCY}
JITTER=${JITTER:-$DEFAULT_JITTER}
BANDWIDTH=${BANDWIDTH:-$DEFAULT_BANDWIDTH}
INCREASE_TIME=${INCREASE_TIME:-$DEFAULT_INCREASE_TIME}
CURRENT_LATENCY=$INITIAL_LATENCY
MAX_LATENCY=${MAX_LATENCY:-$MAX_LATENCY}

# Function to apply network configuration
apply_tc_config() {
# Remove any existing qdisc configuration on eth0
tc qdisc del dev eth0 root 2>/dev/null
# Apply the new latency and jitter configuration
tc qdisc add dev eth0 root handle 1: netem delay ${CURRENT_LATENCY}ms ${JITTER}ms
# Apply the bandwidth limitation
tc qdisc add dev eth0 parent 1:1 handle 10: tbf rate ${BANDWIDTH}kbit burst 32kbit latency 50ms
echo "Updated configuration: Latency = ${CURRENT_LATENCY}ms, Jitter = ${JITTER}ms, Bandwidth = ${BANDWIDTH}kbit"
}

# Initial configuration
apply_tc_config
echo "Initial configuration applied: Latency = ${CURRENT_LATENCY}ms, Jitter = ${JITTER}ms, Bandwidth = ${BANDWIDTH}kbit"

while true; do
sleep $INCREASE_TIME
CURRENT_LATENCY=$((CURRENT_LATENCY + 1000))
if [ $CURRENT_LATENCY -gt $MAX_LATENCY ]; then
CURRENT_LATENCY=$INITIAL_LATENCY
fi
apply_tc_config
done
64 changes: 64 additions & 0 deletions server/src/test/network-latency-simulator/consumer.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/bin/bash

usage_error() {
echo "Usage: $0 <integer> [positive-integer]"
exit 1
}

# Check if the first argument is provided and is an integer
if [ "$#" -lt 1 ] || ! [[ "$1" =~ ^[0-9]+$ ]]; then
usage_error
fi

# Check if the second argument is provided and if it's a positive integer
if [ "$#" -eq 2 ] && ! [[ "$2" =~ ^[1-9][0-9]*$ ]]; then
usage_error
fi

# If the script reaches here, the parameters are valid
echo "The provided integer is: $1"
if [ "$#" -eq 2 ]; then
echo "The optional positive integer is: $2"
fi

# Use environment variables or default values
GRPC_SERVER=${GRPC_SERVER:-"host.docker.internal:8080"}
GRPC_METHOD=${GRPC_METHOD:-"BlockStreamGrpc/StreamSource"}
PATH_TO_PROTO=${PATH_TO_PROTO:-"/usr/local/protos/blockstream.proto"}
PROTO_IMPORT_PATH=${PROTO_IMPORT_PATH:-"/usr/local/protos"}

# print the environment variables
echo "GRPC_SERVER: $GRPC_SERVER"
echo "GRPC_METHOD: $GRPC_METHOD"
echo "PATH_TO_PROTO: $PATH_TO_PROTO"

echo "Starting consumer..."

# Signal handler to handle SIGINT (Ctrl+C)
cleanup() {
echo "Received SIGINT, stopping..."
kill $GRPC_PID
exit 0
}

# Trap SIGINT
trap cleanup SIGINT

# Generate and push messages to the gRPC server as a consumer.
# Response block messages from the gRPC server are printed to stdout.
(
iter=$1
while true; do
echo "{\"id\": $iter}"

if [ $iter -eq $2 ]; then
exit 0
fi

((iter++))

# Configure the message speed
sleep 1

done
) | grpcurl -plaintext -import-path $PROTO_IMPORT_PATH -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD
81 changes: 81 additions & 0 deletions server/src/test/network-latency-simulator/protos/blockstream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
syntax = "proto3";

/*-
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

option java_package = "com.hedera.block.protos";
option java_outer_classname = "BlockStreamServiceGrpcProto";

/**
* The BlockStreamGrpc service definition provides 2 bidirectional streaming methods for
* exchanging blocks with the Block Node server.
*
* A producer (e.g. Consensus Node) can use the StreamSink method to stream blocks to the
* Block Node server. The Block Node server will respond with a BlockResponse message for
* each block received.
*
* A consumer (e.g. Mirror Node) can use the StreamSource method to request a stream of
* blocks from the server. The consumer is expected to respond with a BlockResponse message
* with the id of each block received.
*/
service BlockStreamGrpc {

/**
* StreamSink is a bidirectional streaming method that allows a producer to stream blocks
* to the Block Node server. The server will respond with a BlockResponse message for each
* block received.
*/
rpc StreamSink(stream Block) returns (stream BlockResponse) {}

/**
* StreamSource is a bidirectional streaming method that allows a consumer to request a
* stream of blocks from the server. The consumer is expected to respond with a BlockResponse
* message with the id of each block received.
*/
rpc StreamSource(stream BlockResponse) returns (stream Block) {}
}

/**
* A block is a simple message that contains an id and a value.
* This specification is a simple example meant to expedite development.
* It will be replaced with a PBJ implementation in the future.
*/
message Block {
/**
* The id of the block. Each block id should be unique.
*/
int64 id = 1;

/**
* The value of the block. The value can be any string.
*/
string value = 2;
}

/**
* A block response is a simple message that contains an id.
* The block response message is simply meant to disambiguate it
* from the original request. This specification is a simple
* example meant to expedite development. It will be replaced with
* a PBJ implementation in the future.
*/
message BlockResponse {
/**
* The id of the block which was received. Each block id should
* correlate with the id of a Block message id.
*/
int64 id = 1;
}
6 changes: 6 additions & 0 deletions server/src/test/network-latency-simulator/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# First Start consumer without any network latency so it connects without issues.
consumer.sh 1 1000 &
# Then start the network latency configuration script.
configure_latency.sh 500 500 64 &
# let it run
bash
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencyResolutionManagement {
version("com.google.protobuf", "3.24.0")
version("io.helidon.webserver.http2", "4.0.10")
version("io.helidon.webserver.grpc", "4.0.10")
version("io.helidon.config", "4.0.10")

// Testing only versions
version("org.assertj.core", "3.23.1")
Expand Down

0 comments on commit 66db659

Please sign in to comment.