Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: configuration to use Platform SDK across all project #101

Merged
merged 23 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6f79cc4
created a custom config data for consumer, with a single property: ti…
AlfredoG87 Aug 14, 2024
eb7667d
deleted unneeded application.yaml props file.
AlfredoG87 Aug 14, 2024
3bc6e9a
adding test app.properties for tests, with prometheus endpoint disabl…
AlfredoG87 Aug 14, 2024
e32db20
added scaffolding pieces for tests that need context and configuratio…
AlfredoG87 Aug 14, 2024
becb521
Changed Consumer package to use ConsumerConfig instead of timeoutThre…
AlfredoG87 Aug 14, 2024
67e4156
style fixes
AlfredoG87 Aug 14, 2024
736f5cb
Refactor Persistence Storage to use PersistenceStorageConfig
AlfredoG87 Aug 14, 2024
9fd4a32
improvements
AlfredoG87 Aug 14, 2024
bd13246
style fixes and removal of comment and log
AlfredoG87 Aug 14, 2024
915379e
readme changes and improvements
AlfredoG87 Aug 14, 2024
059b234
some variables refactor and improvements generally
AlfredoG87 Aug 14, 2024
9d454db
refactored BlockStreamService to send the whole context instead of ju…
AlfredoG87 Aug 14, 2024
e4a8ebf
style fix
AlfredoG87 Aug 14, 2024
3ba979a
more improvements to README.md
AlfredoG87 Aug 14, 2024
faf74d5
fixed Typo
AlfredoG87 Aug 14, 2024
79014d2
removed leftover comments
AlfredoG87 Aug 14, 2024
759f3c8
delete app.properties in test resources in favor of test.app.properti…
AlfredoG87 Aug 14, 2024
4a720e1
return to app.properties
AlfredoG87 Aug 14, 2024
ed109ad
removed unneeded exports on module-info.java
AlfredoG87 Aug 14, 2024
1f5713f
Refactored Util to FileUtils on persistence package.
AlfredoG87 Aug 14, 2024
ec5cca9
JavaDocs improvements
AlfredoG87 Aug 14, 2024
26aa6f5
style fix
AlfredoG87 Aug 14, 2024
60678e9
removed left comment
AlfredoG87 Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ gradle-app.setting
.DS_Store
# .env files
server/docker/.env
/server/data/
30 changes: 21 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,31 @@ Please do not file a public ticket mentioning the vulnerability. Refer to the se

---

# Running Locally
# Usage

1) Create a local temp directory. For example, use `mktemp -d -t block-stream-temp-dir` to create a directory
2) Configuration variables
```
export BLOCKNODE_STORAGE_ROOT_PATH=<path to the temp directory> # You can add this to your .zshrc, etc
```
3) Optional Configuration variables
## Configuration

| Environment Variable | Description | Default Value |
|---------------------------------|---------------------------------------------------------------------------------------------------------------|---------------|
| persistence.storage.rootPath | The root path for the storage, if not provided will attempt to create a `data` on the working dir of the app. | ./data |
| consumer.timeoutThresholdMillis | Time to wait for subscribers before disconnecting in milliseconds | 1500 |



# Starting locally:
```bash
./gradlew run
```
export BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD="<NumberInMiliseconds>" #Default is 1500

In debug mode, you can attach a debugger to the port 5005.
```bash
./gradlew run --debug-jvm
```

3) ./gradlew run # ./gradlew run --debug-jvm to run in debug mode
Also you can run on docker locally:
```bash
./gradlew startDockerContainer
```

