Commit
A load closer
Signed-off-by: jasperpotts <[email protected]>
jasperpotts committed Dec 19, 2024
1 parent 9e36b3d commit 5e0e3cd
Showing 9 changed files with 523 additions and 138 deletions.
2 changes: 1 addition & 1 deletion stream/build.gradle.kts
@@ -33,7 +33,7 @@ tasks.withType<JavaCompile>().configureEach {
tasks.cloneHederaProtobufs {
// uncomment below to use a specific tag
// tag = "v0.53.0" or a specific commit like "0047255"
tag = "1033f10"
tag = "eab8b58e30336512bcf387c803e6fc86b6ebe010"

// uncomment below to use a specific branch
// branch = "main"
@@ -16,22 +16,29 @@

package com.hedera.block.tools.commands.record2blocks;

import static com.hedera.block.tools.commands.record2blocks.mirrornode.FetchBlockQuery.getPreviousHashForBlock;
import static com.hedera.block.tools.commands.record2blocks.util.BlockWriter.writeBlock;
import static com.hedera.block.tools.commands.record2blocks.util.RecordFileDates.blockTimeLongToInstant;

import com.hedera.block.tools.commands.record2blocks.gcp.MainNetBucket;
import com.hedera.block.tools.commands.record2blocks.model.BlockInfo;
import com.hedera.block.tools.commands.record2blocks.model.BlockTimes;
import com.hedera.block.tools.commands.record2blocks.model.ChainFile;
import com.hedera.block.tools.commands.record2blocks.model.SignatureFile;
import com.hedera.block.tools.commands.record2blocks.model.ParsedSignatureFile;
import com.hedera.block.tools.commands.record2blocks.model.RecordFileVersionInfo;
import com.hedera.block.tools.commands.record2blocks.util.BlockWriter.BlockPath;
import com.hedera.hapi.block.stream.Block;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.hapi.block.stream.BlockItem.ItemOneOfType;
import com.hedera.hapi.block.stream.RecordFileItem;
import com.hedera.hapi.block.stream.RecordFileSignature;
import com.hedera.hapi.block.stream.output.BlockHeader;
import com.hedera.hapi.node.base.BlockHashAlgorithm;
import com.hedera.hapi.node.base.SemanticVersion;
import com.hedera.hapi.node.base.Timestamp;
import com.hedera.hapi.streams.SidecarFile;
import com.hedera.pbj.runtime.OneOf;
import com.hedera.pbj.runtime.ParseException;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.hedera.pbj.runtime.io.stream.WritableStreamingData;
import java.io.IOException;
@@ -42,14 +42,23 @@
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HexFormat;
import java.util.List;
import picocli.CommandLine.Command;
import picocli.CommandLine.Help.Ansi;
import picocli.CommandLine.Option;

/**
* Command line command that converts a record stream to blocks
* <p>
* Example block ranges for testing:
* <ul>
* <li><code>-s 0 -e 10</code> - Record File v2</li>
* <li><code>-s 12877843 -e 12877853</code> - Record File v5</li>
* <li><code>-s 72756872 -e 72756882</code> - Record File v6 with sidecars</li>
* </ul>
* Record files start at V2 at block 0 then change to V5 at block 12370838 and V6 at block 38210031
* </p>
*/
@SuppressWarnings("FieldCanBeLocal")
@Command(name = "record2block", description = "Converts record stream files into blocks")
@@ -137,6 +153,14 @@ public void run() {
}
// map the block_times.bin file
final BlockTimes blockTimes = new BlockTimes(blockTimesFile);
// get previous block hash
Bytes previousBlockHash;
if (startBlock == 0) {
previousBlockHash = Bytes.wrap(new byte[48]); // empty hash for first block
} else {
// get previous block hash from mirror node
previousBlockHash = getPreviousHashForBlock(startBlock);
}
// iterate over the blocks
Instant currentHour = null;
List<ChainFile> currentHoursFiles = null;
@@ -166,35 +190,48 @@ public void run() {
// print block info
System.out.println(" " + blockInfo);
// now we need to download the most common record file
// we will use the GCP bucket to download the file
byte[] recordFileBytes =
blockInfo.mostCommonRecordFile().chainFile().download(mainNetBucket);

// parse version information out of record file
final RecordFileVersionInfo recordFileVersionInfo = RecordFileVersionInfo.parse(recordFileBytes);

// download and parse all signature files
SignatureFile[] signatureFileBytes = blockInfo.signatureFiles().stream()
ParsedSignatureFile[] signatureFileBytes = blockInfo.signatureFiles().stream()
.parallel()
.map(chainFile -> chainFile.download(mainNetBucket))
.map(SignatureFile::parse)
.toArray(SignatureFile[]::new);
for (SignatureFile signatureFile : signatureFileBytes) {
// System.out.println(" signatureFile = " + signatureFile);
}
.map(cf -> ParsedSignatureFile.downloadAndParse(cf, mainNetBucket))
.toArray(ParsedSignatureFile[]::new);
// convert signature files to list of RecordFileSignatures
final List<RecordFileSignature> recordFileSignatures = Arrays.stream(signatureFileBytes)
.map(sigFile -> new RecordFileSignature(Bytes.wrap(sigFile.signature()), sigFile.nodeId()))
.toList();

// download most common sidecar file
List<Bytes> sideCars = new ArrayList<>();
// byte[] sidecarFileBytes = blockInfo.getMostCommonSidecarFileBytes(mainNetBucket);
List<SidecarFile> sideCars = blockInfo.sidecarFiles().values().stream()
.map(sidecarFile -> {
byte[] sidecarFileBytes = sidecarFile.mostCommonSidecarFile().chainFile().download(mainNetBucket);
try {
return SidecarFile.PROTOBUF.parse(Bytes.wrap(sidecarFileBytes));
} catch (ParseException e) {
throw new RuntimeException(e);
}
}).toList();

// build new Block File
final BlockHeader blockHeader = new BlockHeader(
recordFileVersionInfo.hapiProtoVersion(),
recordFileVersionInfo.hapiProtoVersion(),
blockNumber,
previousBlockHash,
new Timestamp(blockTimeInstant.getEpochSecond(), blockTimeInstant.getNano()),
BlockHashAlgorithm.SHA2_384);
final RecordFileItem recordFileItem = new RecordFileItem(
blockInfo.blockNum(),
new Timestamp(blockTimeInstant.getEpochSecond(), blockTimeInstant.getNano()),
Bytes.wrap(recordFileBytes),
sideCars,
BlockHashAlgorithm.SHA2_384,
Arrays.stream(signatureFileBytes)
.map(sigFile -> Bytes.wrap(sigFile.signature()))
.toList());
final Block block = new Block(Collections.singletonList(
sideCars,
recordFileSignatures
);
final Block block = new Block(List.of(
new BlockItem(new OneOf<>(ItemOneOfType.BLOCK_HEADER, blockHeader)),
new BlockItem(new OneOf<>(ItemOneOfType.RECORD_FILE, recordFileItem))));
// write block to disk
final BlockPath blockPath = writeBlock(blocksDir, block);
@@ -211,6 +248,8 @@ public void run() {
}
System.out.println(Ansi.AUTO.string("@|bold,yellow Wrote block json to|@ " + blockJsonPath));
}
// update previous block hash
previousBlockHash = recordFileVersionInfo.blockHash();
}

} catch (IOException e) {
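The changes above are the heart of this commit: the record2block command now chains each block to its predecessor via the previous hash (an all-zero 48-byte hash for block 0, otherwise fetched from the mirror node), parses version information out of the downloaded record file, and wraps everything in a BlockHeader plus RecordFileItem. The following condensed sketch pulls that per-block assembly into one hypothetical helper; it reuses the constructors and helpers exactly as they appear in the diff, assumes the same imports as the class above, and assumes writeBlock may throw IOException (as the surrounding try/catch suggests). It is illustrative, not the command's actual code.

    static BlockPath convertOneBlock(
            long blockNumber,
            Bytes previousBlockHash,
            Instant blockTimeInstant,
            byte[] recordFileBytes,
            List<SidecarFile> sideCars,
            List<RecordFileSignature> recordFileSignatures,
            Path blocksDir) throws IOException {
        // version and hash information parsed out of the downloaded record file
        final RecordFileVersionInfo versionInfo = RecordFileVersionInfo.parse(recordFileBytes);
        // block header: HAPI version (passed twice, as in the diff), number, previous hash, time, hash algorithm
        final BlockHeader blockHeader = new BlockHeader(
                versionInfo.hapiProtoVersion(),
                versionInfo.hapiProtoVersion(),
                blockNumber,
                previousBlockHash,
                new Timestamp(blockTimeInstant.getEpochSecond(), blockTimeInstant.getNano()),
                BlockHashAlgorithm.SHA2_384);
        // the original record file bytes, the parsed sidecar files and one signature per node
        final RecordFileItem recordFileItem = new RecordFileItem(
                new Timestamp(blockTimeInstant.getEpochSecond(), blockTimeInstant.getNano()),
                Bytes.wrap(recordFileBytes),
                sideCars,
                recordFileSignatures);
        // a converted block is just the header item followed by the record file item
        final Block block = new Block(List.of(
                new BlockItem(new OneOf<>(ItemOneOfType.BLOCK_HEADER, blockHeader)),
                new BlockItem(new OneOf<>(ItemOneOfType.RECORD_FILE, recordFileItem))));
        return writeBlock(blocksDir, block);
    }

The caller would then set previousBlockHash to versionInfo.blockHash() before converting the next block, exactly as the diff does at the end of the loop.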
@@ -24,10 +24,12 @@
import com.google.cloud.storage.StorageOptions;
import com.hedera.block.tools.commands.record2blocks.model.ChainFile;
import com.hedera.block.tools.commands.record2blocks.util.RecordFileDates;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@@ -95,19 +97,53 @@ public MainNetBucket(boolean cacheEnabled, Path cacheDir, int minNodeAccountId,
* @return the bytes of the file
*/
public byte[] download(String path) {
try {
final Path cachedFilePath = cacheDir.resolve(path);
byte[] rawBytes;
if (cacheEnabled && Files.exists(cachedFilePath)) {
rawBytes = Files.readAllBytes(cachedFilePath);
} else {
rawBytes = STREAMS_BUCKET.get(path).getContent();
if (cacheEnabled) {
Files.createDirectories(cachedFilePath.getParent());
Path tempCachedFilePath = Files.createTempFile(cacheDir, null, ".tmp");
Files.write(tempCachedFilePath, rawBytes);
Files.move(tempCachedFilePath, cachedFilePath);
}
}
// if file is gzipped, unzip it
if (path.endsWith(".gz")) {
try (GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(rawBytes))) {
return gzipInputStream.readAllBytes();
}
} else {
return rawBytes;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Download a file from GCP as a stream, caching if CACHE_ENABLED is true. This is designed to be thread safe.
*
* @param path the path to the file in the bucket
* @return the stream of the file
*/
public java.io.InputStream downloadStreaming(String path) {
try {
Path cachedFilePath = cacheDir.resolve(path);
if (cacheEnabled && Files.exists(cachedFilePath)) {
return Files.readAllBytes(cachedFilePath);
return Files.newInputStream(cachedFilePath, StandardOpenOption.READ);
} else {
byte[] bytes = STREAMS_BUCKET.get(path).getContent();
final byte[] bytes = STREAMS_BUCKET.get(path).getContent();
if (cacheEnabled) {
Files.createDirectories(cachedFilePath.getParent());
Path tempCachedFilePath = Files.createTempFile(cacheDir, null, ".tmp");
Files.write(tempCachedFilePath, bytes);
Files.move(tempCachedFilePath, cachedFilePath);
}
return bytes;
return new ByteArrayInputStream(bytes);
}
} catch (Exception e) {
throw new RuntimeException(e);
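Two things change in MainNetBucket above: download() now transparently gunzips .gz objects after caching the raw bytes, and both download paths publish cache entries by writing to a temp file and then moving it into place. A minimal standalone sketch of that publish step follows, with illustrative names (neither CacheWriteSketch nor cacheAtomically exists in the repository):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class CacheWriteSketch {
        // Write the downloaded bytes to a uniquely named temp file in the cache directory,
        // then move the finished file to its final path so a reader that checks Files.exists
        // never picks up a half-written cache entry.
        static void cacheAtomically(Path cacheDir, Path cachedFilePath, byte[] bytes) throws IOException {
            Files.createDirectories(cachedFilePath.getParent());
            final Path temp = Files.createTempFile(cacheDir, null, ".tmp");
            Files.write(temp, bytes);
            Files.move(temp, cachedFilePath);
        }
    }

Because decompression is applied only to the returned bytes, the cache keeps .gz content exactly as it is stored in the bucket.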
@@ -18,10 +18,12 @@

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URI;
import java.net.URL;
import java.util.HexFormat;

/**
* Query Mirror Node and fetch block information
@@ -40,6 +42,19 @@ public static String getRecordFileNameForBlock(long blockNumber) {
return json.get("name").getAsString();
}

/**
* Get the previous hash for a block number from the mirror node.
*
* @param blockNumber the block number
* @return the previous block hash
*/
public static Bytes getPreviousHashForBlock(long blockNumber) {
final String url = "https://mainnet-public.mirrornode.hedera.com/api/v1/blocks/" + blockNumber;
final JsonObject json = readUrl(url);
final String hashStr = json.get("previous_hash").getAsString();
return Bytes.wrap(HexFormat.of().parseHex(hashStr.substring(2))); // remove 0x prefix and parse
}

/**
* Read a URL and return the JSON object.
*
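FetchBlockQuery gains getPreviousHashForBlock, which reads the block's JSON from the mirror node REST API, strips the 0x prefix from previous_hash and hex-decodes it. A hedged usage sketch (the block number is one of the test ranges from the command's Javadoc; the decode lines merely restate the parsing shown above with a made-up hex value, using the same imports as the file above):

    // Fetch the previous block hash for block 12877843 from the public mirror node
    final Bytes previousHash = FetchBlockQuery.getPreviousHashForBlock(12877843);

    // The decode step in isolation: a "previous_hash" value is "0x" + 96 hex chars (48 bytes, SHA2-384)
    final String previousHashField = "0x" + "ab".repeat(48); // illustrative value, not real data
    final Bytes decoded = Bytes.wrap(HexFormat.of().parseHex(previousHashField.substring(2)));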
@@ -20,6 +20,7 @@
import static com.hedera.block.tools.commands.record2blocks.util.RecordFileDates.extractRecordFileTime;

import com.hedera.block.tools.commands.record2blocks.gcp.MainNetBucket;
import java.io.InputStream;
import java.io.Serializable;
import java.util.regex.Pattern;

@@ -87,6 +88,16 @@ public byte[] download(MainNetBucket mainNetBucket) {
return mainNetBucket.download(path);
}

/**
* Downloads the file from the bucket as a stream.
*
* @param mainNetBucket the main net bucket that contains the file
* @return the file as a stream
*/
public InputStream downloadStreaming(MainNetBucket mainNetBucket) {
return mainNetBucket.downloadStreaming(path);
}

/**
* Enum for the kind of file.
*/
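ChainFile now exposes downloadStreaming, delegating to the new MainNetBucket.downloadStreaming, so large record and sidecar files can be read without buffering them fully in memory. A hedged usage sketch (peekVersion is illustrative, not a repository method; it assumes a ChainFile and a configured MainNetBucket are already available, plus java.io.IOException alongside the imports shown above):

    // Read only the first few bytes of a record file via the new streaming path
    static byte[] peekVersion(ChainFile chainFile, MainNetBucket mainNetBucket) throws IOException {
        try (InputStream in = chainFile.downloadStreaming(mainNetBucket)) {
            return in.readNBytes(4); // e.g. just the leading record-format version bytes
        }
    }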