Skip to content

Commit

Permalink
fix:refactoring, boosted test coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 22, 2024
1 parent bbc4066 commit cf15057
Show file tree
Hide file tree
Showing 13 changed files with 294 additions and 98 deletions.
11 changes: 8 additions & 3 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,20 @@ public static void main(final String[] args) {

// Initialize the reader and writer for the block storage
final BlockWriter<BlockItem> blockWriter =
new FileSystemBlockWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockReader<Block> blockReader =
new FileSystemBlockReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);

final BlockStreamService blockStreamService =
new BlockStreamService(
consumerTimeoutThreshold,
new LiveStreamMediatorImpl(
new WriteThroughCacheHandler(blockReader, blockWriter)));
new WriteThroughCacheHandler(blockReader, blockWriter),
(streamMediator) -> {
LOGGER.log(
System.Logger.Level.ERROR,
"Shutting down the server due to an error.");
}));

// Start the web server
WebServer.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();

LOGGER.log(System.Logger.Level.DEBUG, "Send BlockItem downstream: {0} ", blockItem);

subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

package com.hedera.block.server.mediator;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.Block;
import static com.hedera.block.protos.BlockStreamService.BlockItem;

import com.hedera.block.server.consumer.BlockItemEventHandler;
import com.hedera.block.server.data.ObjectEvent;
Expand All @@ -26,10 +27,12 @@
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/**
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible
Expand All @@ -49,22 +52,25 @@ public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockI
BatchEventProcessor<ObjectEvent<BlockItem>>>
subscribers = new HashMap<>();

private final BlockPersistenceHandler<Block, BlockItem> blockPersistenceHandler;
private final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
private final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback;

/**
* Constructor for the LiveStreamMediatorImpl class.
*
* @param blockPersistenceHandler the block persistence handler
*/
public LiveStreamMediatorImpl(
final BlockPersistenceHandler<Block, BlockItem> blockPersistenceHandler) {
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback) {
this.blockPersistenceHandler = blockPersistenceHandler;

// Initialize and start the disruptor
final Disruptor<ObjectEvent<BlockItem>> disruptor =
new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE);
this.ringBuffer = disruptor.start();
this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
this.shutdownCallback = shutdownCallback;
}

