diff --git a/protos/src/main/protobuf/blockstream.proto b/protos/src/main/protobuf/blockstream.proto index 466a6829..35e9b0e6 100644 --- a/protos/src/main/protobuf/blockstream.proto +++ b/protos/src/main/protobuf/blockstream.proto @@ -57,11 +57,18 @@ message Block { */ message BlockItem { - BlockHeader block_header = 1; + oneof items { + BlockHeader block_header = 1; + BlockProof state_proof = 2; + } - string value = 2; + string value = 3; } message BlockHeader { uint64 block_number = 1; +} + +message BlockProof { + uint64 block = 1; } \ No newline at end of file diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 78a741be..68c88523 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -16,9 +16,6 @@ package com.hedera.block.server; -import static com.hedera.block.protos.BlockStreamService.*; -import static com.hedera.block.server.Constants.*; - import com.hedera.block.server.mediator.LiveStreamMediatorImpl; import com.hedera.block.server.persistence.WriteThroughCacheHandler; import com.hedera.block.server.persistence.storage.BlockStorage; @@ -28,8 +25,12 @@ import io.helidon.config.Config; import io.helidon.webserver.WebServer; import io.helidon.webserver.grpc.GrpcRouting; + import java.io.IOException; +import static com.hedera.block.protos.BlockStreamService.*; +import static com.hedera.block.server.Constants.*; + /** Main class for the block node server */ public class Server { diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index c59c8e13..0238ad53 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -50,7 +50,6 @@ public class ConsumerBlockItemObserver private final StreamMediator, SubscribeStreamRequest> streamMediator; - private boolean isReachedFirstBlock; /** * Constructor for the LiveStreamObserverImpl class. @@ -99,16 +98,10 @@ public void onEvent(final ObjectEvent event, final long l, final bool producerLivenessMillis = producerLivenessClock.millis(); final BlockItem blockItem = event.get(); - if (!isReachedFirstBlock && blockItem.getBlockHeader() > 0) { - isReachedFirstBlock = true; - } - - if (isReachedFirstBlock) { - final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); + final SubscribeStreamResponse subscribeStreamResponse = + SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); - subscribeStreamResponseObserver.onNext(subscribeStreamResponse); - } + subscribeStreamResponseObserver.onNext(subscribeStreamResponse); } /** diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 00e46590..b0d802e3 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -16,9 +16,6 @@ package com.hedera.block.server.mediator; -import static com.hedera.block.protos.BlockStreamService.BlockItem; -import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest; - import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.persistence.BlockPersistenceHandler; @@ -27,11 +24,14 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; + import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static com.hedera.block.protos.BlockStreamService.*; + /** * LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible * for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies @@ -51,7 +51,7 @@ public class LiveStreamMediatorImpl BatchEventProcessor>> subscribers = new HashMap<>(); - private final BlockPersistenceHandler blockPersistenceHandler; + private final BlockPersistenceHandler blockPersistenceHandler; /** * Constructor for the LiveStreamMediatorImpl class. @@ -59,7 +59,7 @@ public class LiveStreamMediatorImpl * @param blockPersistenceHandler the block persistence handler */ public LiveStreamMediatorImpl( - final BlockPersistenceHandler blockPersistenceHandler) { + final BlockPersistenceHandler blockPersistenceHandler) { this.blockPersistenceHandler = blockPersistenceHandler; // Initialize and start the disruptor diff --git a/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java b/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java index 68a5cc1e..819afa3f 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java +++ b/server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java @@ -30,10 +30,8 @@ public interface BlockPersistenceHandler { /** * Persists a block. * - * @param blockNumber the block to persist - * @return the id of the block */ - Long persist(final V blockItem, final long blockNumber); + void persist(final V blockItem); /** * Reads a block. diff --git a/server/src/main/java/com/hedera/block/server/persistence/Util.java b/server/src/main/java/com/hedera/block/server/persistence/Util.java new file mode 100644 index 00000000..cde8792f --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/Util.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence; + +import static com.hedera.block.protos.BlockStreamService.Block; +import static com.hedera.block.protos.BlockStreamService.BlockHeader; + +public final class Util { + private Util() {} + + public static long getBlockNumber(final Block block) { + return getBlockHeader(block).getBlockNumber(); + } + + public static BlockHeader getBlockHeader(final Block block) { + return block.getBlockItems(0).getBlockHeader(); + } +} diff --git a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java index c0629db6..a5cd76e9 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java +++ b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java @@ -18,8 +18,8 @@ import com.hedera.block.server.persistence.storage.BlockStorage; -import static com.hedera.block.protos.BlockStreamService.BlockItem; import static com.hedera.block.protos.BlockStreamService.Block; +import static com.hedera.block.protos.BlockStreamService.BlockItem; import java.util.LinkedList; import java.util.Optional; @@ -45,15 +45,10 @@ public WriteThroughCacheHandler(final BlockStorage blockStorag /** * Persists the block to the block storage and cache the block. * - * @param blockItem the block to persist - * @return the block id */ @Override - public Long persist(final BlockItem blockItem, final long blockNumber) { - - // Write-Through cache - blockStorage.write(blockItem, blockNumber); - return blockItem.getBlockHeader().getBlockNumber(); + public void persist(final BlockItem blockItem) { + blockStorage.write(blockItem); } /** diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java index f19b6d41..afd3a452 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java @@ -23,19 +23,17 @@ * * @param the type of block to store */ -public interface BlockStorage { +public interface BlockStorage { /** * Writes a block to storage. - * - * @return the id of the block */ - Optional write(final V block, final long blockNumber); + void write(final V blockItem); /** * Reads a block from storage. * * @return the block */ - Optional read(final long blockNumber); + Optional read(final long blockNumber); } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java index b67c8c4f..22d920a5 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java @@ -24,10 +24,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Collections; import java.util.Optional; import static com.hedera.block.protos.BlockStreamService.Block; +import static com.hedera.block.protos.BlockStreamService.Block.Builder; import static com.hedera.block.protos.BlockStreamService.BlockItem; import static com.hedera.block.server.Constants.BLOCKNODE_STORAGE_ROOT_PATH_KEY; @@ -37,10 +37,14 @@ */ public class FileSystemBlockStorage implements BlockStorage { + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + public static final String BLOCK_FILE_EXTENSION = ".blk"; private final Path blockNodeRootPath; - private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private long currentIndex = 1; + private Path currentBlockDir; /** * Constructs a FileSystemBlockStorage object. @@ -50,12 +54,11 @@ public class FileSystemBlockStorage implements BlockStorage { * @throws IOException if an I/O error occurs while initializing the block node root directory */ public FileSystemBlockStorage(final String key, final Config config) throws IOException { - LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockStorage"); - LOGGER.log(System.Logger.Level.INFO, config.toString()); blockNodeRootPath = Path.of(config.get(key).asString().get()); + LOGGER.log(System.Logger.Level.INFO, config.toString()); LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath); if (!blockNodeRootPath.isAbsolute()) { @@ -79,20 +82,20 @@ public FileSystemBlockStorage(final String key, final Config config) throws IOEx /** * Writes a block to the filesystem. * - * @param blockItem the block to write - * @return the id of the block */ @Override - public Optional write(final BlockItem blockItem, final long blockNumber) { - final String fullPath = resolvePath(blockNumber); - try (FileOutputStream fos = new FileOutputStream(fullPath)) { - blockItem.writeTo(fos); - LOGGER.log(System.Logger.Level.DEBUG, "Successfully wrote the block file: " + fullPath); - - return Optional.of(blockNumber); - } catch (IOException e) { - LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e); - return Optional.empty(); + public void write(final BlockItem blockItem) { + + try { + final String blockItemFilePath = getAbsoluteFilePath(blockItem); + try (FileOutputStream fos = new FileOutputStream(blockItemFilePath)) { + blockItem.writeTo(fos); + LOGGER.log(System.Logger.Level.INFO, "Successfully wrote the block item file: {0}", blockItemFilePath); + } catch (IOException e) { + LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e); + } + } catch (IOException io) { + LOGGER.log(System.Logger.Level.ERROR, "Error calculating the block item path", io); } } @@ -104,27 +107,82 @@ public Optional write(final BlockItem blockItem, final long blockNumber) { */ @Override public Optional read(final long id) { - return read(resolvePath(id)); + + final Builder builder = Block.newBuilder(); + final Path blockPath = blockNodeRootPath.resolve(String.valueOf(id)); + return read(blockPath, builder); } - private Optional read(final String filePath) { + private Optional read(final Path blockPath, final Builder builder) { + + // Directly count and add BlockItems into the Block + // to keep the retrieval process O(BlockItems) + boolean isEnd = false; + for (int i = 1;!isEnd;i++) { + final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION); + final Optional blockItemOpt = readBlockItem(blockItemPath.toString()); + if (blockItemOpt.isPresent()) { + builder.addBlockItems(blockItemOpt.get()); + continue; + } + + isEnd = true; + } + + return Optional.of(builder.build()); + } - try (FileInputStream fis = new FileInputStream(filePath)) { + private Optional readBlockItem(final String blockItemPath) { + try (FileInputStream fis = new FileInputStream(blockItemPath)) { return Optional.of(BlockItem.parseFrom(fis)); } catch (FileNotFoundException io) { - LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + filePath, io); return Optional.empty(); } catch (IOException io) { - throw new RuntimeException("Error reading file: " + filePath, io); + throw new RuntimeException("Error reading file: " + blockItemPath, io); } } - private String resolvePath(final long blockNumber) { + private String getAbsoluteFilePath(final BlockItem blockItem) throws IOException { - String fileName = id + BLOCK_FILE_EXTENSION; - Path fullPath = blockNodeRootPath.resolve(fileName); - LOGGER.log(System.Logger.Level.DEBUG, "Resolved fullPath: " + fullPath); + if (blockItem.hasBlockHeader()) { - return fullPath.toString(); + // A "block" is a directory of blockItems. Create the "block" + // based on the block_number + currentBlockDir = Path.of(String.valueOf(blockItem.getBlockHeader().getBlockNumber())); + + final Path blockPath = blockNodeRootPath.resolve(currentBlockDir); + createPath(blockPath); + + // Build the path to the BlockHeader.blk file + currentIndex = 1; + return blockPath.resolve(currentIndex + BLOCK_FILE_EXTENSION).toString(); + } + + // Build the path to a .blk file + final Path blockPath = blockNodeRootPath.resolve(currentBlockDir); + return blockPath.resolve(++currentIndex + BLOCK_FILE_EXTENSION).toString(); + } + + private void createPath(Path blockNodePath) throws IOException { + // Initialize the block node root directory if it does not exist + if (Files.notExists(blockNodePath)) { + Files.createDirectory(blockNodePath); + LOGGER.log( + System.Logger.Level.INFO, + "Created block node root directory: " + blockNodePath); + } else { + LOGGER.log( + System.Logger.Level.INFO, + "Using existing block node root directory: " + blockNodePath); + } } + +// private String resolvePath(final long blockNumber) { +// +// String fileName = blockNumber + BLOCK_FILE_EXTENSION; +// Path fullPath = blockNodeRootPath.resolve(fileName); +// LOGGER.log(System.Logger.Level.DEBUG, "Resolved fullPath: " + fullPath); +// +// return fullPath.toString(); +// } } diff --git a/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java b/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java index d2172fa8..afad3662 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java +++ b/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java @@ -35,18 +35,26 @@ public static List generateBlocks(int numOfBlocks) { List blockItems = new ArrayList<>(); for (int j = 0; j < 10; j++) { - if (j == 0) { - blockItems.add( - BlockItem.newBuilder() - .setBlockHeader( - BlockStreamService.BlockHeader.newBuilder().setBlockNumber(i + 1).build() - ).setValue("block-item-" + j + 1).build()); - } else { - blockItems.add( - BlockItem.newBuilder() - .setValue("block-item-" + j + 1).build()); + switch (j) { + case 0: + blockItems.add( + BlockItem.newBuilder() + .setBlockHeader( + BlockStreamService.BlockHeader.newBuilder().setBlockNumber(i + 1).build() + ).setValue("block-item-" + (j + 1)).build()); + break; + case 9: + blockItems.add( + BlockItem.newBuilder() + .setStateProof( + BlockStreamService.BlockProof.newBuilder().setBlock(i + 1).build() + ).build()); + break; + default: + blockItems.add( + BlockItem.newBuilder() + .setValue("block-item-" + (j + 1)).build()); } - } Builder builder = Block.newBuilder(); diff --git a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java index add73dca..ac5a3b93 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java @@ -16,16 +16,18 @@ package com.hedera.block.server.persistence; +import com.hedera.block.server.persistence.storage.BlockStorage; +import org.junit.jupiter.api.Test; + +import java.util.*; + import static com.hedera.block.protos.BlockStreamService.Block; +import static com.hedera.block.protos.BlockStreamService.Block.Builder; import static com.hedera.block.protos.BlockStreamService.BlockItem; import static com.hedera.block.server.persistence.PersistTestUtils.generateBlocks; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import com.hedera.block.server.persistence.storage.BlockStorage; -import java.util.*; -import org.junit.jupiter.api.Test; - public class RangeTest { @Test @@ -36,14 +38,10 @@ public void testReadRangeWithEvenEntries() { BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); - List blocks = generateBlocks(numOfBlocks); - for (Block block : blocks) { - long blockNumber = -1; + + for (Block block : generateBlocks(numOfBlocks)) { for (BlockItem blockItem : block.getBlockItemsList()) { - if (blockItem.getBlockHeader() != null) { - blockNumber = blockItem.getBlockHeader().getBlockNumber(); - } - blockPersistenceHandler.persist(blockItem, blockNumber); + blockPersistenceHandler.persist(blockItem); } } @@ -57,9 +55,9 @@ public void testReadRangeWithEvenEntries() { public void testReadRangeWithNoBlocks() { int maxEntries = 100; - BlockPersistenceHandler blockPersistenceHandler = + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); - Queue results = blockPersistenceHandler.readRange(1, 100); + Queue results = blockPersistenceHandler.readRange(1, 100); assertNotNull(results); assertEquals(0, results.size()); } @@ -69,16 +67,18 @@ public void testReadRangeWhenBlocksLessThanWindow() { int maxEntries = 100; int numOfBlocks = 9; - BlockPersistenceHandler blockPersistenceHandler = + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); - List blockItems = generateBlocks(numOfBlocks); - for (BlockItem blockItem : blockItems) { - blockPersistenceHandler.persist(blockItem); + List blocks = generateBlocks(numOfBlocks); + for (Block block : blocks) { + for (BlockItem blockItem : block.getBlockItemsList()) { + blockPersistenceHandler.persist(blockItem); + } } int window = 10; - Queue results = blockPersistenceHandler.readRange(1, window); + Queue results = blockPersistenceHandler.readRange(1, window); assertNotNull(results); assertEquals(numOfBlocks, results.size()); } @@ -86,56 +86,69 @@ public void testReadRangeWhenBlocksLessThanWindow() { private static void verifyReadRange( int window, int numOfWindows, - BlockPersistenceHandler blockPersistenceHandler) { + BlockPersistenceHandler blockPersistenceHandler) { for (int j = 0; j < numOfWindows; ++j) { int startBlockId = (j * window) + 1; int endBlockId = (startBlockId + window) - 1; - Queue results = blockPersistenceHandler.readRange(startBlockId, endBlockId); + Queue results = blockPersistenceHandler.readRange(startBlockId, endBlockId); for (int i = startBlockId; i <= endBlockId && results.peek() != null; ++i) { - BlockItem blockItem = results.poll(); - assertNotNull(blockItem); - assertEquals(i, blockItem.getId()); + Block block = results.poll(); + assertNotNull(block); + assertEquals(i, block.getBlockItems(0).getBlockHeader().getBlockNumber()); } } } - private static BlockPersistenceHandler generateInMemoryTestBlockPersistenceHandler( - int maxEntries) { + private static BlockPersistenceHandler generateInMemoryTestBlockPersistenceHandler(int maxEntries) { + // Mock up a simple, in-memory persistence handler - BlockStorage> blockStorage = new NoOpTestBlockStorage(); + BlockStorage blockStorage = new NoOpTestBlockStorage(); return new WriteThroughCacheHandler(blockStorage); } - private static class NoOpTestBlockStorage implements BlockStorage> { + private static class NoOpTestBlockStorage implements BlockStorage { private final Map> cache; + private long index; public NoOpTestBlockStorage() { this.cache = new HashMap<>(); } @Override - public Optional write(BlockItem blockItem) { - final long blockNumber = blockItem.getBlockHeader().getBlockNumber(); - cache.compute(blockNumber, (k, v) -> { + public void write(BlockItem blockItem) { + if (blockItem.hasBlockHeader()) { + index = blockItem.getBlockHeader().getBlockNumber(); + } + + cache.compute(index, (k, v) -> { if (v == null) { v = new ArrayList<>(); } v.add(blockItem); + return v; }); - - return Optional.of(blockNumber); } @Override - public Optional> read(long blockNumber) { + public Optional read(long blockNumber) { + + List blockItems = cache.get(blockNumber); + if (blockItems == null) { + return Optional.empty(); + } + + Builder builder = Block.newBuilder(); + for (BlockItem blockItem : blockItems) { + builder.addBlockItems(blockItem); + } - return Optional.ofNullable(cache.get(blockNumber)); + return Optional.of(builder.build()); } } } diff --git a/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java index 03908802..3de20b65 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java @@ -16,27 +16,27 @@ package com.hedera.block.server.persistence; -import static com.hedera.block.protos.BlockStreamService.*; -import static com.hedera.block.server.persistence.PersistTestUtils.generateBlocks; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -import com.hedera.block.server.persistence.storage.BlockStorage; import com.hedera.block.server.persistence.storage.FileSystemBlockStorage; import com.hedera.block.server.util.TestUtils; import io.helidon.config.Config; import io.helidon.config.MapConfigSource; import io.helidon.config.spi.ConfigSource; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.Map; -import java.util.Optional; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; + +import static com.hedera.block.protos.BlockStreamService.Block; +import static com.hedera.block.protos.BlockStreamService.BlockItem; +import static com.hedera.block.server.persistence.PersistTestUtils.generateBlocks; +import static com.hedera.block.server.persistence.Util.getBlockNumber; +import static org.junit.jupiter.api.Assertions.assertEquals; public class WriteThroughCacheHandlerTest { @@ -66,83 +66,50 @@ public void tearDown() { @Test public void testMaxEntriesGreaterThanBlocks() throws IOException { - int numOfBlocks = 4; + int numOfBlocks = 100; FileSystemBlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); List blocks = generateBlocks(numOfBlocks); - verifyPersistenceHandler(blockItems, blockPersistenceHandler, testPath); + verifyPersistenceHandler(blocks, blockPersistenceHandler, testPath); } -// @Test -// public void testMaxEntriesEqualToBlocks() throws IOException { -// int numOfBlocks = 3; -// -// FileSystemBlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); -// BlockPersistenceHandler blockPersistenceHandler = -// new WriteThroughCacheHandler(blockStorage); -// -// List blockItems = generateBlocks(numOfBlocks); -// verifyPersistenceHandler(blockItems, blockPersistenceHandler, testPath); -// } -// -// @Test -// public void testMaxEntriesLessThanBlocks() throws IOException { -// int maxEntries = 3; -// int numOfBlocks = 4; -// -// BlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); -// BlockPersistenceHandler blockPersistenceHandler = -// new WriteThroughCacheHandler(blockStorage); -// -// List blocks = generateBlocks(numOfBlocks); -// verifyPersistenceHandler(blocks, blockPersistenceHandler, testPath); -// } - private static void verifyPersistenceHandler( List blocks, BlockPersistenceHandler blockPersistenceHandler, Path testPath) throws IOException { - final long blockNumber = 1; for (Block block : blocks) { - for (BlockItem blockItem : block.getBlockItemsList()) { - // Save the blockItem - blockPersistenceHandler.persist(blockItem, blockNumber); + final long blockNumber = getBlockNumber(block); + List blockItems = block.getBlockItemsList(); + for (int i = 0;i < blockItems.size();i++) { - // Read the block - long blockId = blockItem.getId(); - verifyPersistedBlockIsAccessible(blockId, blockPersistenceHandler); + // Persist the blockItem + BlockItem blockItem = blockItems.get(i); + blockPersistenceHandler.persist(blockItem); // Verify the block was written to the fs - verifyFileExists(blockId, blockItem, testPath); + verifyFileExists(blockItem, i + 1, blockNumber, testPath); } } } - private static void verifyPersistedBlockIsAccessible( - long blockId, BlockPersistenceHandler blockPersistenceHandler) { - - // Confirm the block is accessible - Optional blockOpt = blockPersistenceHandler.read(blockId); - if (blockOpt.isPresent()) { - assertEquals(blockId, blockOpt.get().getId()); - } else { - fail("Failed to persist block " + blockId); - } - } - - private static void verifyFileExists(long blockId, BlockItem blockItem, Path testPath) + private static void verifyFileExists( + final BlockItem blockItem, + final int index, + final long blockNumber, + final Path testPath) throws IOException { + // Verify the block was saved on the filesystem - Path fullTestPath = - testPath.resolve(blockItem.getId() + FileSystemBlockStorage.BLOCK_FILE_EXTENSION); + Path fullTestPath = testPath + .resolve(String.valueOf(blockNumber)) + .resolve(index + FileSystemBlockStorage.BLOCK_FILE_EXTENSION); try (FileInputStream fis = new FileInputStream(fullTestPath.toFile())) { BlockItem fetchedBlockItem = BlockItem.parseFrom(fis); - assertEquals(blockId, fetchedBlockItem.getId()); assertEquals(blockItem.getValue(), fetchedBlockItem.getValue()); } }