Skip to content

Commit

Permalink
NODE-2639 Improved Exporter and Importer (#3926)
Browse files Browse the repository at this point in the history
  • Loading branch information
xrtm000 authored Jan 10, 2024
1 parent 3042225 commit 3533edc
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 34 deletions.
28 changes: 15 additions & 13 deletions node/src/main/scala/com/wavesplatform/Exporter.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.wavesplatform

import com.google.common.collect.AbstractIterator

import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
import com.google.common.primitives.Ints
import com.wavesplatform.block.Block
import com.wavesplatform.database.protobuf.BlockMeta
Expand All @@ -19,6 +17,7 @@ import kamon.Kamon
import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB}
import scopt.OParser

import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
import scala.concurrent.Await
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
Expand All @@ -39,7 +38,7 @@ object Exporter extends ScorexLogging {
// noinspection ScalaStyle
def main(args: Array[String]): Unit = {
OParser.parse(commandParser, args, ExporterOptions()).foreach {
case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportSnapshots, exportHeight, format) =>
case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportHeight, format) =>
val settings = Application.loadApplicationConfig(configFile)

Using.resources(
Expand All @@ -53,8 +52,9 @@ object Exporter extends ScorexLogging {
val blocksOutputFilename = s"$blocksOutputFileNamePrefix-$height"
log.info(s"Blocks output file: $blocksOutputFilename")

val exportSnapshots = snapshotsOutputFileNamePrefix.isDefined
val snapshotsOutputFilename = if (exportSnapshots) {
val filename = s"$snapshotsOutputFileNamePrefix-$height"
val filename = s"${snapshotsOutputFileNamePrefix.get}-$height"
log.info(s"Snapshots output file: $filename")
Some(filename)
} else None
Expand All @@ -74,7 +74,12 @@ object Exporter extends ScorexLogging {
var exportedSnapshotsBytes = 0L
val start = System.currentTimeMillis()

new BlockSnapshotIterator(rdb, height, settings.enableLightMode).asScala.foreach { case (h, block, txSnapshots) =>
new BlockSnapshotIterator(rdb, height, exportSnapshots).asScala.foreach { case (h, block, txSnapshots) =>
val txCount = block.transactionData.length
if (exportSnapshots && txCount != txSnapshots.length)
throw new RuntimeException(
s"${txSnapshots.length} snapshot(s) don't match $txCount transaction(s) on height $h, data is corrupted"
)
exportedBlocksBytes += IO.exportBlock(blocksStream, Some(block), format == Formats.Binary)
snapshotsStream.foreach { output =>
exportedSnapshotsBytes += IO.exportBlockTxSnapshots(output, txSnapshots)
Expand All @@ -100,7 +105,8 @@ object Exporter extends ScorexLogging {
}
}

private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, isLightMode: Boolean) extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, exportSnapshots: Boolean)
extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
var nextTxEntry: Option[(Int, Transaction)] = None
var nextSnapshotEntry: Option[(Int, Array[Byte])] = None

Expand Down Expand Up @@ -156,7 +162,7 @@ object Exporter extends ScorexLogging {
case Some(_) => Seq.empty
case _ => loadTxData[Transaction](Seq.empty, h, txIterator, (h, tx) => nextTxEntry = Some(h -> tx))
}
val snapshots = if (isLightMode) {
val snapshots = if (exportSnapshots) {
nextSnapshotEntry match {
case Some((snapshotHeight, txSnapshot)) if snapshotHeight == h =>
nextSnapshotEntry = None
Expand Down Expand Up @@ -267,8 +273,7 @@ object Exporter extends ScorexLogging {
private[this] final case class ExporterOptions(
configFileName: Option[File] = None,
blocksOutputFileNamePrefix: String = "blockchain",
snapshotsFileNamePrefix: String = "snapshots",
exportSnapshots: Boolean = false,
snapshotsFileNamePrefix: Option[String] = None,
exportHeight: Option[Int] = None,
format: String = Formats.Binary
)
Expand All @@ -290,10 +295,7 @@ object Exporter extends ScorexLogging {
.action((p, c) => c.copy(blocksOutputFileNamePrefix = p)),
opt[String]('s', "snapshot-output-prefix")
.text("Snapshots output file name prefix")
.action((p, c) => c.copy(snapshotsFileNamePrefix = p)),
opt[Unit]('l', "export-snapshots")
.text("Export snapshots for light node")
.action((_, c) => c.copy(exportSnapshots = true)),
.action((p, c) => c.copy(snapshotsFileNamePrefix = Some(p))),
opt[Int]('h', "height")
.text("Export to height")
.action((h, c) => c.copy(exportHeight = Some(h)))
Expand Down
35 changes: 18 additions & 17 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object Importer extends ScorexLogging {
final case class ImportOptions(
configFile: Option[File] = None,
blockchainFile: String = "blockchain",
snapshotsFile: String = "snapshots",
snapshotsFile: Option[String] = None,
importHeight: Int = Int.MaxValue,
format: String = Formats.Binary,
verify: Boolean = true,
Expand All @@ -79,7 +79,7 @@ object Importer extends ScorexLogging {
.action((f, c) => c.copy(blockchainFile = f)),
opt[String]('s', "snapshots-file")
.text("Snapshots data file name")
.action((f, c) => c.copy(snapshotsFile = f)),
.action((f, c) => c.copy(snapshotsFile = Some(f))),
opt[Int]('h', "height")
.text("Import to height")
.action((h, c) => c.copy(importHeight = h))
Expand Down Expand Up @@ -357,10 +357,10 @@ object Importer extends ScorexLogging {
val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb, actorSystem)
checkGenesis(settings, blockchainUpdater, Miner.Disabled)

val (blocksFileOffset, snapshotsFileOffset) =
val blocksFileOffset =
importOptions.format match {
case Formats.Binary =>
var blocksOffset = 0L
var blocksOffset = 0
rdb.db.iterateOver(KeyTags.BlockInfoAtHeight) { e =>
e.getKey match {
case Array(_, _, 0, 0, 0, 1) => // Skip genesis
Expand All @@ -369,22 +369,23 @@ object Importer extends ScorexLogging {
blocksOffset += meta.size + 4
}
}

var totalSize = 0L
rdb.db.iterateOver(KeyTags.NthTransactionStateSnapshotAtHeight) { e =>
totalSize += (e.getValue.length + 4)
}

val snapshotsOffset = totalSize

blocksOffset -> snapshotsOffset
case _ => 0L -> 0L
blocksOffset
case _ =>
0
}
val blocksInputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, blocksFileOffset), 2 * 1024 * 1024)
val snapshotsInputStream =
if (settings.enableLightMode)
Some(new BufferedInputStream(initFileStream(importOptions.snapshotsFile, snapshotsFileOffset), 20 * 1024 * 1024))
else None
importOptions.snapshotsFile
.map { file =>
val inputStream = new BufferedInputStream(initFileStream(file, 0), 20 * 1024 * 1024)
val sizeBytes = new Array[Byte](Ints.BYTES)
(2 to blockchainUpdater.height).foreach { _ =>
ByteStreams.read(inputStream, sizeBytes, 0, 4)
val snapshotsSize = Ints.fromByteArray(sizeBytes)
ByteStreams.skipFully(inputStream, snapshotsSize)
}
inputStream
}

sys.addShutdownHook {
quit = true
Expand Down
8 changes: 4 additions & 4 deletions node/src/main/scala/com/wavesplatform/database/RDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ object RDB extends StrictLogging {
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx-meta").toPath, 0L)).asJava)
),
new ColumnFamilyDescriptor(
"transactions".utf8Bytes,
"tx".utf8Bytes,
txCfOptions.options
.setCfPaths(Seq(new DbPath(new File(dbDir, "transactions").toPath, 0L)).asJava)
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx").toPath, 0L)).asJava)
),
new ColumnFamilyDescriptor(
"transactions-snapshot".utf8Bytes,
"tx-snapshot".utf8Bytes,
txSnapshotCfOptions.options
.setCfPaths(Seq(new DbPath(new File(dbDir, "transactions-snapshot").toPath, 0L)).asJava)
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx-snapshot").toPath, 0L)).asJava)
)
).asJava,
handles
Expand Down

0 comments on commit 3533edc

Please sign in to comment.