From cf15057786db9d1c9a142c1b0c9e35cf1419495e Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Mon, 22 Jul 2024 11:23:12 -0600 Subject: [PATCH] fix:refactoring, boosted test coverage Signed-off-by: Matt Peterson --- .../java/com/hedera/block/server/Server.java | 11 +- .../consumer/ConsumerBlockItemObserver.java | 2 + .../mediator/LiveStreamMediatorImpl.java | 15 +- .../persistence/BlockPersistenceHandler.java | 6 +- .../hedera/block/server/persistence/Util.java | 32 --- .../persistence/WriteThroughCacheHandler.java | 2 +- ...BlockReader.java => BlockAsDirReader.java} | 31 ++- ...BlockWriter.java => BlockAsDirWriter.java} | 30 +-- .../ConsumerBlockItemObserverTest.java | 3 +- .../mediator/LiveStreamMediatorImplTest.java | 44 +++- .../block/server/persistence/RangeTest.java | 10 +- .../WriteThroughCacheHandlerTest.java | 16 +- .../storage/BlockAsDirectoryTest.java | 190 ++++++++++++++++++ 13 files changed, 294 insertions(+), 98 deletions(-) delete mode 100644 server/src/main/java/com/hedera/block/server/persistence/Util.java rename server/src/main/java/com/hedera/block/server/persistence/storage/{FileSystemBlockReader.java => BlockAsDirReader.java} (78%) rename server/src/main/java/com/hedera/block/server/persistence/storage/{FileSystemBlockWriter.java => BlockAsDirWriter.java} (75%) create mode 100644 server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 3c0c3e4fa..39ce2ac71 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -67,15 +67,20 @@ public static void main(final String[] args) { // Initialize the reader and writer for the block storage final BlockWriter blockWriter = - new FileSystemBlockWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); final BlockReader blockReader = - new FileSystemBlockReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); final BlockStreamService blockStreamService = new BlockStreamService( consumerTimeoutThreshold, new LiveStreamMediatorImpl( - new WriteThroughCacheHandler(blockReader, blockWriter))); + new WriteThroughCacheHandler(blockReader, blockWriter), + (streamMediator) -> { + LOGGER.log( + System.Logger.Level.ERROR, + "Shutting down the server due to an error."); + })); // Start the web server WebServer.builder() diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index c89fc7a88..18b46b8b6 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -109,6 +109,8 @@ public void onEvent(final ObjectEvent event, final long l, final bool final SubscribeStreamResponse subscribeStreamResponse = SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); + LOGGER.log(System.Logger.Level.DEBUG, "Send BlockItem downstream: {0} ", blockItem); + subscribeStreamResponseObserver.onNext(subscribeStreamResponse); } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 84d77ffc4..bbc10a513 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -16,7 +16,8 @@ package com.hedera.block.server.mediator; -import static com.hedera.block.protos.BlockStreamService.*; +import static com.hedera.block.protos.BlockStreamService.Block; +import static com.hedera.block.protos.BlockStreamService.BlockItem; import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.data.ObjectEvent; @@ -26,10 +27,12 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; /** * LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible @@ -49,7 +52,8 @@ public class LiveStreamMediatorImpl implements StreamMediator>> subscribers = new HashMap<>(); - private final BlockPersistenceHandler blockPersistenceHandler; + private final BlockPersistenceHandler blockPersistenceHandler; + private final Consumer, BlockItem>> shutdownCallback; /** * Constructor for the LiveStreamMediatorImpl class. @@ -57,7 +61,8 @@ public class LiveStreamMediatorImpl implements StreamMediator blockPersistenceHandler) { + final BlockPersistenceHandler blockPersistenceHandler, + final Consumer, BlockItem>> shutdownCallback) { this.blockPersistenceHandler = blockPersistenceHandler; // Initialize and start the disruptor @@ -65,6 +70,7 @@ public LiveStreamMediatorImpl( new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE); this.ringBuffer = disruptor.start(); this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); + this.shutdownCallback = shutdownCallback; } @Override @@ -77,9 +83,10 @@ public void publishEvent(BlockItem blockItem) { // Block persistence try { blockPersistenceHandler.persist(blockItem); - } catch (Exception e) { + } catch (IOException e) { // TODO: Push back on the producer? LOGGER.log(System.Logger.Level.ERROR, "Error occurred while persisting the block", e); + shutdownCallback.accept(this); } } diff --git a/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java b/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java index 41a93da52..bcb89afa3 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java +++ b/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java @@ -29,7 +29,7 @@ public interface BlockPersistenceHandler { /** Persists a block. */ - void persist(final V blockItem) throws IOException; + void persist(final U blockItem) throws IOException; /** * Reads a block. @@ -37,7 +37,7 @@ public interface BlockPersistenceHandler { * @param blockNumber the number of the block to read * @return an Optional of the block */ - Optional read(final long blockNumber); + Optional read(final long blockNumber); /** * Reads a range of blocks. @@ -46,5 +46,5 @@ public interface BlockPersistenceHandler { * @param endBlockNumber the id of the last block to read * @return a queue of blocks */ - Queue readRange(final long startBlockNumber, final long endBlockNumber); + Queue readRange(final long startBlockNumber, final long endBlockNumber); } diff --git a/server/src/main/java/com/hedera/block/server/persistence/Util.java b/server/src/main/java/com/hedera/block/server/persistence/Util.java deleted file mode 100644 index 7c5afcdf9..000000000 --- a/server/src/main/java/com/hedera/block/server/persistence/Util.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (C) 2024 Hedera Hashgraph, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hedera.block.server.persistence; - -import static com.hedera.block.protos.BlockStreamService.Block; -import static com.hedera.block.protos.BlockStreamService.BlockHeader; - -public final class Util { - private Util() {} - - public static long getBlockNumber(final Block block) { - return getBlockHeader(block).getBlockNumber(); - } - - public static BlockHeader getBlockHeader(final Block block) { - return block.getBlockItems(0).getHeader(); - } -} diff --git a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java index 5db49d05d..0a6479d8f 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java +++ b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java @@ -30,7 +30,7 @@ * Write-Through cache handler coordinates between the block storage and the block cache to ensure * the block is persisted to the storage before being cached. */ -public class WriteThroughCacheHandler implements BlockPersistenceHandler { +public class WriteThroughCacheHandler implements BlockPersistenceHandler { private final BlockReader blockReader; private final BlockWriter blockWriter; diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockReader.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirReader.java similarity index 78% rename from server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockReader.java rename to server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirReader.java index c8ffcac2e..cefa22b26 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockReader.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirReader.java @@ -22,19 +22,20 @@ import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION; import io.helidon.config.Config; +import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Path; import java.util.Optional; -public class FileSystemBlockReader implements BlockReader { +public class BlockAsDirReader implements BlockReader { private final System.Logger LOGGER = System.getLogger(getClass().getName()); final Path blockNodeRootPath; - public FileSystemBlockReader(final String key, final Config config) { + public BlockAsDirReader(final String key, final Config config) { LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockReader"); @@ -57,8 +58,8 @@ public Optional read(final long blockNumber) { } // There may be thousands of BlockItem files in a Block directory. - // The BlockItems must be written into the Block object in order. A - // DirectoryStream will iterate in any guaranteed order. To avoid sorting, + // The BlockItems must be added to the Block object in order. A + // DirectoryStream will iterate without any guaranteed order. To avoid sorting, // and to keep the retrieval process linear with the number of BlockItems, // Run a loop to fetch in the order we need. The loop will break when // it looks for a BlockItem file that does not exist. @@ -72,6 +73,8 @@ public Optional read(final long blockNumber) { continue; } } catch (IOException io) { + // Return an empty Optional signaling an error. It's all or nothing + // when retrieving a Block LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + blockItemPath, io); return Optional.empty(); } @@ -84,15 +87,23 @@ public Optional read(final long blockNumber) { } private Optional readBlockItem(final String blockItemPath) throws IOException { + try (FileInputStream fis = new FileInputStream(blockItemPath)) { return Optional.of(BlockItem.parseFrom(fis)); } catch (FileNotFoundException io) { - // The outer loop caller will continue to query - // for the next BlockItem file based on the index - // until the FileNotFoundException is thrown. - // It's expected that this exception will be caught - // at the end of every query. - return Optional.empty(); + File f = new File(blockItemPath); + if (!f.exists()) { + // The outer loop caller will continue to query + // for the next BlockItem file based on the index + // until the FileNotFoundException is thrown. + // It's expected that this exception will be caught + // at the end of every query. + return Optional.empty(); + } + + // FileNotFound is thrown also when a file cannot be read. + // So re-throw here to make a different decision upstream. + throw io; } } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockWriter.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java similarity index 75% rename from server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockWriter.java rename to server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java index 70c233fed..578322974 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockWriter.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java @@ -26,7 +26,7 @@ import java.nio.file.Files; import java.nio.file.Path; -public class FileSystemBlockWriter implements BlockWriter { +public class BlockAsDirWriter implements BlockWriter { private final System.Logger LOGGER = System.getLogger(getClass().getName()); @@ -34,7 +34,7 @@ public class FileSystemBlockWriter implements BlockWriter { private long blockNodeFileNameIndex = 0; private Path currentBlockDir; - public FileSystemBlockWriter(final String key, final Config config) throws IOException { + public BlockAsDirWriter(final String key, final Config config) throws IOException { LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockStorage"); @@ -49,16 +49,7 @@ public FileSystemBlockWriter(final String key, final Config config) throws IOExc } // Initialize the block node root directory if it does not exist - if (Files.notExists(blockNodeRootPath)) { - Files.createDirectory(blockNodeRootPath); - LOGGER.log( - System.Logger.Level.INFO, - "Created block node root directory: " + blockNodeRootPath); - } else { - LOGGER.log( - System.Logger.Level.INFO, - "Using existing block node root directory: " + blockNodeRootPath); - } + createPath(blockNodeRootPath, System.Logger.Level.INFO); this.blockNodeRootPath = blockNodeRootPath; } @@ -77,7 +68,8 @@ public void write(final BlockItem blockItem) throws IOException { "Successfully wrote the block item file: {0}", blockItemFilePath); } catch (IOException e) { - LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e); + LOGGER.log( + System.Logger.Level.ERROR, "Error writing the BlockItem protobuf to a file", e); } } @@ -87,7 +79,7 @@ private void resetState(final BlockItem blockItem) throws IOException { currentBlockDir = Path.of(String.valueOf(blockItem.getHeader().getBlockNumber())); // Construct the path to the block directory - createPath(blockNodeRootPath.resolve(currentBlockDir)); + createPath(blockNodeRootPath.resolve(currentBlockDir), System.Logger.Level.DEBUG); // Reset blockNodeFileNameIndex = 0; @@ -100,17 +92,13 @@ private String calculateBlockItemPath() { return blockPath.resolve(blockNodeFileNameIndex + BLOCK_FILE_EXTENSION).toString(); } - private void createPath(Path blockNodePath) throws IOException { + private void createPath(Path blockNodePath, System.Logger.Level logLevel) throws IOException { // Initialize the Block directory if it does not exist if (Files.notExists(blockNodePath)) { Files.createDirectory(blockNodePath); - LOGGER.log( - System.Logger.Level.INFO, - "Created block node root directory: " + blockNodePath); + LOGGER.log(logLevel, "Created block node root directory: " + blockNodePath); } else { - LOGGER.log( - System.Logger.Level.INFO, - "Using existing block node root directory: " + blockNodePath); + LOGGER.log(logLevel, "Using existing block node root directory: " + blockNodePath); } } } diff --git a/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java index 5206079cc..5d29fd66d 100644 --- a/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java @@ -84,7 +84,7 @@ public void testProducerTimeoutOutsideWindow() throws InterruptedException { } @Test - public void testStuff() throws InterruptedException { + public void testConsumerNotToSendBeforeBlockHeader() throws InterruptedException { final var consumerBlockItemObserver = new ConsumerBlockItemObserver( TIMEOUT_THRESHOLD_MILLIS, @@ -92,6 +92,7 @@ public void testStuff() throws InterruptedException { streamMediator, responseStreamObserver); + // Send non-header BlockItems to validate that the observer does not send them for (int i = 1; i <= 10; i++) { if (i % 2 == 0) { diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index 824563008..9a1b2b4ba 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -20,17 +20,18 @@ import static com.hedera.block.server.util.TestClock.buildClockInsideWindow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.consumer.ConsumerBlockItemObserver; import com.hedera.block.server.data.ObjectEvent; +import com.hedera.block.server.persistence.BlockPersistenceHandler; import com.hedera.block.server.persistence.WriteThroughCacheHandler; import com.hedera.block.server.persistence.storage.BlockReader; import com.hedera.block.server.persistence.storage.BlockWriter; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.function.Consumer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -39,25 +40,28 @@ @ExtendWith(MockitoExtension.class) public class LiveStreamMediatorImplTest { - @Mock private BlockItemEventHandler> observer1; + private final Object lock = new Object(); + @Mock private BlockItemEventHandler> observer1; @Mock private BlockItemEventHandler> observer2; - @Mock private BlockItemEventHandler> observer3; @Mock private BlockReader blockReader; - @Mock private BlockWriter blockWriter; @Mock private StreamObserver streamObserver1; @Mock private StreamObserver streamObserver2; @Mock private StreamObserver streamObserver3; + @Mock private Consumer, BlockItem>> testCallback; + @Mock private BlockPersistenceHandler blockPersistenceHandler; + @Test public void testUnsubscribeEach() { final var streamMediator = - new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockReader, blockWriter)); + new LiveStreamMediatorImpl( + new WriteThroughCacheHandler(blockReader, blockWriter), testCallback); // Set up the subscribers streamMediator.subscribe(observer1); @@ -94,7 +98,8 @@ public void testUnsubscribeEach() { public void testMediatorPersistenceWithoutSubscribers() throws IOException { final var streamMediator = - new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockReader, blockWriter)); + new LiveStreamMediatorImpl( + new WriteThroughCacheHandler(blockReader, blockWriter), testCallback); final BlockItem blockItem = BlockItem.newBuilder().build(); @@ -112,7 +117,8 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup final long TIMEOUT_THRESHOLD_MILLIS = 100L; final long TEST_TIME = 1_719_427_664_950L; final var streamMediator = - new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockReader, blockWriter)); + new LiveStreamMediatorImpl( + new WriteThroughCacheHandler(blockReader, blockWriter), testCallback); final var concreteObserver1 = new ConsumerBlockItemObserver( @@ -159,8 +165,8 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup streamMediator.publishEvent(blockItem); // TODO: Is there a better way? - synchronized (streamObserver1) { - streamObserver1.wait(2000); + synchronized (lock) { + lock.wait(50); } // Confirm each subscriber was notified of the new block @@ -172,4 +178,22 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup // called despite the absence of subscribers verify(blockWriter).write(blockItem); } + + @Test + public void testPublishEventExceptionHandling() throws IOException, InterruptedException { + final var streamMediator = + new LiveStreamMediatorImpl(blockPersistenceHandler, testCallback); + + final BlockItem blockItem = BlockItem.newBuilder().build(); + doThrow(new IOException("Test exception")).when(blockPersistenceHandler).persist(blockItem); + + streamMediator.publishEvent(blockItem); + + // TODO: Is there a better way? + synchronized (lock) { + lock.wait(50); + } + + verify(testCallback, times(1)).accept(streamMediator); + } } diff --git a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java index 01d08f4e6..47c8ca0a7 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java @@ -36,7 +36,7 @@ public void testReadRangeWithEvenEntries() throws IOException { int numOfBlocks = 100; - BlockPersistenceHandler blockPersistenceHandler = + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(); for (BlockItem blockItem : generateBlockItems(numOfBlocks)) { @@ -52,7 +52,7 @@ public void testReadRangeWithEvenEntries() throws IOException { @Test public void testReadRangeWithNoBlocks() { - BlockPersistenceHandler blockPersistenceHandler = + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(); Queue results = blockPersistenceHandler.readRange(1, 100); assertNotNull(results); @@ -63,7 +63,7 @@ public void testReadRangeWithNoBlocks() { public void testReadRangeWhenBlocksLessThanWindow() throws IOException { int numOfBlocks = 9; - BlockPersistenceHandler blockPersistenceHandler = + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(); List blockItems = generateBlockItems(numOfBlocks); for (BlockItem blockItem : blockItems) { @@ -80,7 +80,7 @@ public void testReadRangeWhenBlocksLessThanWindow() throws IOException { private static void verifyReadRange( int window, int numOfWindows, - BlockPersistenceHandler blockPersistenceHandler) { + BlockPersistenceHandler blockPersistenceHandler) { for (int j = 0; j < numOfWindows; ++j) { @@ -96,7 +96,7 @@ private static void verifyReadRange( } } - private static BlockPersistenceHandler + private static BlockPersistenceHandler generateInMemoryTestBlockPersistenceHandler() { // Mock up a simple, in-memory persistence handler diff --git a/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java index 393867fb7..d289d814d 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java @@ -22,10 +22,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; +import com.hedera.block.server.persistence.storage.BlockAsDirReader; +import com.hedera.block.server.persistence.storage.BlockAsDirWriter; import com.hedera.block.server.persistence.storage.BlockReader; import com.hedera.block.server.persistence.storage.BlockWriter; -import com.hedera.block.server.persistence.storage.FileSystemBlockReader; -import com.hedera.block.server.persistence.storage.FileSystemBlockWriter; import com.hedera.block.server.util.TestUtils; import io.helidon.config.Config; import io.helidon.config.MapConfigSource; @@ -70,9 +70,9 @@ public void testMaxEntriesGreaterThanBlocks() throws IOException { int numOfBlocks = 10; - BlockReader blockReader = new FileSystemBlockReader(JUNIT, testConfig); - BlockWriter blockWriter = new FileSystemBlockWriter(JUNIT, testConfig); - BlockPersistenceHandler blockPersistenceHandler = + BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); + BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockReader, blockWriter); List blockItems = generateBlockItems(numOfBlocks); @@ -82,7 +82,7 @@ public void testMaxEntriesGreaterThanBlocks() throws IOException { private static void persistBlockItems( List blockItems, - BlockPersistenceHandler blockPersistenceHandler) + BlockPersistenceHandler blockPersistenceHandler) throws IOException { for (BlockItem blockItem : blockItems) { @@ -91,8 +91,8 @@ private static void persistBlockItems( } private static void verifyBlocks( - int numOfBlocks, BlockPersistenceHandler blockPersistenceHandler) - throws IOException { + int numOfBlocks, BlockPersistenceHandler blockPersistenceHandler) { + for (int i = 1; i <= numOfBlocks; i++) { Optional blockOpt = blockPersistenceHandler.read(i); if (blockOpt.isEmpty()) { diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java new file mode 100644 index 000000000..f84759e4c --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java @@ -0,0 +1,190 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage; + +import static com.hedera.block.protos.BlockStreamService.Block; +import static com.hedera.block.protos.BlockStreamService.BlockItem; +import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION; +import static org.junit.jupiter.api.Assertions.*; + +import com.hedera.block.server.persistence.PersistTestUtils; +import com.hedera.block.server.util.TestUtils; +import io.helidon.config.Config; +import io.helidon.config.MapConfigSource; +import io.helidon.config.spi.ConfigSource; +import java.io.IOException; +import java.nio.file.AccessDeniedException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class BlockAsDirectoryTest { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private static final String TEMP_DIR = "block-node-unit-test-dir"; + private static final String JUNIT = "my-junit-test"; + + private static final String NO_READ = "-wx-wx-wx"; + private static final String NO_WRITE = "r-xr-xr-x"; + + private Path testPath; + private Config testConfig; + + @BeforeEach + public void setUp() throws IOException { + testPath = Files.createTempDirectory(TEMP_DIR); + LOGGER.log(System.Logger.Level.INFO, "Created temp directory: " + testPath.toString()); + + Map testProperties = Map.of(JUNIT, testPath.toString()); + ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build(); + testConfig = Config.builder(testConfigSource).build(); + } + + @AfterEach + public void tearDown() { + TestUtils.deleteDirectory(testPath.toFile()); + } + + @Test + public void testWriterAndReaderHappyPath() throws IOException { + + // Write a block + List blockItems = PersistTestUtils.generateBlockItems(1); + BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); + for (BlockItem blockItem : blockItems) { + blockWriter.write(blockItem); + } + + // Confirm the block + BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + Optional blockOpt = blockReader.read(1); + assertFalse(blockOpt.isEmpty()); + + boolean hasHeader = false; + boolean hasBlockProof = false; + boolean hasStartEvent = false; + + Block block = blockOpt.get(); + for (BlockItem blockItem : block.getBlockItemsList()) { + if (blockItem.hasHeader()) { + hasHeader = true; + } else if (blockItem.hasStateProof()) { + hasBlockProof = true; + } else if (blockItem.hasStartEvent()) { + hasStartEvent = true; + } + } + + assertTrue(hasHeader, "Block should have a header"); + assertTrue(hasBlockProof, "Block should have a block proof"); + assertTrue(hasStartEvent, "Block should have a start event"); + } + + @Test + public void testBlockDoesNotExist() { + BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + Optional blockOpt = blockReader.read(10000); + assertTrue(blockOpt.isEmpty()); + } + + @Test + public void testRemoveBlockReadPerms() throws IOException { + List blockItems = PersistTestUtils.generateBlockItems(1); + BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); + for (BlockItem blockItem : blockItems) { + blockWriter.write(blockItem); + } + + // Make the block unreadable + removeBlockReadPerms(1, testConfig); + + BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + Optional blockOpt = blockReader.read(1); + assertTrue(blockOpt.isEmpty()); + } + + @Test + public void testRemoveBlockItemReadPerms() throws IOException { + List blockItems = PersistTestUtils.generateBlockItems(1); + BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); + for (BlockItem blockItem : blockItems) { + blockWriter.write(blockItem); + } + + removeBlockItemReadPerms(1, 1, testConfig); + + BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + Optional blockOpt = blockReader.read(1); + assertTrue(blockOpt.isEmpty()); + } + + @Test + public void testRemoveBlockWritePerms() throws IOException { + + List blockItems = PersistTestUtils.generateBlockItems(1); + + // Change the permissions on the block node root directory + removeBlockWritePerms(testConfig); + BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); + assertThrows(AccessDeniedException.class, () -> blockWriter.write(blockItems.get(0))); + } + + @Test + public void testConstructorWithInvalidPath() { + Map testProperties = Map.of(JUNIT, "invalid-path"); + ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build(); + Config testConfig = Config.builder(testConfigSource).build(); + assertThrows(IllegalArgumentException.class, () -> new BlockAsDirWriter(JUNIT, testConfig)); + } + + private void removeBlockReadPerms(int blockNumber, Config config) throws IOException { + + final Path blockNodeRootPath = Path.of(config.get(JUNIT).asString().get()); + final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber)); + + Set perms = PosixFilePermissions.fromString(NO_READ); + Files.setPosixFilePermissions(blockPath, perms); + } + + private void removeBlockWritePerms(Config config) throws IOException { + + final Path blockNodeRootPath = Path.of(config.get(JUNIT).asString().get()); + + Set perms = PosixFilePermissions.fromString(NO_WRITE); + Files.setPosixFilePermissions(blockNodeRootPath, perms); + } + + private void removeBlockItemReadPerms(int blockNumber, int blockItem, Config config) + throws IOException { + + final Path blockNodeRootPath = Path.of(config.get(JUNIT).asString().get()); + final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber)); + final Path blockItemPath = blockPath.resolve(blockItem + BLOCK_FILE_EXTENSION); + + Set perms = PosixFilePermissions.fromString(NO_READ); + Files.setPosixFilePermissions(blockItemPath, perms); + } +}