@Override
Expand All @@ -77,9 +83,10 @@ public void publishEvent(BlockItem blockItem) {
// Block persistence
try {
blockPersistenceHandler.persist(blockItem);
} catch (Exception e) {
} catch (IOException e) {
// TODO: Push back on the producer?
LOGGER.log(System.Logger.Level.ERROR, "Error occurred while persisting the block", e);
shutdownCallback.accept(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
public interface BlockPersistenceHandler<U, V> {

/** Persists a block. */
void persist(final V blockItem) throws IOException;
void persist(final U blockItem) throws IOException;

/**
* Reads a block.
*
* @param blockNumber the number of the block to read
* @return an Optional of the block
*/
Optional<U> read(final long blockNumber);
Optional<V> read(final long blockNumber);

/**
* Reads a range of blocks.
Expand All @@ -46,5 +46,5 @@ public interface BlockPersistenceHandler<U, V> {
* @param endBlockNumber the id of the last block to read
* @return a queue of blocks
*/
Queue<U> readRange(final long startBlockNumber, final long endBlockNumber);
Queue<V> readRange(final long startBlockNumber, final long endBlockNumber);
}
32 changes: 0 additions & 32 deletions server/src/main/java/com/hedera/block/server/persistence/Util.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Write-Through cache handler coordinates between the block storage and the block cache to ensure
* the block is persisted to the storage before being cached.
*/
public class WriteThroughCacheHandler implements BlockPersistenceHandler<Block, BlockItem> {
public class WriteThroughCacheHandler implements BlockPersistenceHandler<BlockItem, Block> {

private final BlockReader<Block> blockReader;
private final BlockWriter<BlockItem> blockWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,20 @@
import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION;

import io.helidon.config.Config;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;

public class FileSystemBlockReader implements BlockReader<Block> {
public class BlockAsDirReader implements BlockReader<Block> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

final Path blockNodeRootPath;

public FileSystemBlockReader(final String key, final Config config) {
public BlockAsDirReader(final String key, final Config config) {

LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockReader");

Expand All @@ -57,8 +58,8 @@ public Optional<Block> read(final long blockNumber) {
}

// There may be thousands of BlockItem files in a Block directory.
// The BlockItems must be written into the Block object in order. A
// DirectoryStream will iterate in any guaranteed order. To avoid sorting,
// The BlockItems must be added to the Block object in order. A
// DirectoryStream will iterate without any guaranteed order. To avoid sorting,
// and to keep the retrieval process linear with the number of BlockItems,
// Run a loop to fetch in the order we need. The loop will break when
// it looks for a BlockItem file that does not exist.
Expand All @@ -72,6 +73,8 @@ public Optional<Block> read(final long blockNumber) {
continue;
}
} catch (IOException io) {
// Return an empty Optional signaling an error. It's all or nothing
// when retrieving a Block
LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + blockItemPath, io);
return Optional.empty();
}
Expand All @@ -84,15 +87,23 @@ public Optional<Block> read(final long blockNumber) {
}

private Optional<BlockItem> readBlockItem(final String blockItemPath) throws IOException {

try (FileInputStream fis = new FileInputStream(blockItemPath)) {
return Optional.of(BlockItem.parseFrom(fis));
} catch (FileNotFoundException io) {
// The outer loop caller will continue to query
// for the next BlockItem file based on the index
// until the FileNotFoundException is thrown.
// It's expected that this exception will be caught
// at the end of every query.
return Optional.empty();
File f = new File(blockItemPath);
if (!f.exists()) {
// The outer loop caller will continue to query
// for the next BlockItem file based on the index
// until the FileNotFoundException is thrown.
// It's expected that this exception will be caught
// at the end of every query.
return Optional.empty();
}

// FileNotFound is thrown also when a file cannot be read.
// So re-throw here to make a different decision upstream.
throw io;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@
import java.nio.file.Files;
import java.nio.file.Path;

public class FileSystemBlockWriter implements BlockWriter<BlockItem> {
public class BlockAsDirWriter implements BlockWriter<BlockItem> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final Path blockNodeRootPath;
private long blockNodeFileNameIndex = 0;
private Path currentBlockDir;

public FileSystemBlockWriter(final String key, final Config config) throws IOException {
public BlockAsDirWriter(final String key, final Config config) throws IOException {

LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockStorage");

Expand All @@ -49,16 +49,7 @@ public FileSystemBlockWriter(final String key, final Config config) throws IOExc
}

// Initialize the block node root directory if it does not exist
if (Files.notExists(blockNodeRootPath)) {
Files.createDirectory(blockNodeRootPath);
LOGGER.log(
System.Logger.Level.INFO,
"Created block node root directory: " + blockNodeRootPath);
} else {
LOGGER.log(
System.Logger.Level.INFO,
"Using existing block node root directory: " + blockNodeRootPath);
}
createPath(blockNodeRootPath, System.Logger.Level.INFO);

this.blockNodeRootPath = blockNodeRootPath;
}
Expand All @@ -77,7 +68,8 @@ public void write(final BlockItem blockItem) throws IOException {
"Successfully wrote the block item file: {0}",
blockItemFilePath);
} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e);
LOGGER.log(
System.Logger.Level.ERROR, "Error writing the BlockItem protobuf to a file", e);
}
}

Expand All @@ -87,7 +79,7 @@ private void resetState(final BlockItem blockItem) throws IOException {
currentBlockDir = Path.of(String.valueOf(blockItem.getHeader().getBlockNumber()));

// Construct the path to the block directory
createPath(blockNodeRootPath.resolve(currentBlockDir));
createPath(blockNodeRootPath.resolve(currentBlockDir), System.Logger.Level.DEBUG);

// Reset
blockNodeFileNameIndex = 0;
Expand All @@ -100,17 +92,13 @@ private String calculateBlockItemPath() {
return blockPath.resolve(blockNodeFileNameIndex + BLOCK_FILE_EXTENSION).toString();
}

private void createPath(Path blockNodePath) throws IOException {
private void createPath(Path blockNodePath, System.Logger.Level logLevel) throws IOException {
// Initialize the Block directory if it does not exist
if (Files.notExists(blockNodePath)) {
Files.createDirectory(blockNodePath);
LOGGER.log(
System.Logger.Level.INFO,
"Created block node root directory: " + blockNodePath);
LOGGER.log(logLevel, "Created block node root directory: " + blockNodePath);
} else {
LOGGER.log(
System.Logger.Level.INFO,
"Using existing block node root directory: " + blockNodePath);
LOGGER.log(logLevel, "Using existing block node root directory: " + blockNodePath);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ public void testProducerTimeoutOutsideWindow() throws InterruptedException {
}

@Test
public void testStuff() throws InterruptedException {
public void testConsumerNotToSendBeforeBlockHeader() throws InterruptedException {
final var consumerBlockItemObserver =
new ConsumerBlockItemObserver(
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

// Send non-header BlockItems to validate that the observer does not send them
for (int i = 1; i <= 10; i++) {

if (i % 2 == 0) {
Expand Down
Loading

0 comments on commit cf15057

Please sign in to comment.