diff --git a/server/src/main/java/com/hedera/block/server/Constants.java b/server/src/main/java/com/hedera/block/server/Constants.java index dbe8f226a..ce32a9c40 100644 --- a/server/src/main/java/com/hedera/block/server/Constants.java +++ b/server/src/main/java/com/hedera/block/server/Constants.java @@ -29,4 +29,7 @@ private Constants() {} public static final String SERVICE_NAME = "BlockStreamGrpcService"; public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream"; public static final String SERVER_STREAMING_METHOD_NAME = "subscribeBlockStream"; + + // Constants used when interacting with the file system. + public static final String BLOCK_FILE_EXTENSION = ".blk"; } diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index c2176c749..3c0c3e4fa 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -21,8 +21,7 @@ import com.hedera.block.server.mediator.LiveStreamMediatorImpl; import com.hedera.block.server.persistence.WriteThroughCacheHandler; -import com.hedera.block.server.persistence.storage.BlockStorage; -import com.hedera.block.server.persistence.storage.FileSystemBlockStorage; +import com.hedera.block.server.persistence.storage.*; import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; import io.helidon.config.Config; @@ -66,13 +65,17 @@ public static void main(final String[] args) { .asLong() .orElse(1500L); - // Initialize the block storage, cache, and service - final BlockStorage blockStorage = - new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + // Initialize the reader and writer for the block storage + final BlockWriter blockWriter = + new FileSystemBlockWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + final BlockReader blockReader = + new FileSystemBlockReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + final BlockStreamService blockStreamService = new BlockStreamService( consumerTimeoutThreshold, - new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage))); + new LiveStreamMediatorImpl( + new WriteThroughCacheHandler(blockReader, blockWriter))); // Start the web server WebServer.builder() diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 23fb727b2..84d77ffc4 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -75,7 +75,12 @@ public void publishEvent(BlockItem blockItem) { ringBuffer.publishEvent((event, sequence) -> event.set(blockItem)); // Block persistence - blockPersistenceHandler.persist(blockItem); + try { + blockPersistenceHandler.persist(blockItem); + } catch (Exception e) { + // TODO: Push back on the producer? + LOGGER.log(System.Logger.Level.ERROR, "Error occurred while persisting the block", e); + } } @Override diff --git a/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java b/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java index 880a1351a..41a93da52 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java +++ b/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java @@ -16,6 +16,7 @@ package com.hedera.block.server.persistence; +import java.io.IOException; import java.util.Optional; import java.util.Queue; @@ -28,7 +29,7 @@ public interface BlockPersistenceHandler { /** Persists a block. */ - void persist(final V blockItem); + void persist(final V blockItem) throws IOException; /** * Reads a block. diff --git a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java index a55179c42..5db49d05d 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java +++ b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java @@ -19,7 +19,9 @@ import static com.hedera.block.protos.BlockStreamService.Block; import static com.hedera.block.protos.BlockStreamService.BlockItem; -import com.hedera.block.server.persistence.storage.BlockStorage; +import com.hedera.block.server.persistence.storage.BlockReader; +import com.hedera.block.server.persistence.storage.BlockWriter; +import java.io.IOException; import java.util.LinkedList; import java.util.Optional; import java.util.Queue; @@ -30,21 +32,14 @@ */ public class WriteThroughCacheHandler implements BlockPersistenceHandler { - private final BlockStorage blockStorage; + private final BlockReader blockReader; + private final BlockWriter blockWriter; - /** - * Constructor for the WriteThroughCacheHandler class. - * - * @param blockStorage the block storage - */ - public WriteThroughCacheHandler(final BlockStorage blockStorage) { - this.blockStorage = blockStorage; - } - - /** Persists the block to the block storage and cache the block. */ - @Override - public void persist(final BlockItem blockItem) { - blockStorage.write(blockItem); + /** Constructor for the WriteThroughCacheHandler class. */ + public WriteThroughCacheHandler( + final BlockReader blockReader, final BlockWriter blockWriter) { + this.blockReader = blockReader; + this.blockWriter = blockWriter; } /** @@ -74,6 +69,12 @@ public Queue readRange(final long startBlockId, final long endBlockId) { */ @Override public Optional read(final long id) { - return blockStorage.read(id); + return blockReader.read(id); + } + + /** Persists the block to the block storage and cache the block. */ + @Override + public void persist(final BlockItem blockItem) throws IOException { + blockWriter.write(blockItem); } } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockReader.java similarity index 63% rename from server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java rename to server/src/main/java/com/hedera/block/server/persistence/storage/BlockReader.java index a6393bfac..b905430b2 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockReader.java @@ -18,20 +18,6 @@ import java.util.Optional; -/** - * The BlockStorage interface defines operations to write and read blocks to a persistent store. - * - * @param the type of block to store - */ -public interface BlockStorage { - - /** Writes a block to storage. */ - void write(final V blockItem); - - /** - * Reads a block from storage. - * - * @return the block - */ - Optional read(final long blockNumber); +public interface BlockReader { + Optional read(final long blockNumber); } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockWriter.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockWriter.java new file mode 100644 index 000000000..531c6cfcc --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockWriter.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage; + +import java.io.IOException; + +public interface BlockWriter { + void write(final V blockItem) throws IOException; +} diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockReader.java b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockReader.java new file mode 100644 index 000000000..c8ffcac2e --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockReader.java @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage; + +import static com.hedera.block.protos.BlockStreamService.Block; +import static com.hedera.block.protos.BlockStreamService.Block.Builder; +import static com.hedera.block.protos.BlockStreamService.BlockItem; +import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION; + +import io.helidon.config.Config; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Optional; + +public class FileSystemBlockReader implements BlockReader { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + final Path blockNodeRootPath; + + public FileSystemBlockReader(final String key, final Config config) { + + LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockReader"); + + final Path blockNodeRootPath = Path.of(config.get(key).asString().get()); + + LOGGER.log(System.Logger.Level.INFO, config.toString()); + LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath); + + this.blockNodeRootPath = blockNodeRootPath; + } + + public Optional read(final long blockNumber) { + + // Construct the path to the requested Block + final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber)); + + // Verify attributes of the Block + if (!isVerified(blockPath)) { + return Optional.empty(); + } + + // There may be thousands of BlockItem files in a Block directory. + // The BlockItems must be written into the Block object in order. A + // DirectoryStream will iterate in any guaranteed order. To avoid sorting, + // and to keep the retrieval process linear with the number of BlockItems, + // Run a loop to fetch in the order we need. The loop will break when + // it looks for a BlockItem file that does not exist. + final Builder builder = Block.newBuilder(); + for (int i = 1; ; i++) { + final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION); + try { + final Optional blockItemOpt = readBlockItem(blockItemPath.toString()); + if (blockItemOpt.isPresent()) { + builder.addBlockItems(blockItemOpt.get()); + continue; + } + } catch (IOException io) { + LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + blockItemPath, io); + return Optional.empty(); + } + + break; + } + + // Return the Block + return Optional.of(builder.build()); + } + + private Optional readBlockItem(final String blockItemPath) throws IOException { + try (FileInputStream fis = new FileInputStream(blockItemPath)) { + return Optional.of(BlockItem.parseFrom(fis)); + } catch (FileNotFoundException io) { + // The outer loop caller will continue to query + // for the next BlockItem file based on the index + // until the FileNotFoundException is thrown. + // It's expected that this exception will be caught + // at the end of every query. + return Optional.empty(); + } + } + + private boolean isVerified(final Path blockPath) { + if (!blockPath.toFile().isDirectory()) { + LOGGER.log(System.Logger.Level.ERROR, "Block directory not found: " + blockPath); + return false; + } + + if (!blockPath.toFile().canRead()) { + LOGGER.log(System.Logger.Level.ERROR, "Block directory not readable: " + blockPath); + return false; + } + + return true; + } +} diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java deleted file mode 100644 index 9cb96c01c..000000000 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright (C) 2024 Hedera Hashgraph, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hedera.block.server.persistence.storage; - -import static com.hedera.block.protos.BlockStreamService.Block; -import static com.hedera.block.protos.BlockStreamService.Block.Builder; -import static com.hedera.block.protos.BlockStreamService.BlockItem; -import static com.hedera.block.server.Constants.BLOCKNODE_STORAGE_ROOT_PATH_KEY; - -import io.helidon.config.Config; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Optional; - -/** - * The FileSystemBlockStorage class implements the BlockStorage interface to store blocks to the - * filesystem. - */ -public class FileSystemBlockStorage implements BlockStorage { - - private final System.Logger LOGGER = System.getLogger(getClass().getName()); - - public static final String BLOCK_FILE_EXTENSION = ".blk"; - - private final Path blockNodeRootPath; - - private long currentIndex = 1; - private Path currentBlockDir; - - /** - * Constructs a FileSystemBlockStorage object. - * - * @param key the key to use to retrieve the block node root path from the configuration - * @param config the configuration - * @throws IOException if an I/O error occurs while initializing the block node root directory - */ - public FileSystemBlockStorage(final String key, final Config config) throws IOException { - LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockStorage"); - - blockNodeRootPath = Path.of(config.get(key).asString().get()); - - LOGGER.log(System.Logger.Level.INFO, config.toString()); - LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath); - - if (!blockNodeRootPath.isAbsolute()) { - throw new IllegalArgumentException( - BLOCKNODE_STORAGE_ROOT_PATH_KEY + " must be an absolute path"); - } - - // Initialize the block node root directory if it does not exist - if (Files.notExists(blockNodeRootPath)) { - Files.createDirectory(blockNodeRootPath); - LOGGER.log( - System.Logger.Level.INFO, - "Created block node root directory: " + blockNodeRootPath); - } else { - LOGGER.log( - System.Logger.Level.INFO, - "Using existing block node root directory: " + blockNodeRootPath); - } - } - - /** Writes a block to the filesystem. */ - @Override - public void write(final BlockItem blockItem) { - - try { - final String blockItemFilePath = getAbsoluteFilePath(blockItem); - try (FileOutputStream fos = new FileOutputStream(blockItemFilePath)) { - blockItem.writeTo(fos); - LOGGER.log( - System.Logger.Level.INFO, - "Successfully wrote the block item file: {0}", - blockItemFilePath); - } catch (IOException e) { - LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e); - } - } catch (IOException io) { - LOGGER.log(System.Logger.Level.ERROR, "Error calculating the block item path", io); - } - } - - /** - * Reads a block from the filesystem. - * - * @param id the id of the block to read - * @return the block - */ - @Override - public Optional read(final long id) { - - final Builder builder = Block.newBuilder(); - final Path blockPath = blockNodeRootPath.resolve(String.valueOf(id)); - return read(blockPath, builder); - } - - private Optional read(final Path blockPath, final Builder builder) { - - // Directly count and add BlockItems into the Block - // to keep the retrieval process O(BlockItems) - boolean isEnd = false; - for (int i = 1; !isEnd; i++) { - final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION); - final Optional blockItemOpt = readBlockItem(blockItemPath.toString()); - if (blockItemOpt.isPresent()) { - builder.addBlockItems(blockItemOpt.get()); - continue; - } - - isEnd = true; - } - - return Optional.of(builder.build()); - } - - private Optional readBlockItem(final String blockItemPath) { - try (FileInputStream fis = new FileInputStream(blockItemPath)) { - return Optional.of(BlockItem.parseFrom(fis)); - } catch (FileNotFoundException io) { - return Optional.empty(); - } catch (IOException io) { - throw new RuntimeException("Error reading file: " + blockItemPath, io); - } - } - - private String getAbsoluteFilePath(final BlockItem blockItem) throws IOException { - - if (blockItem.hasHeader()) { - - // A "block" is a directory of blockItems. Create the "block" - // based on the block_number - currentBlockDir = Path.of(String.valueOf(blockItem.getHeader().getBlockNumber())); - - final Path blockPath = blockNodeRootPath.resolve(currentBlockDir); - createPath(blockPath); - - // Build the path to the BlockHeader.blk file - currentIndex = 1; - return blockPath.resolve(currentIndex + BLOCK_FILE_EXTENSION).toString(); - } - - // Build the path to a .blk file - final Path blockPath = blockNodeRootPath.resolve(currentBlockDir); - return blockPath.resolve(++currentIndex + BLOCK_FILE_EXTENSION).toString(); - } - - private void createPath(Path blockNodePath) throws IOException { - // Initialize the block node root directory if it does not exist - if (Files.notExists(blockNodePath)) { - Files.createDirectory(blockNodePath); - LOGGER.log( - System.Logger.Level.INFO, - "Created block node root directory: " + blockNodePath); - } else { - LOGGER.log( - System.Logger.Level.INFO, - "Using existing block node root directory: " + blockNodePath); - } - } - - // private String resolvePath(final long blockNumber) { - // - // String fileName = blockNumber + BLOCK_FILE_EXTENSION; - // Path fullPath = blockNodeRootPath.resolve(fileName); - // LOGGER.log(System.Logger.Level.DEBUG, "Resolved fullPath: " + fullPath); - // - // return fullPath.toString(); - // } -} diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockWriter.java b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockWriter.java new file mode 100644 index 000000000..70c233fed --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockWriter.java @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage; + +import static com.hedera.block.protos.BlockStreamService.BlockItem; +import static com.hedera.block.server.Constants.BLOCKNODE_STORAGE_ROOT_PATH_KEY; +import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION; + +import io.helidon.config.Config; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class FileSystemBlockWriter implements BlockWriter { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private final Path blockNodeRootPath; + private long blockNodeFileNameIndex = 0; + private Path currentBlockDir; + + public FileSystemBlockWriter(final String key, final Config config) throws IOException { + + LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockStorage"); + + final Path blockNodeRootPath = Path.of(config.get(key).asString().get()); + + LOGGER.log(System.Logger.Level.INFO, config.toString()); + LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath); + + if (!blockNodeRootPath.isAbsolute()) { + throw new IllegalArgumentException( + BLOCKNODE_STORAGE_ROOT_PATH_KEY + " must be an absolute path"); + } + + // Initialize the block node root directory if it does not exist + if (Files.notExists(blockNodeRootPath)) { + Files.createDirectory(blockNodeRootPath); + LOGGER.log( + System.Logger.Level.INFO, + "Created block node root directory: " + blockNodeRootPath); + } else { + LOGGER.log( + System.Logger.Level.INFO, + "Using existing block node root directory: " + blockNodeRootPath); + } + + this.blockNodeRootPath = blockNodeRootPath; + } + + public void write(final BlockItem blockItem) throws IOException { + + if (blockItem.hasHeader()) { + resetState(blockItem); + } + + final String blockItemFilePath = calculateBlockItemPath(); + try (FileOutputStream fos = new FileOutputStream(blockItemFilePath)) { + blockItem.writeTo(fos); + LOGGER.log( + System.Logger.Level.INFO, + "Successfully wrote the block item file: {0}", + blockItemFilePath); + } catch (IOException e) { + LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e); + } + } + + private void resetState(final BlockItem blockItem) throws IOException { + // Here a "block" is represented as a directory of BlockItems. + // Create the "block" directory based on the block_number + currentBlockDir = Path.of(String.valueOf(blockItem.getHeader().getBlockNumber())); + + // Construct the path to the block directory + createPath(blockNodeRootPath.resolve(currentBlockDir)); + + // Reset + blockNodeFileNameIndex = 0; + } + + private String calculateBlockItemPath() { + // Build the path to a .blk file + final Path blockPath = blockNodeRootPath.resolve(currentBlockDir); + blockNodeFileNameIndex++; + return blockPath.resolve(blockNodeFileNameIndex + BLOCK_FILE_EXTENSION).toString(); + } + + private void createPath(Path blockNodePath) throws IOException { + // Initialize the Block directory if it does not exist + if (Files.notExists(blockNodePath)) { + Files.createDirectory(blockNodePath); + LOGGER.log( + System.Logger.Level.INFO, + "Created block node root directory: " + blockNodePath); + } else { + LOGGER.log( + System.Logger.Level.INFO, + "Using existing block node root directory: " + blockNodePath); + } + } +} diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index 3b657e108..824563008 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -27,8 +27,10 @@ import com.hedera.block.server.consumer.ConsumerBlockItemObserver; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.persistence.WriteThroughCacheHandler; -import com.hedera.block.server.persistence.storage.BlockStorage; +import com.hedera.block.server.persistence.storage.BlockReader; +import com.hedera.block.server.persistence.storage.BlockWriter; import io.grpc.stub.StreamObserver; +import java.io.IOException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -43,7 +45,9 @@ public class LiveStreamMediatorImplTest { @Mock private BlockItemEventHandler> observer3; - @Mock private BlockStorage blockStorage; + @Mock private BlockReader blockReader; + + @Mock private BlockWriter blockWriter; @Mock private StreamObserver streamObserver1; @Mock private StreamObserver streamObserver2; @@ -53,7 +57,7 @@ public class LiveStreamMediatorImplTest { public void testUnsubscribeEach() { final var streamMediator = - new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)); + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockReader, blockWriter)); // Set up the subscribers streamMediator.subscribe(observer1); @@ -87,10 +91,10 @@ public void testUnsubscribeEach() { } @Test - public void testMediatorPersistenceWithoutSubscribers() { + public void testMediatorPersistenceWithoutSubscribers() throws IOException { final var streamMediator = - new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)); + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockReader, blockWriter)); final BlockItem blockItem = BlockItem.newBuilder().build(); @@ -99,16 +103,16 @@ public void testMediatorPersistenceWithoutSubscribers() { // Confirm the BlockStorage write method was // called despite the absence of subscribers - verify(blockStorage).write(blockItem); + verify(blockWriter).write(blockItem); } @Test - public void testMediatorPublishEventToSubscribers() throws InterruptedException { + public void testMediatorPublishEventToSubscribers() throws IOException, InterruptedException { final long TIMEOUT_THRESHOLD_MILLIS = 100L; final long TEST_TIME = 1_719_427_664_950L; final var streamMediator = - new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)); + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockReader, blockWriter)); final var concreteObserver1 = new ConsumerBlockItemObserver( @@ -166,6 +170,6 @@ public void testMediatorPublishEventToSubscribers() throws InterruptedException // Confirm the BlockStorage write method was // called despite the absence of subscribers - verify(blockStorage).write(blockItem); + verify(blockWriter).write(blockItem); } } diff --git a/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java b/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java index e2b20d939..dc803f7b4 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java +++ b/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java @@ -16,8 +16,9 @@ package com.hedera.block.server.persistence; -import static com.hedera.block.protos.BlockStreamService.*; -import static com.hedera.block.protos.BlockStreamService.Block.Builder; +import static com.hedera.block.protos.BlockStreamService.BlockItem; +import static com.hedera.block.protos.BlockStreamService.BlockProof; +import static com.hedera.block.protos.BlockStreamService.EventMetadata; import com.hedera.block.protos.BlockStreamService; import java.util.ArrayList; @@ -27,47 +28,39 @@ public final class PersistTestUtils { private PersistTestUtils() {} - public static List generateBlocks(int numOfBlocks) { + public static List generateBlockItems(int numOfBlocks) { - List blocks = new ArrayList<>(); - for (int i = 0; i < numOfBlocks; i++) { - - List blockItems = new ArrayList<>(); - for (int j = 0; j < 10; j++) { + List blockItems = new ArrayList<>(); + for (int i = 1; i <= numOfBlocks; i++) { + for (int j = 1; j <= 10; j++) { switch (j) { - case 0: + case 1: blockItems.add( BlockItem.newBuilder() .setHeader( BlockStreamService.BlockHeader.newBuilder() - .setBlockNumber(i + 1) + .setBlockNumber(i) .build()) - .setValue("block-item-" + (j + 1)) + .setValue("block-item-" + (j)) .build()); break; - case 9: + case 10: blockItems.add( BlockItem.newBuilder() - .setStateProof( - BlockStreamService.BlockProof.newBuilder() - .setBlock(i + 1) - .build()) + .setStateProof(BlockProof.newBuilder().setBlock(i).build()) .build()); break; default: blockItems.add( - BlockItem.newBuilder().setValue("block-item-" + (j + 1)).build()); + BlockItem.newBuilder() + .setStartEvent( + EventMetadata.newBuilder().setCreatorId(i).build()) + .build()); + break; } } - - Builder builder = Block.newBuilder(); - for (BlockItem bi : blockItems) { - builder.addBlockItems(bi); - } - - blocks.add(builder.build()); } - return blocks; + return blockItems; } } diff --git a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java index 1e50610cc..01d08f4e6 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java @@ -19,28 +19,28 @@ import static com.hedera.block.protos.BlockStreamService.Block; import static com.hedera.block.protos.BlockStreamService.Block.Builder; import static com.hedera.block.protos.BlockStreamService.BlockItem; -import static com.hedera.block.server.persistence.PersistTestUtils.generateBlocks; +import static com.hedera.block.server.persistence.PersistTestUtils.generateBlockItems; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import com.hedera.block.server.persistence.storage.BlockStorage; +import com.hedera.block.server.persistence.storage.BlockReader; +import com.hedera.block.server.persistence.storage.BlockWriter; +import java.io.IOException; import java.util.*; import org.junit.jupiter.api.Test; public class RangeTest { @Test - public void testReadRangeWithEvenEntries() { + public void testReadRangeWithEvenEntries() throws IOException { int numOfBlocks = 100; BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(); - for (Block block : generateBlocks(numOfBlocks)) { - for (BlockItem blockItem : block.getBlockItemsList()) { - blockPersistenceHandler.persist(blockItem); - } + for (BlockItem blockItem : generateBlockItems(numOfBlocks)) { + blockPersistenceHandler.persist(blockItem); } int window = 10; @@ -60,16 +60,14 @@ public void testReadRangeWithNoBlocks() { } @Test - public void testReadRangeWhenBlocksLessThanWindow() { + public void testReadRangeWhenBlocksLessThanWindow() throws IOException { int numOfBlocks = 9; BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(); - List blocks = generateBlocks(numOfBlocks); - for (Block block : blocks) { - for (BlockItem blockItem : block.getBlockItemsList()) { - blockPersistenceHandler.persist(blockItem); - } + List blockItems = generateBlockItems(numOfBlocks); + for (BlockItem blockItem : blockItems) { + blockPersistenceHandler.persist(blockItem); } int window = 10; @@ -102,11 +100,12 @@ private static void verifyReadRange( generateInMemoryTestBlockPersistenceHandler() { // Mock up a simple, in-memory persistence handler - BlockStorage blockStorage = new NoOpTestBlockStorage(); - return new WriteThroughCacheHandler(blockStorage); + NoOpTestBlockStorage noOpTestBlockStorage = new NoOpTestBlockStorage(); + return new WriteThroughCacheHandler(noOpTestBlockStorage, noOpTestBlockStorage); } - private static class NoOpTestBlockStorage implements BlockStorage { + private static class NoOpTestBlockStorage + implements BlockReader, BlockWriter { private final Map> cache; private long index; diff --git a/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java index 65768411f..393867fb7 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java @@ -18,21 +18,24 @@ import static com.hedera.block.protos.BlockStreamService.Block; import static com.hedera.block.protos.BlockStreamService.BlockItem; -import static com.hedera.block.server.persistence.PersistTestUtils.generateBlocks; -import static com.hedera.block.server.persistence.Util.getBlockNumber; +import static com.hedera.block.server.persistence.PersistTestUtils.generateBlockItems; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; -import com.hedera.block.server.persistence.storage.FileSystemBlockStorage; +import com.hedera.block.server.persistence.storage.BlockReader; +import com.hedera.block.server.persistence.storage.BlockWriter; +import com.hedera.block.server.persistence.storage.FileSystemBlockReader; +import com.hedera.block.server.persistence.storage.FileSystemBlockWriter; import com.hedera.block.server.util.TestUtils; import io.helidon.config.Config; import io.helidon.config.MapConfigSource; import io.helidon.config.spi.ConfigSource; -import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.Map; +import java.util.Optional; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -65,48 +68,55 @@ public void tearDown() { @Test public void testMaxEntriesGreaterThanBlocks() throws IOException { - int numOfBlocks = 100; + int numOfBlocks = 10; - FileSystemBlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); + BlockReader blockReader = new FileSystemBlockReader(JUNIT, testConfig); + BlockWriter blockWriter = new FileSystemBlockWriter(JUNIT, testConfig); BlockPersistenceHandler blockPersistenceHandler = - new WriteThroughCacheHandler(blockStorage); + new WriteThroughCacheHandler(blockReader, blockWriter); - List blocks = generateBlocks(numOfBlocks); - verifyPersistenceHandler(blocks, blockPersistenceHandler, testPath); + List blockItems = generateBlockItems(numOfBlocks); + persistBlockItems(blockItems, blockPersistenceHandler); + verifyBlocks(numOfBlocks, blockPersistenceHandler); } - private static void verifyPersistenceHandler( - List blocks, - BlockPersistenceHandler blockPersistenceHandler, - Path testPath) + private static void persistBlockItems( + List blockItems, + BlockPersistenceHandler blockPersistenceHandler) throws IOException { - for (Block block : blocks) { - final long blockNumber = getBlockNumber(block); - List blockItems = block.getBlockItemsList(); - for (int i = 0; i < blockItems.size(); i++) { - - // Persist the blockItem - BlockItem blockItem = blockItems.get(i); - blockPersistenceHandler.persist(blockItem); - - // Verify the block was written to the fs - verifyFileExists(blockItem, i + 1, blockNumber, testPath); - } + for (BlockItem blockItem : blockItems) { + blockPersistenceHandler.persist(blockItem); } } - private static void verifyFileExists( - final BlockItem blockItem, final int index, final long blockNumber, final Path testPath) + private static void verifyBlocks( + int numOfBlocks, BlockPersistenceHandler blockPersistenceHandler) throws IOException { - - // Verify the block was saved on the filesystem - Path fullTestPath = - testPath.resolve(String.valueOf(blockNumber)) - .resolve(index + FileSystemBlockStorage.BLOCK_FILE_EXTENSION); - try (FileInputStream fis = new FileInputStream(fullTestPath.toFile())) { - BlockItem fetchedBlockItem = BlockItem.parseFrom(fis); - assertEquals(blockItem.getValue(), fetchedBlockItem.getValue()); + for (int i = 1; i <= numOfBlocks; i++) { + Optional blockOpt = blockPersistenceHandler.read(i); + if (blockOpt.isEmpty()) { + fail("Block not found: " + i); + } else { + int j = 1; + Block block = blockOpt.get(); + for (BlockItem blockItem : block.getBlockItemsList()) { + + if (blockItem.hasHeader()) { + // Header type + assertEquals(i, blockItem.getHeader().getBlockNumber()); + assertEquals("block-item-" + j, blockItem.getValue()); + } else if (blockItem.hasStateProof()) { + // StateProof type + assertEquals(i, blockItem.getStateProof().getBlock()); + } else { + // EventMetadata type + assertEquals(i, blockItem.getStartEvent().getCreatorId()); + } + + j++; + } + } } } }