Skip to content

Commit

Permalink
refactor: configuration to use Platform SDK across all project (#101)
Browse files Browse the repository at this point in the history
Signed-off-by: Alfredo Gutierrez <[email protected]>
  • Loading branch information
AlfredoG87 authored Aug 14, 2024
1 parent 87a1ac3 commit 7f48aec
Show file tree
Hide file tree
Showing 29 changed files with 787 additions and 360 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ gradle-app.setting
.DS_Store
# .env files
server/docker/.env
/server/data/
30 changes: 21 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,31 @@ Please do not file a public ticket mentioning the vulnerability. Refer to the se

---

# Running Locally
# Usage

1) Create a local temp directory. For example, use `mktemp -d -t block-stream-temp-dir` to create a directory
2) Configuration variables
```
export BLOCKNODE_STORAGE_ROOT_PATH=<path to the temp directory> # You can add this to your .zshrc, etc
```
3) Optional Configuration variables
## Configuration

| Environment Variable | Description | Default Value |
|---------------------------------|---------------------------------------------------------------------------------------------------------------|---------------|
| persistence.storage.rootPath | The root path for the storage, if not provided will attempt to create a `data` on the working dir of the app. | ./data |
| consumer.timeoutThresholdMillis | Time to wait for subscribers before disconnecting in milliseconds | 1500 |



# Starting locally:
```bash
./gradlew run
```
export BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD="<NumberInMiliseconds>" #Default is 1500

In debug mode, you can attach a debugger to the port 5005.
```bash
./gradlew run --debug-jvm
```

3) ./gradlew run # ./gradlew run --debug-jvm to run in debug mode
Also you can run on docker locally:
```bash
./gradlew startDockerContainer
```