# Running Tests
1) ./gradlew build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class BlockStreamService implements GrpcService {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final ItemAckBuilder itemAckBuilder;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
private final ServiceStatus serviceStatus;
Expand All @@ -55,8 +54,6 @@ public class BlockStreamService implements GrpcService {
* Constructor for the BlockStreamService class. It initializes the BlockStreamService with the
* given parameters.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds for the producer to
* publish block items
* @param itemAckBuilder the item acknowledgement builder to send responses back to the producer
* @param streamMediator the stream mediator to proxy block items from the producer to the
* subscribers and manage the subscription lifecycle for subscribers
Expand All @@ -66,15 +63,13 @@ public class BlockStreamService implements GrpcService {
* stop the service and web server in the event of an unrecoverable exception
*/
BlockStreamService(
final long timeoutThresholdMillis,
@NonNull final ItemAckBuilder itemAckBuilder,
@NonNull
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
streamMediator,
@NonNull final BlockReader<Block> blockReader,
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockNodeContext blockNodeContext) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.itemAckBuilder = itemAckBuilder;
this.streamMediator = streamMediator;
this.blockReader = blockReader;
Expand Down Expand Up @@ -143,7 +138,7 @@ void subscribeBlockStream(
@NonNull
final var streamObserver =
new ConsumerStreamResponseObserver(
timeoutThresholdMillis,
blockNodeContext,
Clock.systemDefaultZone(),
streamMediator,
subscribeStreamResponseObserver);
Expand Down
9 changes: 0 additions & 9 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,6 @@
public final class Constants {
private Constants() {}

/** Constant mapped to the root path config key where the block files are stored */
@NonNull
public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";

/** Constant mapped to the timeout for stream consumers in milliseconds */
@NonNull
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY =
"blocknode.server.consumer.timeout.threshold";

/** Constant mapped to the name of the service in the .proto file */
@NonNull public static final String SERVICE_NAME = "BlockStreamGrpcService";

Expand Down
31 changes: 8 additions & 23 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@
package com.hedera.block.server;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.server.Constants.BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY;
import static com.hedera.block.server.Constants.BLOCKNODE_STORAGE_ROOT_PATH_KEY;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.config.BlockNodeContextFactory;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediatorBuilder;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.read.BlockAsDirReaderBuilder;
import com.hedera.block.server.persistence.storage.read.BlockReader;
import com.hedera.block.server.persistence.storage.write.BlockAsDirWriterBuilder;
import com.hedera.block.server.persistence.storage.write.BlockWriter;
import com.hedera.block.server.producer.ItemAckBuilder;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.config.Config;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcRouting;
import java.io.IOException;
Expand All @@ -56,17 +54,11 @@ public static void main(final String[] args) {
// init context, metrics, and configuration.
@NonNull final BlockNodeContext blockNodeContext = BlockNodeContextFactory.create();

// Set the global configuration
@NonNull final Config config = Config.create();
Config.global(config);

@NonNull final ServiceStatus serviceStatus = new ServiceStatusImpl();

@NonNull
final BlockWriter<BlockItem> blockWriter =
BlockAsDirWriterBuilder.newBuilder(
BLOCKNODE_STORAGE_ROOT_PATH_KEY, config, blockNodeContext)
.build();
BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build();
@NonNull
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator =
LiveStreamMediatorBuilder.newBuilder(
Expand All @@ -75,13 +67,16 @@ public static void main(final String[] args) {

@NonNull
final BlockReader<Block> blockReader =
BlockAsDirReaderBuilder.newBuilder(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config)
BlockAsDirReaderBuilder.newBuilder(
blockNodeContext
.configuration()
.getConfigData(PersistenceStorageConfig.class))
.build();

@NonNull
final BlockStreamService blockStreamService =
buildBlockStreamService(
config, streamMediator, blockReader, serviceStatus, blockNodeContext);
streamMediator, blockReader, serviceStatus, blockNodeContext);

@NonNull
final GrpcRouting.Builder grpcRouting =
Expand All @@ -104,24 +99,14 @@ public static void main(final String[] args) {

@NonNull
private static BlockStreamService buildBlockStreamService(
@NonNull final Config config,
@NonNull
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
streamMediator,
@NonNull final BlockReader<Block> blockReader,
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockNodeContext blockNodeContext) {

// Get Timeout threshold from configuration
final long consumerTimeoutThreshold =
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);

return new BlockStreamService(
consumerTimeoutThreshold,
new ItemAckBuilder(),
streamMediator,
blockReader,
serviceStatus,
blockNodeContext);
new ItemAckBuilder(), streamMediator, blockReader, serviceStatus, blockNodeContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.hedera.block.server.config;

import com.google.auto.service.AutoService;
import com.hedera.block.server.consumer.ConsumerConfig;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.swirlds.common.config.BasicCommonConfig;
import com.swirlds.common.metrics.config.MetricsConfig;
import com.swirlds.common.metrics.platform.prometheus.PrometheusConfig;
Expand All @@ -28,6 +30,11 @@
@AutoService(ConfigurationExtension.class)
mattp-swirldslabs marked this conversation as resolved.
Show resolved Hide resolved
public class BlockNodeConfigExtension implements ConfigurationExtension {

/** Explicitly defined constructor. */
public BlockNodeConfigExtension() {
super();
}

/**
* {@inheritDoc}
*
Expand All @@ -36,6 +43,11 @@ public class BlockNodeConfigExtension implements ConfigurationExtension {
@NonNull
@Override
public Set<Class<? extends Record>> getConfigDataTypes() {
return Set.of(BasicCommonConfig.class, MetricsConfig.class, PrometheusConfig.class);
return Set.of(
BasicCommonConfig.class,
MetricsConfig.class,
PrometheusConfig.class,
ConsumerConfig.class,
PersistenceStorageConfig.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.consumer;

import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;

/**
* Use this configuration across the consumer package.
*
* @param timeoutThresholdMillis after this time of inactivity, the consumer will be considered
* timed out and will be disconnected
*/
@ConfigData("consumer")
public record ConsumerConfig(@ConfigProperty(defaultValue = "1500") long timeoutThresholdMillis) {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.lmax.disruptor.EventHandler;
Expand Down Expand Up @@ -66,23 +67,25 @@ public class ConsumerStreamResponseObserver
* SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer
* via the subscribeStreamResponseObserver.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds for the producer to
* publish block items
* @param context contains the context with metrics and configuration for the application
* @param producerLivenessClock the clock to use to determine the producer liveness
* @param subscriptionHandler the subscription handler to use to manage the subscription
* lifecycle
* @param subscribeStreamResponseObserver the observer to use to send responses to the consumer
*/
public ConsumerStreamResponseObserver(
final long timeoutThresholdMillis,
@NonNull final BlockNodeContext context,
@NonNull final InstantSource producerLivenessClock,
@NonNull
final SubscriptionHandler<ObjectEvent<SubscribeStreamResponse>>
subscriptionHandler,
@NonNull
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.timeoutThresholdMillis =
context.configuration()
.getConfigData(ConsumerConfig.class)
.timeoutThresholdMillis();
this.subscriptionHandler = subscriptionHandler;

// The ServerCallStreamObserver can be configured with Runnable handlers to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class LiveStreamMediatorImpl
// Initialize and start the disruptor
@NonNull
final Disruptor<ObjectEvent<SubscribeStreamResponse>> disruptor =
// TODO: replace ring buffer size with a configurable value, create a MediatorConfig
new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE);
this.ringBuffer = disruptor.start();
this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.persistence.storage;

import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
* Use this configuration across the persistent storage package
*
* @param rootPath provides the root path for saving block data, if you want to override it need to
* set it as persistence.storage.rootPath
*/
@ConfigData("persistence.storage")
public record PersistenceStorageConfig(@ConfigProperty(defaultValue = "") String rootPath) {
/**
* Constructor to set the default root path if not provided, it will be set to the data
* directory in the current working directory
*/
public PersistenceStorageConfig {
if (rootPath.isEmpty()) {
Path defaultPath = Paths.get(rootPath).toAbsolutePath().resolve("data");
if (!Files.exists(defaultPath)) {
try {
Files.createDirectories(defaultPath);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
rootPath = defaultPath.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION;

import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.config.Config;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand All @@ -49,18 +49,16 @@ class BlockAsDirReader implements BlockReader<Block> {
* Constructor for the BlockAsDirReader class. It initializes the BlockAsDirReader with the
* given parameters.
*
* @param key the key to retrieve the block node root path from the configuration
* @param config the configuration to retrieve the block node root path
* @param filePerms the file permissions to set on the block node root path
*/
BlockAsDirReader(
@NonNull final String key,
@NonNull final Config config,
@NonNull final PersistenceStorageConfig config,
@NonNull final FileAttribute<Set<PosixFilePermission>> filePerms) {

LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockReader");

@NonNull final Path blockNodeRootPath = Path.of(config.get(key).asString().get());
@NonNull final Path blockNodeRootPath = Path.of(config.rootPath());

LOGGER.log(System.Logger.Level.INFO, config.toString());
LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath);
Expand Down
Loading
Loading