diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index bc04a625..785a619f 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -42,7 +42,7 @@ public class BlockStreamService implements GrpcService { private final System.Logger LOGGER = System.getLogger(getClass().getName()); private final long timeoutThresholdMillis; - private final StreamMediator, BlockStreamServiceGrpcProto.BlockResponse> streamMediator; + private final StreamMediator, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator; /** * Constructor for the BlockStreamService class. @@ -50,7 +50,7 @@ public class BlockStreamService implements GrpcService { * @param timeoutThresholdMillis the timeout threshold in milliseconds */ public BlockStreamService(final long timeoutThresholdMillis, - final StreamMediator, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) { + final StreamMediator, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator) { this.timeoutThresholdMillis = timeoutThresholdMillis; this.streamMediator = streamMediator; } @@ -92,11 +92,11 @@ public void update(final Routing routing) { * * @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer. * - * @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer + * @return a custom StreamObserver to handle streaming blockItems from the producer to all subscribed consumer * via the streamMediator as well as sending responses back to the producer. */ - private StreamObserver streamSink( - final StreamObserver responseStreamObserver) { + private StreamObserver streamSink( + final StreamObserver responseStreamObserver) { LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method"); return new ProducerBlockItemObserver(streamMediator, responseStreamObserver); @@ -111,7 +111,7 @@ private StreamObserver streamSink( * @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well * as handling responses from the consumer. */ - private StreamObserver streamSource(final StreamObserver responseStreamObserver) { + private StreamObserver streamSource(final StreamObserver responseStreamObserver) { LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method"); // Return a custom StreamObserver to handle streaming blocks from the producer. diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 7bd6454a..394ff6f8 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -38,8 +38,8 @@ public class Server { // Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class. - private static ServerCalls.BidiStreamingMethod, StreamObserver> clientBidiStreamingMethod; - private static ServerCalls.BidiStreamingMethod, StreamObserver> serverBidiStreamingMethod; + private static ServerCalls.BidiStreamingMethod, StreamObserver> clientBidiStreamingMethod; + private static ServerCalls.BidiStreamingMethod, StreamObserver> serverBidiStreamingMethod; private static final System.Logger LOGGER = System.getLogger(Server.class.getName()); @@ -62,7 +62,7 @@ public static void main(final String[] args) { final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L); // Initialize the block storage, cache, and service - final BlockStorage blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + final BlockStorage blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold, new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage))); diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index ebe9f0ed..4501a4ee 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -29,11 +29,11 @@ * The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer * via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods. */ -public class ConsumerBlockItemObserver implements BlockItemEventHandler, BlockStreamServiceGrpcProto.BlockResponse> { +public class ConsumerBlockItemObserver implements BlockItemEventHandler, BlockStreamServiceGrpcProto.BlockItemResponse> { private final System.Logger LOGGER = System.getLogger(getClass().getName()); - private final StreamObserver responseStreamObserver; + private final StreamObserver responseStreamObserver; private final long timeoutThresholdMillis; @@ -45,7 +45,7 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler, BlockStreamServiceGrpcProto.BlockResponse> streamMediator; + private final StreamMediator, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator; /** * Constructor for the LiveStreamObserverImpl class. @@ -56,8 +56,8 @@ public ConsumerBlockItemObserver( final long timeoutThresholdMillis, final InstantSource producerLivenessClock, final InstantSource consumerLivenessClock, - final StreamMediator, BlockStreamServiceGrpcProto.BlockResponse> streamMediator, - final StreamObserver responseStreamObserver) { + final StreamMediator, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator, + final StreamObserver responseStreamObserver) { this.timeoutThresholdMillis = timeoutThresholdMillis; this.producerLivenessClock = producerLivenessClock; @@ -75,7 +75,7 @@ public ConsumerBlockItemObserver( * */ @Override - public void onEvent(final ObjectEvent event, final long l, final boolean b) throws Exception { + public void onEvent(final ObjectEvent event, final long l, final boolean b) throws Exception { // Check if the consumer has timed out. If so, unsubscribe the observer from the mediator. if (isThresholdExceeded(consumerLivenessMillis)) { @@ -91,10 +91,10 @@ public void onEvent(final ObjectEvent event, /** * The onNext() method is triggered by Helidon when a consumer sends a blockResponse via the bidirectional stream. * - * @param blockResponse the BlockResponse passed back to the server via the bidirectional stream to the downstream consumer. + * @param blockItemResponse the BlockItemResponse passed back to the server via the bidirectional stream to the downstream consumer. */ @Override - public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) { + public void onNext(final BlockStreamServiceGrpcProto.BlockItemResponse blockItemResponse) { // Check if the producer has timed out. If so, unsubscribe the observer from the mediator. if (isThresholdExceeded(producerLivenessMillis)) { @@ -102,7 +102,7 @@ public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse streamMediator.unsubscribe(this); } else { // Refresh the consumer liveness - LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse); + LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockItemResponse); consumerLivenessMillis = consumerLivenessClock.millis(); } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 308a5134..07753872 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -36,45 +36,45 @@ * managing the subscribe and unsubscribe operations of downstream consumers. It also proxies live * blocks to the subscribers as they arrive and persists the blocks to the block persistence store. */ -public class LiveStreamMediatorImpl implements StreamMediator, BlockStreamServiceGrpcProto.BlockResponse> { +public class LiveStreamMediatorImpl implements StreamMediator, BlockStreamServiceGrpcProto.BlockItemResponse> { private final System.Logger LOGGER = System.getLogger(getClass().getName()); - private final RingBuffer> ringBuffer; + private final RingBuffer> ringBuffer; private final ExecutorService executor; - private final Map, BlockStreamServiceGrpcProto.BlockResponse>, - BatchEventProcessor>> subscribers = new HashMap<>(); + private final Map, BlockStreamServiceGrpcProto.BlockItemResponse>, + BatchEventProcessor>> subscribers = new HashMap<>(); - private final BlockPersistenceHandler blockPersistenceHandler; + private final BlockPersistenceHandler blockPersistenceHandler; /** * Constructor for the LiveStreamMediatorImpl class. * * @param blockPersistenceHandler the block persistence handler */ - public LiveStreamMediatorImpl(final BlockPersistenceHandler blockPersistenceHandler) { + public LiveStreamMediatorImpl(final BlockPersistenceHandler blockPersistenceHandler) { this.blockPersistenceHandler = blockPersistenceHandler; // Initialize and start the disruptor - final Disruptor> disruptor = new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE); + final Disruptor> disruptor = new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE); this.ringBuffer = disruptor.start(); this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); } @Override - public void publishEvent(BlockStreamServiceGrpcProto.Block block) { + public void publishEvent(BlockStreamServiceGrpcProto.BlockItem blockItem) { // Publish the block for all subscribers to receive - LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", block); - ringBuffer.publishEvent((event, sequence) -> event.set(block)); + LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", blockItem); + ringBuffer.publishEvent((event, sequence) -> event.set(blockItem)); // Block persistence - blockPersistenceHandler.persist(block); + blockPersistenceHandler.persist(blockItem); } @Override - public void subscribe(final BlockItemEventHandler, BlockStreamServiceGrpcProto.BlockResponse> handler) { + public void subscribe(final BlockItemEventHandler, BlockStreamServiceGrpcProto.BlockItemResponse> handler) { // Initialize the batch event processor and set it on the ring buffer final var batchEventProcessor = new BatchEventProcessorBuilder() @@ -87,7 +87,7 @@ public void subscribe(final BlockItemEventHandler, BlockStreamServiceGrpcProto.BlockResponse> handler) { + public void unsubscribe(final BlockItemEventHandler, BlockStreamServiceGrpcProto.BlockItemResponse> handler) { final var batchEventProcessor = subscribers.remove(handler); // Stop the processor diff --git a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java index 1533e131..8dad14cf 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java +++ b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java @@ -32,7 +32,7 @@ */ public interface StreamMediator { - void publishEvent(final BlockStreamServiceGrpcProto.Block block); + void publishEvent(final BlockStreamServiceGrpcProto.BlockItem blockItem); void subscribe(final BlockItemEventHandler handler); diff --git a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java index 44d11f55..ef6eb98b 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java +++ b/server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java @@ -28,31 +28,31 @@ * Write-Through cache handler coordinates between the block storage and the block cache to ensure the block * is persisted to the storage before being cached. */ -public class WriteThroughCacheHandler implements BlockPersistenceHandler { +public class WriteThroughCacheHandler implements BlockPersistenceHandler { - private final BlockStorage blockStorage; + private final BlockStorage blockStorage; /** * Constructor for the WriteThroughCacheHandler class. * * @param blockStorage the block storage */ - public WriteThroughCacheHandler(final BlockStorage blockStorage) { + public WriteThroughCacheHandler(final BlockStorage blockStorage) { this.blockStorage = blockStorage; } /** * Persists the block to the block storage and cache the block. * - * @param block the block to persist + * @param blockItem the block to persist * @return the block id */ @Override - public Long persist(final BlockStreamServiceGrpcProto.Block block) { + public Long persist(final BlockStreamServiceGrpcProto.BlockItem blockItem) { // Write-Through cache - blockStorage.write(block); - return block.getId(); + blockStorage.write(blockItem); + return blockItem.getId(); } /** @@ -63,10 +63,10 @@ public Long persist(final BlockStreamServiceGrpcProto.Block block) { * @return a queue of blocks */ @Override - public Queue readRange(final long startBlockId, final long endBlockId) { - final Queue blocks = new LinkedList<>(); + public Queue readRange(final long startBlockId, final long endBlockId) { + final Queue blocks = new LinkedList<>(); for (long count = startBlockId; count <= endBlockId; count++) { - final Optional blockOpt = read(count); + final Optional blockOpt = read(count); blockOpt.ifPresent(blocks::add); } @@ -82,7 +82,7 @@ public Queue readRange(final long startBlockI * @return an Optional with the block */ @Override - public Optional read(final long id) { + public Optional read(final long id) { return blockStorage.read(id); } } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java index 7f42807d..ed892aff 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java @@ -28,10 +28,10 @@ public interface BlockStorage { /** * Writes a block to storage. * - * @param block the block to write + * @param blockItem the block to write * @return the id of the block */ - Optional write(final V block); + Optional write(final V blockItem); /** * Reads a block from storage. diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java index 821ea4e9..0943210e 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java @@ -32,7 +32,7 @@ /** * The FileSystemBlockStorage class implements the BlockStorage interface to store blocks to the filesystem. */ -public class FileSystemBlockStorage implements BlockStorage { +public class FileSystemBlockStorage implements BlockStorage { public static final String BLOCK_FILE_EXTENSION = ".blk"; @@ -74,16 +74,16 @@ public FileSystemBlockStorage(final String key, final Config config) throws IOEx /** * Writes a block to the filesystem. * - * @param block the block to write + * @param blockItem the block to write * @return the id of the block */ @Override - public Optional write(final BlockStreamServiceGrpcProto.Block block) { - Long id = block.getId(); + public Optional write(final BlockStreamServiceGrpcProto.BlockItem blockItem) { + Long id = blockItem.getId(); final String fullPath = resolvePath(id); try (FileOutputStream fos = new FileOutputStream(fullPath)) { - block.writeTo(fos); + blockItem.writeTo(fos); LOGGER.log(System.Logger.Level.DEBUG, "Successfully wrote the block file: " + fullPath); return Optional.of(id); @@ -100,14 +100,14 @@ public Optional write(final BlockStreamServiceGrpcProto.Block block) { * @return the block */ @Override - public Optional read(final Long id) { + public Optional read(final Long id) { return read(resolvePath(id)); } - private Optional read(final String filePath) { + private Optional read(final String filePath) { try (FileInputStream fis = new FileInputStream(filePath)) { - return Optional.of(BlockStreamServiceGrpcProto.Block.parseFrom(fis)); + return Optional.of(BlockStreamServiceGrpcProto.BlockItem.parseFrom(fis)); } catch (FileNotFoundException io) { LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + filePath, io); return Optional.empty(); diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index 305432a1..52e53827 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -27,12 +27,12 @@ * with the connection to the upstream producer (e.g. blocks streamed from the Consensus Node to * the server). */ -public class ProducerBlockItemObserver implements StreamObserver { +public class ProducerBlockItemObserver implements StreamObserver { private final System.Logger LOGGER = System.getLogger(getClass().getName()); - private final StreamObserver responseStreamObserver; - private final StreamMediator, BlockStreamServiceGrpcProto.BlockResponse> streamMediator; + private final StreamObserver responseStreamObserver; + private final StreamMediator, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator; /** * Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the mediator with blocks @@ -41,8 +41,8 @@ public class ProducerBlockItemObserver implements StreamObserver, BlockStreamServiceGrpcProto.BlockResponse> streamMediator, - final StreamObserver responseStreamObserver) { + public ProducerBlockItemObserver(final StreamMediator, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator, + final StreamObserver responseStreamObserver) { this.streamMediator = streamMediator; this.responseStreamObserver = responseStreamObserver; } @@ -54,13 +54,13 @@ public ProducerBlockItemObserver(final StreamMediator streamMediator; + private StreamMediator streamMediator; @Mock - private StreamObserver responseStreamObserver; + private StreamObserver responseStreamObserver; // @Test diff --git a/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java b/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java index 87e1e83b..5bb8279f 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java +++ b/server/src/test/java/com/hedera/block/server/persistence/PersistTestUtils.java @@ -26,10 +26,10 @@ public final class PersistTestUtils { private PersistTestUtils() {} - public static List generateBlocks(int numOfBlocks) { + public static List generateBlocks(int numOfBlocks) { return IntStream .range(1, numOfBlocks + 1) - .mapToObj(i -> BlockStreamServiceGrpcProto.Block + .mapToObj(i -> BlockStreamServiceGrpcProto.BlockItem .newBuilder() .setId(i) .setValue("block-node-" + i).build() diff --git a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java index ef6539a0..7fc653c6 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/RangeTest.java @@ -34,10 +34,10 @@ public void testReadRangeWithEvenEntries() { int maxEntries = 100; int numOfBlocks = 100; - BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); - List blocks = generateBlocks(numOfBlocks); - for (BlockStreamServiceGrpcProto.Block block : blocks) { - blockPersistenceHandler.persist(block); + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); + List blockItems = generateBlocks(numOfBlocks); + for (BlockStreamServiceGrpcProto.BlockItem blockItem : blockItems) { + blockPersistenceHandler.persist(blockItem); } int window = 10; @@ -50,8 +50,8 @@ public void testReadRangeWithEvenEntries() { public void testReadRangeWithNoBlocks() { int maxEntries = 100; - BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); - Queue results = blockPersistenceHandler.readRange(1, 100); + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); + Queue results = blockPersistenceHandler.readRange(1, 100); assertNotNull(results); assertEquals(0, results.size()); } @@ -61,15 +61,15 @@ public void testReadRangeWhenBlocksLessThanWindow() { int maxEntries = 100; int numOfBlocks = 9; - BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); - List blocks = generateBlocks(numOfBlocks); - for (BlockStreamServiceGrpcProto.Block block : blocks) { - blockPersistenceHandler.persist(block); + BlockPersistenceHandler blockPersistenceHandler = generateInMemoryTestBlockPersistenceHandler(maxEntries); + List blockItems = generateBlocks(numOfBlocks); + for (BlockStreamServiceGrpcProto.BlockItem blockItem : blockItems) { + blockPersistenceHandler.persist(blockItem); } int window = 10; - Queue results = blockPersistenceHandler.readRange(1, window); + Queue results = blockPersistenceHandler.readRange(1, window); assertNotNull(results); assertEquals(numOfBlocks, results.size()); } @@ -77,44 +77,44 @@ public void testReadRangeWhenBlocksLessThanWindow() { private static void verifyReadRange( int window, int numOfWindows, - BlockPersistenceHandler blockPersistenceHandler) { + BlockPersistenceHandler blockPersistenceHandler) { for (int j = 0; j < numOfWindows;++j) { int startBlockId = (j * window) + 1; int endBlockId = (startBlockId + window) - 1; - Queue results = blockPersistenceHandler.readRange(startBlockId, endBlockId); + Queue results = blockPersistenceHandler.readRange(startBlockId, endBlockId); for (int i = startBlockId;i <= endBlockId && results.peek() != null;++i) { - BlockStreamServiceGrpcProto.Block block = results.poll(); - assertNotNull(block); - assertEquals(i, block.getId()); + BlockStreamServiceGrpcProto.BlockItem blockItem = results.poll(); + assertNotNull(blockItem); + assertEquals(i, blockItem.getId()); } } } - private static BlockPersistenceHandler generateInMemoryTestBlockPersistenceHandler(int maxEntries) { + private static BlockPersistenceHandler generateInMemoryTestBlockPersistenceHandler(int maxEntries) { // Mock up a simple, in-memory persistence handler - BlockStorage blockStorage = new NoOpTestBlockStorage(); + BlockStorage blockStorage = new NoOpTestBlockStorage(); return new WriteThroughCacheHandler(blockStorage); } - private static class NoOpTestBlockStorage implements BlockStorage { + private static class NoOpTestBlockStorage implements BlockStorage { - private final Map cache; + private final Map cache; public NoOpTestBlockStorage() { this.cache = new HashMap<>(); } @Override - public Optional write(BlockStreamServiceGrpcProto.Block block) { - cache.put(block.getId(), block); - return Optional.of(block.getId()); + public Optional write(BlockStreamServiceGrpcProto.BlockItem blockItem) { + cache.put(blockItem.getId(), blockItem); + return Optional.of(blockItem.getId()); } @Override - public Optional read(Long blockId) { + public Optional read(Long blockId) { return Optional.ofNullable(cache.get(blockId)); } } diff --git a/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java index 1ec52758..6556e508 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/WriteThroughCacheHandlerTest.java @@ -69,10 +69,10 @@ public void testMaxEntriesGreaterThanBlocks() throws IOException { int numOfBlocks = 4; FileSystemBlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); - BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); + BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); - List blocks = generateBlocks(numOfBlocks); - verifyPersistenceHandler(blocks, blockPersistenceHandler, testPath); + List blockItems = generateBlocks(numOfBlocks); + verifyPersistenceHandler(blockItems, blockPersistenceHandler, testPath); } @Test @@ -80,10 +80,10 @@ public void testMaxEntriesEqualToBlocks() throws IOException { int numOfBlocks = 3; FileSystemBlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); - BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); + BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); - List blocks = generateBlocks(numOfBlocks); - verifyPersistenceHandler(blocks, blockPersistenceHandler, testPath); + List blockItems = generateBlocks(numOfBlocks); + verifyPersistenceHandler(blockItems, blockPersistenceHandler, testPath); } @Test @@ -91,36 +91,36 @@ public void testMaxEntriesLessThanBlocks() throws IOException { int maxEntries = 3; int numOfBlocks = 4; - BlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); - BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); + BlockStorage blockStorage = new FileSystemBlockStorage(JUNIT, testConfig); + BlockPersistenceHandler blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage); - List blocks = generateBlocks(numOfBlocks); + List blocks = generateBlocks(numOfBlocks); verifyPersistenceHandler(blocks, blockPersistenceHandler, testPath); } private static void verifyPersistenceHandler( - List blocks, - BlockPersistenceHandler blockPersistenceHandler, + List blockItems, + BlockPersistenceHandler blockPersistenceHandler, Path testPath) throws IOException { - for (BlockStreamServiceGrpcProto.Block block : blocks) { + for (BlockStreamServiceGrpcProto.BlockItem blockItem : blockItems) { // Save the block - blockPersistenceHandler.persist(block); + blockPersistenceHandler.persist(blockItem); // Read the block - long blockId = block.getId(); + long blockId = blockItem.getId(); verifyPersistedBlockIsAccessible(blockId, blockPersistenceHandler); // Verify the block was written to the fs - verifyFileExists(blockId, block, testPath); + verifyFileExists(blockId, blockItem, testPath); } } - private static void verifyPersistedBlockIsAccessible(long blockId, BlockPersistenceHandler blockPersistenceHandler) { + private static void verifyPersistedBlockIsAccessible(long blockId, BlockPersistenceHandler blockPersistenceHandler) { // Confirm the block is accessible - Optional blockOpt = blockPersistenceHandler.read(blockId); + Optional blockOpt = blockPersistenceHandler.read(blockId); if (blockOpt.isPresent()) { assertEquals(blockId, blockOpt.get().getId()); } else { @@ -128,13 +128,13 @@ private static void verifyPersistedBlockIsAccessible(long blockId, BlockPersiste } } - private static void verifyFileExists(long blockId, BlockStreamServiceGrpcProto.Block block, Path testPath) throws IOException { + private static void verifyFileExists(long blockId, BlockStreamServiceGrpcProto.BlockItem blockItem, Path testPath) throws IOException { // Verify the block was saved on the filesystem - Path fullTestPath = testPath.resolve(block.getId() + FileSystemBlockStorage.BLOCK_FILE_EXTENSION); + Path fullTestPath = testPath.resolve(blockItem.getId() + FileSystemBlockStorage.BLOCK_FILE_EXTENSION); try (FileInputStream fis = new FileInputStream(fullTestPath.toFile())) { - BlockStreamServiceGrpcProto.Block fetchedBlock = BlockStreamServiceGrpcProto.Block.parseFrom(fis); - assertEquals(blockId, fetchedBlock.getId()); - assertEquals(block.getValue(), fetchedBlock.getValue()); + BlockStreamServiceGrpcProto.BlockItem fetchedBlockItem = BlockStreamServiceGrpcProto.BlockItem.parseFrom(fis); + assertEquals(blockId, fetchedBlockItem.getId()); + assertEquals(blockItem.getValue(), fetchedBlockItem.getValue()); } } }