Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NODE-2639 Improved Exporter and Importer #3926

Merged
merged 14 commits into from
Jan 10, 2024
Merged
28 changes: 15 additions & 13 deletions node/src/main/scala/com/wavesplatform/Exporter.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.wavesplatform

import com.google.common.collect.AbstractIterator

import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
import com.google.common.primitives.Ints
import com.wavesplatform.block.Block
import com.wavesplatform.database.protobuf.BlockMeta
Expand All @@ -19,6 +17,7 @@ import kamon.Kamon
import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB}
import scopt.OParser

import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
import scala.concurrent.Await
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
Expand All @@ -39,7 +38,7 @@ object Exporter extends ScorexLogging {
// noinspection ScalaStyle
def main(args: Array[String]): Unit = {
OParser.parse(commandParser, args, ExporterOptions()).foreach {
case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportSnapshots, exportHeight, format) =>
case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportHeight, format) =>
val settings = Application.loadApplicationConfig(configFile)

Using.resources(
Expand All @@ -53,8 +52,9 @@ object Exporter extends ScorexLogging {
val blocksOutputFilename = s"$blocksOutputFileNamePrefix-$height"
log.info(s"Blocks output file: $blocksOutputFilename")

val exportSnapshots = snapshotsOutputFileNamePrefix.isDefined
val snapshotsOutputFilename = if (exportSnapshots) {
val filename = s"$snapshotsOutputFileNamePrefix-$height"
val filename = s"${snapshotsOutputFileNamePrefix.get}-$height"
log.info(s"Snapshots output file: $filename")
Some(filename)
} else None
Expand All @@ -74,7 +74,12 @@ object Exporter extends ScorexLogging {
var exportedSnapshotsBytes = 0L
val start = System.currentTimeMillis()

new BlockSnapshotIterator(rdb, height, settings.enableLightMode).asScala.foreach { case (h, block, txSnapshots) =>
new BlockSnapshotIterator(rdb, height, exportSnapshots).asScala.foreach { case (h, block, txSnapshots) =>
val txCount = block.transactionData.length
if (exportSnapshots && txCount != txSnapshots.length)
throw new RuntimeException(
s"${txSnapshots.length} snapshot(s) don't match $txCount transaction(s) on height $h, data is corrupted"
)
exportedBlocksBytes += IO.exportBlock(blocksStream, Some(block), format == Formats.Binary)
snapshotsStream.foreach { output =>
exportedSnapshotsBytes += IO.exportBlockTxSnapshots(output, txSnapshots)
Expand All @@ -100,7 +105,8 @@ object Exporter extends ScorexLogging {
}
}

private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, isLightMode: Boolean) extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, exportSnapshots: Boolean)
extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
var nextTxEntry: Option[(Int, Transaction)] = None
var nextSnapshotEntry: Option[(Int, Array[Byte])] = None

Expand Down Expand Up @@ -156,7 +162,7 @@ object Exporter extends ScorexLogging {
case Some(_) => Seq.empty
case _ => loadTxData[Transaction](Seq.empty, h, txIterator, (h, tx) => nextTxEntry = Some(h -> tx))
}
val snapshots = if (isLightMode) {
val snapshots = if (exportSnapshots) {
nextSnapshotEntry match {
case Some((snapshotHeight, txSnapshot)) if snapshotHeight == h =>
nextSnapshotEntry = None
Expand Down Expand Up @@ -267,8 +273,7 @@ object Exporter extends ScorexLogging {
private[this] final case class ExporterOptions(
configFileName: Option[File] = None,
blocksOutputFileNamePrefix: String = "blockchain",
snapshotsFileNamePrefix: String = "snapshots",
exportSnapshots: Boolean = false,
snapshotsFileNamePrefix: Option[String] = None,
exportHeight: Option[Int] = None,
format: String = Formats.Binary
)
Expand All @@ -290,10 +295,7 @@ object Exporter extends ScorexLogging {
.action((p, c) => c.copy(blocksOutputFileNamePrefix = p)),
opt[String]('s', "snapshot-output-prefix")
.text("Snapshots output file name prefix")
.action((p, c) => c.copy(snapshotsFileNamePrefix = p)),
opt[Unit]('l', "export-snapshots")
.text("Export snapshots for light node")
.action((_, c) => c.copy(exportSnapshots = true)),
.action((p, c) => c.copy(snapshotsFileNamePrefix = Some(p))),
opt[Int]('h', "height")
.text("Export to height")
.action((h, c) => c.copy(exportHeight = Some(h)))
Expand Down
35 changes: 18 additions & 17 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object Importer extends ScorexLogging {
final case class ImportOptions(
configFile: Option[File] = None,
blockchainFile: String = "blockchain",
snapshotsFile: String = "snapshots",
snapshotsFile: Option[String] = None,
importHeight: Int = Int.MaxValue,
format: String = Formats.Binary,
verify: Boolean = true,
Expand All @@ -79,7 +79,7 @@ object Importer extends ScorexLogging {
.action((f, c) => c.copy(blockchainFile = f)),
opt[String]('s', "snapshots-file")
.text("Snapshots data file name")
.action((f, c) => c.copy(snapshotsFile = f)),
.action((f, c) => c.copy(snapshotsFile = Some(f))),
opt[Int]('h', "height")
.text("Import to height")
.action((h, c) => c.copy(importHeight = h))
Expand Down Expand Up @@ -357,10 +357,10 @@ object Importer extends ScorexLogging {
val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb, actorSystem)
checkGenesis(settings, blockchainUpdater, Miner.Disabled)

val (blocksFileOffset, snapshotsFileOffset) =
val blocksFileOffset =
importOptions.format match {
case Formats.Binary =>
var blocksOffset = 0L
var blocksOffset = 0
rdb.db.iterateOver(KeyTags.BlockInfoAtHeight) { e =>
e.getKey match {
case Array(_, _, 0, 0, 0, 1) => // Skip genesis
Expand All @@ -369,22 +369,23 @@ object Importer extends ScorexLogging {
blocksOffset += meta.size + 4
}
}

var totalSize = 0L
rdb.db.iterateOver(KeyTags.NthTransactionStateSnapshotAtHeight) { e =>
totalSize += (e.getValue.length + 4)
}

val snapshotsOffset = totalSize

blocksOffset -> snapshotsOffset
case _ => 0L -> 0L
blocksOffset
case _ =>
0
}
val blocksInputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, blocksFileOffset), 2 * 1024 * 1024)
val snapshotsInputStream =
if (settings.enableLightMode)
Some(new BufferedInputStream(initFileStream(importOptions.snapshotsFile, snapshotsFileOffset), 20 * 1024 * 1024))
else None
importOptions.snapshotsFile
.map { file =>
val inputStream = new BufferedInputStream(initFileStream(file, 0), 20 * 1024 * 1024)
val sizeBytes = new Array[Byte](Ints.BYTES)
(2 to blockchainUpdater.height).foreach { _ =>
ByteStreams.read(inputStream, sizeBytes, 0, 4)
val snapshotsSize = Ints.fromByteArray(sizeBytes)
ByteStreams.skipFully(inputStream, snapshotsSize)
}
inputStream
}

sys.addShutdownHook {
quit = true
Expand Down
8 changes: 4 additions & 4 deletions node/src/main/scala/com/wavesplatform/database/RDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ object RDB extends StrictLogging {
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx-meta").toPath, 0L)).asJava)
),
new ColumnFamilyDescriptor(
"transactions".utf8Bytes,
"tx".utf8Bytes,
txCfOptions.options
.setCfPaths(Seq(new DbPath(new File(dbDir, "transactions").toPath, 0L)).asJava)
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx").toPath, 0L)).asJava)
),
new ColumnFamilyDescriptor(
"transactions-snapshot".utf8Bytes,
"tx-snapshot".utf8Bytes,
txSnapshotCfOptions.options
.setCfPaths(Seq(new DbPath(new File(dbDir, "transactions-snapshot").toPath, 0L)).asJava)
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx-snapshot").toPath, 0L)).asJava)
)
).asJava,
handles
Expand Down