Skip to content

Commit

Permalink
added additional BlockItem type support
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 16, 2024
1 parent 8951fa0 commit 84240b1
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 168 deletions.
11 changes: 9 additions & 2 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,18 @@ message Block {
*/
message BlockItem {

BlockHeader block_header = 1;
oneof items {
BlockHeader block_header = 1;
BlockProof state_proof = 2;
}

string value = 2;
string value = 3;
}

message BlockHeader {
uint64 block_number = 1;
}

message BlockProof {
uint64 block = 1;
}
7 changes: 4 additions & 3 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package com.hedera.block.server;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.server.Constants.*;

import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.storage.BlockStorage;
Expand All @@ -28,8 +25,12 @@
import io.helidon.config.Config;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcRouting;

import java.io.IOException;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.server.Constants.*;

/** Main class for the block node server */
public class Server {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class ConsumerBlockItemObserver

private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;

private boolean isReachedFirstBlock;

/**
* Constructor for the LiveStreamObserverImpl class.
Expand Down Expand Up @@ -99,16 +98,10 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
producerLivenessMillis = producerLivenessClock.millis();

final BlockItem blockItem = event.get();
if (!isReachedFirstBlock && blockItem.getBlockHeader() > 0) {
isReachedFirstBlock = true;
}

if (isReachedFirstBlock) {
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();

subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
}
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package com.hedera.block.server.mediator;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;

import com.hedera.block.server.consumer.BlockItemEventHandler;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.persistence.BlockPersistenceHandler;
Expand All @@ -27,11 +24,14 @@
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.hedera.block.protos.BlockStreamService.*;

/**
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible
* for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies
Expand All @@ -51,15 +51,15 @@ public class LiveStreamMediatorImpl
BatchEventProcessor<ObjectEvent<BlockItem>>>
subscribers = new HashMap<>();

private final BlockPersistenceHandler<BlockItem> blockPersistenceHandler;
private final BlockPersistenceHandler<Block, BlockItem> blockPersistenceHandler;

/**
* Constructor for the LiveStreamMediatorImpl class.
*
* @param blockPersistenceHandler the block persistence handler
*/
public LiveStreamMediatorImpl(
final BlockPersistenceHandler<BlockItem> blockPersistenceHandler) {
final BlockPersistenceHandler<Block, BlockItem> blockPersistenceHandler) {
this.blockPersistenceHandler = blockPersistenceHandler;

// Initialize and start the disruptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ public interface BlockPersistenceHandler<U, V> {
/**
* Persists a block.
*
* @param blockNumber the block to persist
* @return the id of the block
*/
Long persist(final V blockItem, final long blockNumber);
void persist(final V blockItem);

/**
* Reads a block.
Expand Down
32 changes: 32 additions & 0 deletions server/src/main/java/com/hedera/block/server/persistence/Util.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.persistence;

import static com.hedera.block.protos.BlockStreamService.Block;
import static com.hedera.block.protos.BlockStreamService.BlockHeader;

public final class Util {
private Util() {}

public static long getBlockNumber(final Block block) {
return getBlockHeader(block).getBlockNumber();
}

public static BlockHeader getBlockHeader(final Block block) {
return block.getBlockItems(0).getBlockHeader();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import com.hedera.block.server.persistence.storage.BlockStorage;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.Block;
import static com.hedera.block.protos.BlockStreamService.BlockItem;

import java.util.LinkedList;
import java.util.Optional;
Expand All @@ -45,15 +45,10 @@ public WriteThroughCacheHandler(final BlockStorage<Block, BlockItem> blockStorag
/**
* Persists the block to the block storage and cache the block.
*
* @param blockItem the block to persist
* @return the block id
*/
@Override
public Long persist(final BlockItem blockItem, final long blockNumber) {

// Write-Through cache
blockStorage.write(blockItem, blockNumber);
return blockItem.getBlockHeader().getBlockNumber();
public void persist(final BlockItem blockItem) {
blockStorage.write(blockItem);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,17 @@
*
* @param <V> the type of block to store
*/
public interface BlockStorage<V> {
public interface BlockStorage<U, V> {

/**
* Writes a block to storage.
*
* @return the id of the block
*/
Optional<Long> write(final V block, final long blockNumber);
void write(final V blockItem);

/**
* Reads a block from storage.
*
* @return the block
*/
Optional<V> read(final long blockNumber);
Optional<U> read(final long blockNumber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Optional;

import static com.hedera.block.protos.BlockStreamService.Block;
import static com.hedera.block.protos.BlockStreamService.Block.Builder;
import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.server.Constants.BLOCKNODE_STORAGE_ROOT_PATH_KEY;

Expand All @@ -37,10 +37,14 @@
*/
public class FileSystemBlockStorage implements BlockStorage<Block, BlockItem> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

public static final String BLOCK_FILE_EXTENSION = ".blk";

private final Path blockNodeRootPath;
private final System.Logger LOGGER = System.getLogger(getClass().getName());

private long currentIndex = 1;
private Path currentBlockDir;

/**
* Constructs a FileSystemBlockStorage object.
Expand All @@ -50,12 +54,11 @@ public class FileSystemBlockStorage implements BlockStorage<Block, BlockItem> {
* @throws IOException if an I/O error occurs while initializing the block node root directory
*/
public FileSystemBlockStorage(final String key, final Config config) throws IOException {

LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockStorage");
LOGGER.log(System.Logger.Level.INFO, config.toString());

blockNodeRootPath = Path.of(config.get(key).asString().get());

LOGGER.log(System.Logger.Level.INFO, config.toString());
LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath);

if (!blockNodeRootPath.isAbsolute()) {
Expand All @@ -79,20 +82,20 @@ public FileSystemBlockStorage(final String key, final Config config) throws IOEx
/**
* Writes a block to the filesystem.
*
* @param blockItem the block to write
* @return the id of the block
*/
@Override
public Optional<Long> write(final BlockItem blockItem, final long blockNumber) {
final String fullPath = resolvePath(blockNumber);
try (FileOutputStream fos = new FileOutputStream(fullPath)) {
blockItem.writeTo(fos);
LOGGER.log(System.Logger.Level.DEBUG, "Successfully wrote the block file: " + fullPath);

return Optional.of(blockNumber);
} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e);
return Optional.empty();
public void write(final BlockItem blockItem) {

try {
final String blockItemFilePath = getAbsoluteFilePath(blockItem);
try (FileOutputStream fos = new FileOutputStream(blockItemFilePath)) {
blockItem.writeTo(fos);
LOGGER.log(System.Logger.Level.INFO, "Successfully wrote the block item file: {0}", blockItemFilePath);
} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e);
}
} catch (IOException io) {
LOGGER.log(System.Logger.Level.ERROR, "Error calculating the block item path", io);
}
}

Expand All @@ -104,27 +107,82 @@ public Optional<Long> write(final BlockItem blockItem, final long blockNumber) {
*/
@Override
public Optional<Block> read(final long id) {
return read(resolvePath(id));

final Builder builder = Block.newBuilder();
final Path blockPath = blockNodeRootPath.resolve(String.valueOf(id));
return read(blockPath, builder);
}

private Optional<Block> read(final String filePath) {
private Optional<Block> read(final Path blockPath, final Builder builder) {

// Directly count and add BlockItems into the Block
// to keep the retrieval process O(BlockItems)
boolean isEnd = false;
for (int i = 1;!isEnd;i++) {
final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION);
final Optional<BlockItem> blockItemOpt = readBlockItem(blockItemPath.toString());
if (blockItemOpt.isPresent()) {
builder.addBlockItems(blockItemOpt.get());
continue;
}

isEnd = true;
}

return Optional.of(builder.build());
}

try (FileInputStream fis = new FileInputStream(filePath)) {
private Optional<BlockItem> readBlockItem(final String blockItemPath) {
try (FileInputStream fis = new FileInputStream(blockItemPath)) {
return Optional.of(BlockItem.parseFrom(fis));
} catch (FileNotFoundException io) {
LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + filePath, io);
return Optional.empty();
} catch (IOException io) {
throw new RuntimeException("Error reading file: " + filePath, io);
throw new RuntimeException("Error reading file: " + blockItemPath, io);
}
}

private String resolvePath(final long blockNumber) {
private String getAbsoluteFilePath(final BlockItem blockItem) throws IOException {

String fileName = id + BLOCK_FILE_EXTENSION;
Path fullPath = blockNodeRootPath.resolve(fileName);
LOGGER.log(System.Logger.Level.DEBUG, "Resolved fullPath: " + fullPath);
if (blockItem.hasBlockHeader()) {

return fullPath.toString();
// A "block" is a directory of blockItems. Create the "block"
// based on the block_number
currentBlockDir = Path.of(String.valueOf(blockItem.getBlockHeader().getBlockNumber()));

final Path blockPath = blockNodeRootPath.resolve(currentBlockDir);
createPath(blockPath);

// Build the path to the BlockHeader.blk file
currentIndex = 1;
return blockPath.resolve(currentIndex + BLOCK_FILE_EXTENSION).toString();
}

// Build the path to a .blk file
final Path blockPath = blockNodeRootPath.resolve(currentBlockDir);
return blockPath.resolve(++currentIndex + BLOCK_FILE_EXTENSION).toString();
}

private void createPath(Path blockNodePath) throws IOException {
// Initialize the block node root directory if it does not exist
if (Files.notExists(blockNodePath)) {
Files.createDirectory(blockNodePath);
LOGGER.log(
System.Logger.Level.INFO,
"Created block node root directory: " + blockNodePath);
} else {
LOGGER.log(
System.Logger.Level.INFO,
"Using existing block node root directory: " + blockNodePath);
}
}

// private String resolvePath(final long blockNumber) {
//
// String fileName = blockNumber + BLOCK_FILE_EXTENSION;
// Path fullPath = blockNodeRootPath.resolve(fileName);
// LOGGER.log(System.Logger.Level.DEBUG, "Resolved fullPath: " + fullPath);
//
// return fullPath.toString();
// }
}
Loading

0 comments on commit 84240b1

Please sign in to comment.