diff --git a/server/src/main/java/com/hedera/block/server/consumer/BlockItemEventHandler.java b/server/src/main/java/com/hedera/block/server/consumer/BlockItemEventHandler.java index b4f983146..7ed2864da 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/BlockItemEventHandler.java +++ b/server/src/main/java/com/hedera/block/server/consumer/BlockItemEventHandler.java @@ -19,5 +19,5 @@ import com.lmax.disruptor.EventHandler; public interface BlockItemEventHandler extends EventHandler { - void awaitShutdown() throws InterruptedException; + void awaitShutdown(); } diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index 0a9a83483..98c652a63 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -44,7 +44,7 @@ public class ConsumerBlockItemObserver private long producerLivenessMillis; private boolean streamStarted; - private final AtomicBoolean allowPublish = new AtomicBoolean(true); + private final AtomicBoolean isResponsePermitted = new AtomicBoolean(true); protected Runnable onCancel; protected Runnable onClose; @@ -73,7 +73,9 @@ public ConsumerBlockItemObserver( onCancel = () -> { - allowPublish.set(false); + // The consumer has cancelled the stream. + // Do not allow additional responses to be sent. + isResponsePermitted.set(false); streamMediator.unsubscribe(this); LOGGER.log( System.Logger.Level.INFO, @@ -83,7 +85,9 @@ public ConsumerBlockItemObserver( onClose = () -> { - allowPublish.set(false); + // The consumer has closed the stream. + // Do not allow additional responses to be sent. + isResponsePermitted.set(false); streamMediator.unsubscribe(this); LOGGER.log( System.Logger.Level.INFO, @@ -119,7 +123,7 @@ public void onEvent( streamStarted = true; } - if (streamStarted && allowPublish.get()) { + if (streamStarted && isResponsePermitted.get()) { LOGGER.log(System.Logger.Level.INFO, "Send BlockItem downstream: {0} ", blockItem); subscribeStreamResponseObserver.onNext(subscribeStreamResponse); } @@ -128,7 +132,7 @@ public void onEvent( @Override public void awaitShutdown() { - if (!allowPublish.get()) { + if (!isResponsePermitted.get()) { LOGGER.log(System.Logger.Level.INFO, "ConsumerBlockItemObserver shutting down..."); } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index b148c08ce..b4aa2081b 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -154,19 +154,17 @@ public void subscribe( public void unsubscribe( final BlockItemEventHandler> handler) { + // Shutdown the handler + handler.awaitShutdown(); + + // Remove the subscriber final var batchEventProcessor = subscribers.remove(handler); - try { - // Wait for shutdown the complete - handler.awaitShutdown(); - // Stop the processor - batchEventProcessor.halt(); + // Stop the processor + batchEventProcessor.halt(); - // Remove the gating sequence from the ring buffer - ringBuffer.removeGatingSequence(batchEventProcessor.getSequence()); - } catch (InterruptedException e) { - LOGGER.log(System.Logger.Level.ERROR, "Error occurred while waiting for shutdown", e); - } + // Remove the gating sequence from the ring buffer + ringBuffer.removeGatingSequence(batchEventProcessor.getSequence()); } @Override diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirReader.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirReader.java index 68c020154..4916b1719 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirReader.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirReader.java @@ -117,7 +117,7 @@ private Optional readBlockItem(final String blockItemPath) throws IOE try (FileInputStream fis = new FileInputStream(blockItemPath)) { return Optional.of(BlockItem.parseFrom(fis)); } catch (FileNotFoundException io) { - File f = new File(blockItemPath); + final File f = new File(blockItemPath); if (!f.exists()) { // The outer loop caller will continue to query // for the next BlockItem file based on the index @@ -134,20 +134,31 @@ private Optional readBlockItem(final String blockItemPath) throws IOE } private boolean isVerified(final Path path) { + if (!path.toFile().exists()) { + // This code path gets hit if a consumer + // requests a block that does not exist. + // Only log this as a debug message. LOGGER.log(System.Logger.Level.DEBUG, "Path not found: {0}", path); return false; } - if (!isPathReadable(path)) { + if (!path.toFile().canRead()) { + LOGGER.log(System.Logger.Level.ERROR, "Path not readable: {0}", path); LOGGER.log( System.Logger.Level.ERROR, "Attempting to repair the path permissions: {0}", path); - // If resetting the permissions fails or - // if the path is still unreadable, return false. - if (!setPerms(path) || !isPathReadable(path)) { + try { + // If resetting the permissions fails or + // if the path is still unreadable, return false. + setPerm(path, filePerms.value()); + if (!path.toFile().canRead()) { + return false; + } + } catch (IOException e) { + LOGGER.log(System.Logger.Level.ERROR, "Error setting permissions on: " + path, e); return false; } } @@ -160,24 +171,7 @@ private boolean isVerified(final Path path) { return true; } - private boolean isPathReadable(final Path path) { - if (path.toFile().canRead()) { - return true; - } - - LOGGER.log(System.Logger.Level.ERROR, "Path not readable: {0}", path); - - return false; - } - - private boolean setPerms(final Path path) { - try { - Files.setPosixFilePermissions(path, filePerms.value()); - return true; - } catch (IOException e) { - LOGGER.log(System.Logger.Level.ERROR, "Error setting permissions on: " + path, e); - } - - return false; + void setPerm(Path path, Set perms) throws IOException { + Files.setPosixFilePermissions(path, perms); } } diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index c0bd742b8..9ce65fb83 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -19,13 +19,13 @@ import static com.hedera.block.protos.BlockStreamService.*; import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; import static com.hedera.block.server.util.TestClock.buildClockInsideWindow; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.consumer.ConsumerBlockItemObserver; import com.hedera.block.server.data.ObjectEvent; +import com.hedera.block.server.persistence.BlockPersistenceHandler; import com.hedera.block.server.persistence.FileSystemPersistenceHandler; import com.hedera.block.server.persistence.storage.BlockReader; import com.hedera.block.server.persistence.storage.BlockWriter; @@ -48,6 +48,7 @@ public class LiveStreamMediatorImplTest { @Mock private BlockItemEventHandler> observer2; @Mock private BlockItemEventHandler> observer3; + @Mock private BlockPersistenceHandler blockPersistenceHandler; @Mock private BlockReader blockReader; @Mock private BlockWriter blockWriter; @@ -278,6 +279,36 @@ public void testOnCloseSubscriptionHandling() throws IOException { assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver)); } + @Test + public void testMediatorBlocksPublishAfterException() throws IOException, InterruptedException { + final var streamMediator = new LiveStreamMediatorImpl(blockPersistenceHandler); + + final List blockItems = generateBlockItems(1); + final BlockItem firstBlockItem = blockItems.getFirst(); + + // Right now, only a single producer calls publishEvent. In + // that case, they will get an IOException bubbled up to them. + // However, we will need to support multiple producers in the + // future. In that case, we need to make sure a second producer + // is not able to publish a block after the first producer fails. + doThrow(new IOException()).when(blockPersistenceHandler).persist(firstBlockItem); + try { + streamMediator.publishEvent(firstBlockItem); + fail("Expected an IOException to be thrown"); + } catch (IOException e) { + synchronized (lock) { + lock.wait(50); + } + + final BlockItem secondBlockItem = blockItems.get(1); + streamMediator.publishEvent(secondBlockItem); + + // Confirm the BlockPersistenceHandler write method was only called + // once despite the second block being published. + verify(blockPersistenceHandler, times(1)).persist(firstBlockItem); + } + } + private static class TestConsumerBlockItemObserver extends ConsumerBlockItemObserver { public TestConsumerBlockItemObserver( long timeoutThresholdMillis, diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirReaderTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirReaderTest.java index 87a13159e..f64007856 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirReaderTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirReaderTest.java @@ -26,12 +26,15 @@ import io.helidon.config.Config; import io.helidon.config.MapConfigSource; import io.helidon.config.spi.ConfigSource; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermission; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -51,8 +54,8 @@ public void setUp() throws IOException { testPath = Files.createTempDirectory(TEMP_DIR); LOGGER.log(System.Logger.Level.INFO, "Created temp directory: " + testPath.toString()); - Map testProperties = Map.of(JUNIT, testPath.toString()); - ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build(); + final Map testProperties = Map.of(JUNIT, testPath.toString()); + final ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build(); testConfig = Config.builder(testConfigSource).build(); } @@ -67,15 +70,15 @@ public void tearDown() { @Test public void testReadBlockDoesNotExist() throws IOException { - BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); - Optional blockOpt = blockReader.read(10000); + final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + final Optional blockOpt = blockReader.read(10000); assertTrue(blockOpt.isEmpty()); } @Test public void testReadPermsRepairSucceeded() throws IOException { - List blockItems = PersistTestUtils.generateBlockItems(1); - BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); + final List blockItems = PersistTestUtils.generateBlockItems(1); + final BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); for (BlockItem blockItem : blockItems) { blockWriter.write(blockItem); } @@ -84,16 +87,16 @@ public void testReadPermsRepairSucceeded() throws IOException { removeBlockReadPerms(1, testConfig); // The default BlockReader will attempt to repair the permissions and should succeed - BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); - Optional blockOpt = blockReader.read(1); + final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + final Optional blockOpt = blockReader.read(1); assertFalse(blockOpt.isEmpty()); assertEquals(10, blockOpt.get().getBlockItemsList().size()); } @Test public void testRemoveBlockReadPermsRepairFailed() throws IOException { - List blockItems = PersistTestUtils.generateBlockItems(1); - BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); + final List blockItems = PersistTestUtils.generateBlockItems(1); + final BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); for (BlockItem blockItem : blockItems) { blockWriter.write(blockItem); } @@ -103,18 +106,18 @@ public void testRemoveBlockReadPermsRepairFailed() throws IOException { // For this test, build the Reader with ineffective repair permissions to // simulate a failed repair (root changed the perms, etc.) - BlockRemover blockRemover = + final BlockRemover blockRemover = new BlockAsDirRemover( Path.of(testConfig.get(JUNIT).asString().get()), Util.defaultPerms); - BlockReader blockReader = + final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig, blockRemover, TestUtils.getNoPerms()); - Optional blockOpt = blockReader.read(1); + final Optional blockOpt = blockReader.read(1); assertTrue(blockOpt.isEmpty()); } @Test public void testRemoveBlockItemReadPerms() throws IOException { - List blockItems = PersistTestUtils.generateBlockItems(1); + final List blockItems = PersistTestUtils.generateBlockItems(1); BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); for (BlockItem blockItem : blockItems) { blockWriter.write(blockItem); @@ -122,14 +125,69 @@ public void testRemoveBlockItemReadPerms() throws IOException { removeBlockItemReadPerms(1, 1, testConfig); - BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); assertThrows(IOException.class, () -> blockReader.read(1)); } + @Test + public void testPathIsNotDirectory() throws IOException { + final List blockItems = PersistTestUtils.generateBlockItems(1); + final Path blockNodeRootPath = Path.of(testConfig.get(JUNIT).asString().get()); + + // Write a file named "1" where a directory should be + writeFileToPath(blockNodeRootPath.resolve(Path.of("1")), blockItems.getFirst()); + + // Should return empty because the path is not a directory + final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); + final Optional blockOpt = blockReader.read(1); + assertTrue(blockOpt.isEmpty()); + } + + @Test + public void testRepairReadPermsFails() throws IOException { + + final List blockItems = PersistTestUtils.generateBlockItems(1); + final BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig); + for (final BlockItem blockItem : blockItems) { + blockWriter.write(blockItem); + } + + removeBlockReadPerms(1, testConfig); + + final BlockReader blockReader = new TestPermIOException(JUNIT, testConfig); + final Optional blockOpt = blockReader.read(1); + assertTrue(blockOpt.isEmpty()); + } + + @Test + public void testBlockNodePathReadFails() throws IOException { + + // Remove read perm on the root path + removePathReadPerms(Path.of(testConfig.get(JUNIT).asString().get())); + + // Use TestPermIOException to simulate an IOException when setting perms + // to always fail + final BlockReader blockReader = new TestPermIOException(JUNIT, testConfig); + final Optional blockOpt = blockReader.read(1); + assertTrue(blockOpt.isEmpty()); + } + + private void writeFileToPath(final Path path, final BlockItem blockItem) throws IOException { + try (FileOutputStream fos = new FileOutputStream(path.toString())) { + blockItem.writeTo(fos); + LOGGER.log( + System.Logger.Level.INFO, "Successfully wrote the block item file: {0}", path); + } + } + static void removeBlockReadPerms(int blockNumber, final Config config) throws IOException { final Path blockNodeRootPath = Path.of(config.get(JUNIT).asString().get()); final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber)); - Files.setPosixFilePermissions(blockPath, TestUtils.getNoRead().value()); + removePathReadPerms(blockPath); + } + + static void removePathReadPerms(final Path path) throws IOException { + Files.setPosixFilePermissions(path, TestUtils.getNoRead().value()); } private void removeBlockItemReadPerms(int blockNumber, int blockItem, Config config) @@ -139,4 +197,15 @@ private void removeBlockItemReadPerms(int blockNumber, int blockItem, Config con final Path blockItemPath = blockPath.resolve(blockItem + BLOCK_FILE_EXTENSION); Files.setPosixFilePermissions(blockItemPath, TestUtils.getNoRead().value()); } + + private static final class TestPermIOException extends BlockAsDirReader { + public TestPermIOException(String key, Config config) { + super(key, config); + } + + @Override + void setPerm(Path path, Set perms) throws IOException { + throw new IOException("Test IOException"); + } + } }