# Running Tests
1) ./gradlew build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class BlockStreamService implements GrpcService {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final ItemAckBuilder itemAckBuilder;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
private final ServiceStatus serviceStatus;
Expand All @@ -55,8 +54,6 @@ public class BlockStreamService implements GrpcService {
* Constructor for the BlockStreamService class. It initializes the BlockStreamService with the
* given parameters.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds for the producer to
* publish block items
* @param itemAckBuilder the item acknowledgement builder to send responses back to the producer
* @param streamMediator the stream mediator to proxy block items from the producer to the
* subscribers and manage the subscription lifecycle for subscribers
Expand All @@ -66,15 +63,13 @@ public class BlockStreamService implements GrpcService {
* stop the service and web server in the event of an unrecoverable exception
*/
BlockStreamService(
final long timeoutThresholdMillis,
@NonNull final ItemAckBuilder itemAckBuilder,
@NonNull
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
streamMediator,
@NonNull final BlockReader<Block> blockReader,
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockNodeContext blockNodeContext) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.itemAckBuilder = itemAckBuilder;
this.streamMediator = streamMediator;
this.blockReader = blockReader;
Expand Down Expand Up @@ -143,7 +138,7 @@ void subscribeBlockStream(
@NonNull
final var streamObserver =
new ConsumerStreamResponseObserver(
timeoutThresholdMillis,
blockNodeContext,
Clock.systemDefaultZone(),
streamMediator,
subscribeStreamResponseObserver);
Expand Down
9 changes: 0 additions & 9 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,6 @@
public final class Constants {
private Constants() {}

/** Constant mapped to the root path config key where the block files are stored */
@NonNull
public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";

/** Constant mapped to the timeout for stream consumers in milliseconds */
@NonNull
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY =
"blocknode.server.consumer.timeout.threshold";

/** Constant mapped to the name of the service in the .proto file */
@NonNull public static final String SERVICE_NAME = "BlockStreamGrpcService";

Expand Down
31 changes: 8 additions & 23 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@
package com.hedera.block.server;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.server.Constants.BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY;
import static com.hedera.block.server.Constants.BLOCKNODE_STORAGE_ROOT_PATH_KEY;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.config.BlockNodeContextFactory;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediatorBuilder;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.read.BlockAsDirReaderBuilder;
import com.hedera.block.server.persistence.storage.read.BlockReader;
import com.hedera.block.server.persistence.storage.write.BlockAsDirWriterBuilder;
import com.hedera.block.server.persistence.storage.write.BlockWriter;
import com.hedera.block.server.producer.ItemAckBuilder;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.config.Config;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcRouting;
import java.io.IOException;
Expand All @@ -56,17 +54,11 @@ public static void main(final String[] args) {
// init context, metrics, and configuration.
@NonNull final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();

// Set the global configuration
@NonNull final Config config = Config.create();
Config.global(config);

@NonNull final ServiceStatus serviceStatus = new ServiceStatusImpl();

@NonNull
final BlockWriter<BlockItem> blockWriter =
BlockAsDirWriterBuilder.newBuilder(
BLOCKNODE_STORAGE_ROOT_PATH_KEY, config, blockNodeContext)
.build();
BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build();
@NonNull
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator =
LiveStreamMediatorBuilder.newBuilder(
Expand All @@ -75,13 +67,16 @@ public static void main(final String[] args) {

@NonNull
final BlockReader<Block> blockReader =
BlockAsDirReaderBuilder.newBuilder(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config)
BlockAsDirReaderBuilder.newBuilder(
blockNodeContext
.configuration()
.getConfigData(PersistenceStorageConfig.class))
.build();

@NonNull
final BlockStreamService blockStreamService =
buildBlockStreamService(
config, streamMediator, blockReader, serviceStatus, blockNodeContext);
streamMediator, blockReader, serviceStatus, blockNodeContext);

@NonNull
final GrpcRouting.Builder grpcRouting =
Expand All @@ -104,24 +99,14 @@ public static void main(final String[] args) {

@NonNull
private static BlockStreamService buildBlockStreamService(
@NonNull final Config config,
@NonNull
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
streamMediator,
@NonNull final BlockReader<Block> blockReader,
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockNodeContext blockNodeContext) {

// Get Timeout threshold from configuration
final long consumerTimeoutThreshold =
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);

return new BlockStreamService(
consumerTimeoutThreshold,
new ItemAckBuilder(),
streamMediator,
blockReader,
serviceStatus,
blockNodeContext);
new ItemAckBuilder(), streamMediator, blockReader, serviceStatus, blockNodeContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.hedera.block.server.config;

import com.google.auto.service.AutoService;
import com.hedera.block.server.consumer.ConsumerConfig;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.swirlds.common.config.BasicCommonConfig;
import com.swirlds.common.metrics.config.MetricsConfig;
import com.swirlds.common.metrics.platform.prometheus.PrometheusConfig;
Expand All @@ -28,6 +30,11 @@
@AutoService(ConfigurationExtension.class)
public class BlockNodeConfigExtension implements ConfigurationExtension {

/** Explicitly defined constructor. */
public BlockNodeConfigExtension() {
super();
}

/**
* {@inheritDoc}
*
Expand All @@ -36,6 +43,11 @@ public class BlockNodeConfigExtension implements ConfigurationExtension {
@NonNull
@Override
public Set<Class<? extends Record>> getConfigDataTypes() {
return Set.of(BasicCommonConfig.class, MetricsConfig.class, PrometheusConfig.class);
return Set.of(
BasicCommonConfig.class,
MetricsConfig.class,
PrometheusConfig.class,
ConsumerConfig.class,
PersistenceStorageConfig.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.consumer;

import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;

/**
* Use this configuration across the consumer package.
*
* @param timeoutThresholdMillis after this time of inactivity, the consumer will be considered
* timed out and will be disconnected
*/
@ConfigData("consumer")
public record ConsumerConfig(@ConfigProperty(defaultValue = "1500") long timeoutThresholdMillis) {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.lmax.disruptor.EventHandler;
Expand Down Expand Up @@ -66,23 +67,25 @@ public class ConsumerStreamResponseObserver
* SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer
* via the subscribeStreamResponseObserver.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds for the producer to
* publish block items
* @param context contains the context with metrics and configuration for the application
* @param producerLivenessClock the clock to use to determine the producer liveness
* @param subscriptionHandler the subscription handler to use to manage the subscription
* lifecycle
* @param subscribeStreamResponseObserver the observer to use to send responses to the consumer
*/
public ConsumerStreamResponseObserver(
final long timeoutThresholdMillis,
@NonNull final BlockNodeContext context,
@NonNull final InstantSource producerLivenessClock,
@NonNull
final SubscriptionHandler<ObjectEvent<SubscribeStreamResponse>>
subscriptionHandler,
@NonNull
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.timeoutThresholdMillis =
context.configuration()
.getConfigData(ConsumerConfig.class)
.timeoutThresholdMillis();
this.subscriptionHandler = subscriptionHandler;

// The ServerCallStreamObserver can be configured with Runnable handlers to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class LiveStreamMediatorImpl
// Initialize and start the disruptor
@NonNull
final Disruptor<ObjectEvent<SubscribeStreamResponse>> disruptor =
// TODO: replace ring buffer size with a configurable value, create a MediatorConfig
new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE);
this.ringBuffer = disruptor.start();
this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@
package com.hedera.block.server.persistence.storage;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

/** Util methods provide common functionality for the storage package. */
public final class Util {
private Util() {}
/** FileUtils methods provide common functionality for the storage package. */
public final class FileUtils {

private static final System.Logger LOGGER = System.getLogger(FileUtils.class.getName());

private FileUtils() {}

/**
* Default file permissions defines the file and directory for the storage package.
Expand All @@ -42,4 +48,26 @@ private Util() {}
PosixFilePermission.GROUP_EXECUTE,
PosixFilePermission.OTHERS_READ,
PosixFilePermission.OTHERS_EXECUTE));

/**
* Use this to create a Dir if it does not exist with the given permissions and log the result.
*
* @param blockNodePath the path to create
* @param logLevel the log level to use
* @param perms the permissions to use when creating the directory
* @throws IOException if the directory cannot be created
*/
public static void createPathIfNotExists(
@NonNull final Path blockNodePath,
@NonNull final System.Logger.Level logLevel,
@NonNull FileAttribute<Set<PosixFilePermission>> perms)
throws IOException {
// Initialize the Block directory if it does not exist
if (Files.notExists(blockNodePath)) {
Files.createDirectory(blockNodePath, perms);
LOGGER.log(logLevel, "Created block node root directory: " + blockNodePath);
} else {
LOGGER.log(logLevel, "Using existing block node root directory: " + blockNodePath);
}
}
}
Loading

0 comments on commit 7f48aec

Please sign in to comment.