Skip to content

Commit

Permalink
fix: added tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 1, 2024
1 parent 23106d3 commit bbec8cd
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
import com.lmax.disruptor.EventHandler;

public interface BlockItemEventHandler<T> extends EventHandler<T> {
void awaitShutdown() throws InterruptedException;
void awaitShutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ConsumerBlockItemObserver
private long producerLivenessMillis;

private boolean streamStarted;
private final AtomicBoolean allowPublish = new AtomicBoolean(true);
private final AtomicBoolean isResponsePermitted = new AtomicBoolean(true);

protected Runnable onCancel;
protected Runnable onClose;
Expand Down Expand Up @@ -73,7 +73,9 @@ public ConsumerBlockItemObserver(

onCancel =
() -> {
allowPublish.set(false);
// The consumer has cancelled the stream.
// Do not allow additional responses to be sent.
isResponsePermitted.set(false);
streamMediator.unsubscribe(this);
LOGGER.log(
System.Logger.Level.INFO,
Expand All @@ -83,7 +85,9 @@ public ConsumerBlockItemObserver(

onClose =
() -> {
allowPublish.set(false);
// The consumer has closed the stream.
// Do not allow additional responses to be sent.
isResponsePermitted.set(false);
streamMediator.unsubscribe(this);
LOGGER.log(
System.Logger.Level.INFO,
Expand Down Expand Up @@ -119,7 +123,7 @@ public void onEvent(
streamStarted = true;
}

if (streamStarted && allowPublish.get()) {
if (streamStarted && isResponsePermitted.get()) {
LOGGER.log(System.Logger.Level.INFO, "Send BlockItem downstream: {0} ", blockItem);
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
}
Expand All @@ -128,7 +132,7 @@ public void onEvent(

@Override
public void awaitShutdown() {
if (!allowPublish.get()) {
if (!isResponsePermitted.get()) {
LOGGER.log(System.Logger.Level.INFO, "ConsumerBlockItemObserver shutting down...");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,17 @@ public void subscribe(
public void unsubscribe(
final BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {

// Shutdown the handler
handler.awaitShutdown();

// Remove the subscriber
final var batchEventProcessor = subscribers.remove(handler);
try {
// Wait for shutdown the complete
handler.awaitShutdown();

// Stop the processor
batchEventProcessor.halt();
// Stop the processor
batchEventProcessor.halt();

// Remove the gating sequence from the ring buffer
ringBuffer.removeGatingSequence(batchEventProcessor.getSequence());
} catch (InterruptedException e) {
LOGGER.log(System.Logger.Level.ERROR, "Error occurred while waiting for shutdown", e);
}
// Remove the gating sequence from the ring buffer
ringBuffer.removeGatingSequence(batchEventProcessor.getSequence());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private Optional<BlockItem> readBlockItem(final String blockItemPath) throws IOE
try (FileInputStream fis = new FileInputStream(blockItemPath)) {
return Optional.of(BlockItem.parseFrom(fis));
} catch (FileNotFoundException io) {
File f = new File(blockItemPath);
final File f = new File(blockItemPath);
if (!f.exists()) {
// The outer loop caller will continue to query
// for the next BlockItem file based on the index
Expand All @@ -134,20 +134,31 @@ private Optional<BlockItem> readBlockItem(final String blockItemPath) throws IOE
}

private boolean isVerified(final Path path) {

if (!path.toFile().exists()) {
// This code path gets hit if a consumer
// requests a block that does not exist.
// Only log this as a debug message.
LOGGER.log(System.Logger.Level.DEBUG, "Path not found: {0}", path);
return false;
}

if (!isPathReadable(path)) {
if (!path.toFile().canRead()) {
LOGGER.log(System.Logger.Level.ERROR, "Path not readable: {0}", path);
LOGGER.log(
System.Logger.Level.ERROR,
"Attempting to repair the path permissions: {0}",
path);

// If resetting the permissions fails or
// if the path is still unreadable, return false.
if (!setPerms(path) || !isPathReadable(path)) {
try {
// If resetting the permissions fails or
// if the path is still unreadable, return false.
setPerm(path, filePerms.value());
if (!path.toFile().canRead()) {
return false;
}
} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "Error setting permissions on: " + path, e);
return false;
}
}
Expand All @@ -160,24 +171,7 @@ private boolean isVerified(final Path path) {
return true;
}

private boolean isPathReadable(final Path path) {
if (path.toFile().canRead()) {
return true;
}

LOGGER.log(System.Logger.Level.ERROR, "Path not readable: {0}", path);

return false;
}

private boolean setPerms(final Path path) {
try {
Files.setPosixFilePermissions(path, filePerms.value());
return true;
} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "Error setting permissions on: " + path, e);
}

return false;
void setPerm(Path path, Set<PosixFilePermission> perms) throws IOException {
Files.setPosixFilePermissions(path, perms);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems;
import static com.hedera.block.server.util.TestClock.buildClockInsideWindow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

import com.hedera.block.server.consumer.BlockItemEventHandler;
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.persistence.BlockPersistenceHandler;
import com.hedera.block.server.persistence.FileSystemPersistenceHandler;
import com.hedera.block.server.persistence.storage.BlockReader;
import com.hedera.block.server.persistence.storage.BlockWriter;
Expand All @@ -48,6 +48,7 @@ public class LiveStreamMediatorImplTest {
@Mock private BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> observer2;
@Mock private BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> observer3;

@Mock private BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
@Mock private BlockReader<Block> blockReader;
@Mock private BlockWriter<BlockItem> blockWriter;

Expand Down Expand Up @@ -278,6 +279,36 @@ public void testOnCloseSubscriptionHandling() throws IOException {
assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver));
}

@Test
public void testMediatorBlocksPublishAfterException() throws IOException, InterruptedException {
final var streamMediator = new LiveStreamMediatorImpl(blockPersistenceHandler);

final List<BlockItem> blockItems = generateBlockItems(1);
final BlockItem firstBlockItem = blockItems.getFirst();

// Right now, only a single producer calls publishEvent. In
// that case, they will get an IOException bubbled up to them.
// However, we will need to support multiple producers in the
// future. In that case, we need to make sure a second producer
// is not able to publish a block after the first producer fails.
doThrow(new IOException()).when(blockPersistenceHandler).persist(firstBlockItem);
try {
streamMediator.publishEvent(firstBlockItem);
fail("Expected an IOException to be thrown");
} catch (IOException e) {
synchronized (lock) {
lock.wait(50);
}

final BlockItem secondBlockItem = blockItems.get(1);
streamMediator.publishEvent(secondBlockItem);

// Confirm the BlockPersistenceHandler write method was only called
// once despite the second block being published.
verify(blockPersistenceHandler, times(1)).persist(firstBlockItem);
}
}

private static class TestConsumerBlockItemObserver extends ConsumerBlockItemObserver {
public TestConsumerBlockItemObserver(
long timeoutThresholdMillis,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import io.helidon.config.Config;
import io.helidon.config.MapConfigSource;
import io.helidon.config.spi.ConfigSource;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -51,8 +54,8 @@ public void setUp() throws IOException {
testPath = Files.createTempDirectory(TEMP_DIR);
LOGGER.log(System.Logger.Level.INFO, "Created temp directory: " + testPath.toString());

Map<String, String> testProperties = Map.of(JUNIT, testPath.toString());
ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build();
final Map<String, String> testProperties = Map.of(JUNIT, testPath.toString());
final ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build();
testConfig = Config.builder(testConfigSource).build();
}

Expand All @@ -67,15 +70,15 @@ public void tearDown() {

@Test
public void testReadBlockDoesNotExist() throws IOException {
BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
Optional<Block> blockOpt = blockReader.read(10000);
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
final Optional<Block> blockOpt = blockReader.read(10000);
assertTrue(blockOpt.isEmpty());
}

@Test
public void testReadPermsRepairSucceeded() throws IOException {
List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
BlockWriter<BlockItem> blockWriter = new BlockAsDirWriter(JUNIT, testConfig);
final List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
final BlockWriter<BlockItem> blockWriter = new BlockAsDirWriter(JUNIT, testConfig);
for (BlockItem blockItem : blockItems) {
blockWriter.write(blockItem);
}
Expand All @@ -84,16 +87,16 @@ public void testReadPermsRepairSucceeded() throws IOException {
removeBlockReadPerms(1, testConfig);

// The default BlockReader will attempt to repair the permissions and should succeed
BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
Optional<Block> blockOpt = blockReader.read(1);
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
final Optional<Block> blockOpt = blockReader.read(1);
assertFalse(blockOpt.isEmpty());
assertEquals(10, blockOpt.get().getBlockItemsList().size());
}

@Test
public void testRemoveBlockReadPermsRepairFailed() throws IOException {
List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
BlockWriter<BlockItem> blockWriter = new BlockAsDirWriter(JUNIT, testConfig);
final List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
final BlockWriter<BlockItem> blockWriter = new BlockAsDirWriter(JUNIT, testConfig);
for (BlockItem blockItem : blockItems) {
blockWriter.write(blockItem);
}
Expand All @@ -103,33 +106,88 @@ public void testRemoveBlockReadPermsRepairFailed() throws IOException {

// For this test, build the Reader with ineffective repair permissions to
// simulate a failed repair (root changed the perms, etc.)
BlockRemover blockRemover =
final BlockRemover blockRemover =
new BlockAsDirRemover(
Path.of(testConfig.get(JUNIT).asString().get()), Util.defaultPerms);
BlockReader<Block> blockReader =
final BlockReader<Block> blockReader =
new BlockAsDirReader(JUNIT, testConfig, blockRemover, TestUtils.getNoPerms());
Optional<Block> blockOpt = blockReader.read(1);
final Optional<Block> blockOpt = blockReader.read(1);
assertTrue(blockOpt.isEmpty());
}

@Test
public void testRemoveBlockItemReadPerms() throws IOException {
List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
final List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
BlockWriter<BlockItem> blockWriter = new BlockAsDirWriter(JUNIT, testConfig);
for (BlockItem blockItem : blockItems) {
blockWriter.write(blockItem);
}

removeBlockItemReadPerms(1, 1, testConfig);

BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
assertThrows(IOException.class, () -> blockReader.read(1));
}

@Test
public void testPathIsNotDirectory() throws IOException {
final List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
final Path blockNodeRootPath = Path.of(testConfig.get(JUNIT).asString().get());

// Write a file named "1" where a directory should be
writeFileToPath(blockNodeRootPath.resolve(Path.of("1")), blockItems.getFirst());

// Should return empty because the path is not a directory
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
final Optional<Block> blockOpt = blockReader.read(1);
assertTrue(blockOpt.isEmpty());
}

@Test
public void testRepairReadPermsFails() throws IOException {

final List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
final BlockWriter<BlockItem> blockWriter = new BlockAsDirWriter(JUNIT, testConfig);
for (final BlockItem blockItem : blockItems) {
blockWriter.write(blockItem);
}

removeBlockReadPerms(1, testConfig);

final BlockReader<Block> blockReader = new TestPermIOException(JUNIT, testConfig);
final Optional<Block> blockOpt = blockReader.read(1);
assertTrue(blockOpt.isEmpty());
}

@Test
public void testBlockNodePathReadFails() throws IOException {

// Remove read perm on the root path
removePathReadPerms(Path.of(testConfig.get(JUNIT).asString().get()));

// Use TestPermIOException to simulate an IOException when setting perms
// to always fail
final BlockReader<Block> blockReader = new TestPermIOException(JUNIT, testConfig);
final Optional<Block> blockOpt = blockReader.read(1);
assertTrue(blockOpt.isEmpty());
}

private void writeFileToPath(final Path path, final BlockItem blockItem) throws IOException {
try (FileOutputStream fos = new FileOutputStream(path.toString())) {
blockItem.writeTo(fos);
LOGGER.log(
System.Logger.Level.INFO, "Successfully wrote the block item file: {0}", path);
}
}

static void removeBlockReadPerms(int blockNumber, final Config config) throws IOException {
final Path blockNodeRootPath = Path.of(config.get(JUNIT).asString().get());
final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber));
Files.setPosixFilePermissions(blockPath, TestUtils.getNoRead().value());
removePathReadPerms(blockPath);
}

static void removePathReadPerms(final Path path) throws IOException {
Files.setPosixFilePermissions(path, TestUtils.getNoRead().value());
}

private void removeBlockItemReadPerms(int blockNumber, int blockItem, Config config)
Expand All @@ -139,4 +197,15 @@ private void removeBlockItemReadPerms(int blockNumber, int blockItem, Config con
final Path blockItemPath = blockPath.resolve(blockItem + BLOCK_FILE_EXTENSION);
Files.setPosixFilePermissions(blockItemPath, TestUtils.getNoRead().value());
}

private static final class TestPermIOException extends BlockAsDirReader {
public TestPermIOException(String key, Config config) {
super(key, config);
}

@Override
void setPerm(Path path, Set<PosixFilePermission> perms) throws IOException {
throw new IOException("Test IOException");
}
}
}

0 comments on commit bbec8cd

Please sign in to comment.