Skip to content

Commit

Permalink
fix:refactor out FileSystemBlockStorage and split Read and Write
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 19, 2024
1 parent 2721b4b commit bbc4066
Show file tree
Hide file tree
Showing 14 changed files with 381 additions and 312 deletions.
3 changes: 3 additions & 0 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ private Constants() {}
public static final String SERVICE_NAME = "BlockStreamGrpcService";
public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream";
public static final String SERVER_STREAMING_METHOD_NAME = "subscribeBlockStream";

// Constants used when interacting with the file system.
public static final String BLOCK_FILE_EXTENSION = ".blk";
}
15 changes: 9 additions & 6 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@

import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.storage.BlockStorage;
import com.hedera.block.server.persistence.storage.FileSystemBlockStorage;
import com.hedera.block.server.persistence.storage.*;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.helidon.config.Config;
Expand Down Expand Up @@ -66,13 +65,17 @@ public static void main(final String[] args) {
.asLong()
.orElse(1500L);

// Initialize the block storage, cache, and service
final BlockStorage<Block, BlockItem> blockStorage =
new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
// Initialize the reader and writer for the block storage
final BlockWriter<BlockItem> blockWriter =
new FileSystemBlockWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockReader<Block> blockReader =
new FileSystemBlockReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);

final BlockStreamService blockStreamService =
new BlockStreamService(
consumerTimeoutThreshold,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));
new LiveStreamMediatorImpl(
new WriteThroughCacheHandler(blockReader, blockWriter)));

// Start the web server
WebServer.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ public void publishEvent(BlockItem blockItem) {
ringBuffer.publishEvent((event, sequence) -> event.set(blockItem));

// Block persistence
blockPersistenceHandler.persist(blockItem);
try {
blockPersistenceHandler.persist(blockItem);
} catch (Exception e) {
// TODO: Push back on the producer?
LOGGER.log(System.Logger.Level.ERROR, "Error occurred while persisting the block", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.block.server.persistence;

import java.io.IOException;
import java.util.Optional;
import java.util.Queue;

Expand All @@ -28,7 +29,7 @@
public interface BlockPersistenceHandler<U, V> {

/** Persists a block. */
void persist(final V blockItem);
void persist(final V blockItem) throws IOException;

/**
* Reads a block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static com.hedera.block.protos.BlockStreamService.Block;
import static com.hedera.block.protos.BlockStreamService.BlockItem;

import com.hedera.block.server.persistence.storage.BlockStorage;
import com.hedera.block.server.persistence.storage.BlockReader;
import com.hedera.block.server.persistence.storage.BlockWriter;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
Expand All @@ -30,21 +32,14 @@
*/
public class WriteThroughCacheHandler implements BlockPersistenceHandler<Block, BlockItem> {

private final BlockStorage<Block, BlockItem> blockStorage;
private final BlockReader<Block> blockReader;
private final BlockWriter<BlockItem> blockWriter;

/**
* Constructor for the WriteThroughCacheHandler class.
*
* @param blockStorage the block storage
*/
public WriteThroughCacheHandler(final BlockStorage<Block, BlockItem> blockStorage) {
this.blockStorage = blockStorage;
}

/** Persists the block to the block storage and cache the block. */
@Override
public void persist(final BlockItem blockItem) {
blockStorage.write(blockItem);
/** Constructor for the WriteThroughCacheHandler class. */
public WriteThroughCacheHandler(
final BlockReader<Block> blockReader, final BlockWriter<BlockItem> blockWriter) {
this.blockReader = blockReader;
this.blockWriter = blockWriter;
}

/**
Expand Down Expand Up @@ -74,6 +69,12 @@ public Queue<Block> readRange(final long startBlockId, final long endBlockId) {
*/
@Override
public Optional<Block> read(final long id) {
return blockStorage.read(id);
return blockReader.read(id);
}

/** Persists the block to the block storage and cache the block. */
@Override
public void persist(final BlockItem blockItem) throws IOException {
blockWriter.write(blockItem);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,6 @@

import java.util.Optional;

/**
* The BlockStorage interface defines operations to write and read blocks to a persistent store.
*
* @param <V> the type of block to store
*/
public interface BlockStorage<U, V> {

/** Writes a block to storage. */
void write(final V blockItem);

/**
* Reads a block from storage.
*
* @return the block
*/
Optional<U> read(final long blockNumber);
public interface BlockReader<V> {
Optional<V> read(final long blockNumber);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.persistence.storage;

import java.io.IOException;

public interface BlockWriter<V> {
void write(final V blockItem) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.persistence.storage;

import static com.hedera.block.protos.BlockStreamService.Block;
import static com.hedera.block.protos.BlockStreamService.Block.Builder;
import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION;

import io.helidon.config.Config;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;

public class FileSystemBlockReader implements BlockReader<Block> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

final Path blockNodeRootPath;

public FileSystemBlockReader(final String key, final Config config) {

LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockReader");

final Path blockNodeRootPath = Path.of(config.get(key).asString().get());

LOGGER.log(System.Logger.Level.INFO, config.toString());
LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath);

this.blockNodeRootPath = blockNodeRootPath;
}

public Optional<Block> read(final long blockNumber) {

// Construct the path to the requested Block
final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber));

// Verify attributes of the Block
if (!isVerified(blockPath)) {
return Optional.empty();
}

// There may be thousands of BlockItem files in a Block directory.
// The BlockItems must be written into the Block object in order. A
// DirectoryStream will iterate in any guaranteed order. To avoid sorting,
// and to keep the retrieval process linear with the number of BlockItems,
// Run a loop to fetch in the order we need. The loop will break when
// it looks for a BlockItem file that does not exist.
final Builder builder = Block.newBuilder();
for (int i = 1; ; i++) {
final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION);
try {
final Optional<BlockItem> blockItemOpt = readBlockItem(blockItemPath.toString());
if (blockItemOpt.isPresent()) {
builder.addBlockItems(blockItemOpt.get());
continue;
}
} catch (IOException io) {
LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + blockItemPath, io);
return Optional.empty();
}

break;
}

// Return the Block
return Optional.of(builder.build());
}

private Optional<BlockItem> readBlockItem(final String blockItemPath) throws IOException {
try (FileInputStream fis = new FileInputStream(blockItemPath)) {
return Optional.of(BlockItem.parseFrom(fis));
} catch (FileNotFoundException io) {
// The outer loop caller will continue to query
// for the next BlockItem file based on the index
// until the FileNotFoundException is thrown.
// It's expected that this exception will be caught
// at the end of every query.
return Optional.empty();
}
}

private boolean isVerified(final Path blockPath) {
if (!blockPath.toFile().isDirectory()) {
LOGGER.log(System.Logger.Level.ERROR, "Block directory not found: " + blockPath);
return false;
}

if (!blockPath.toFile().canRead()) {
LOGGER.log(System.Logger.Level.ERROR, "Block directory not readable: " + blockPath);
return false;
}

return true;
}
}
Loading

0 comments on commit bbc4066

Please sign in to comment.