From 306dd67fe146b7f7dc694d887c504b64aba403e7 Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Fri, 1 Dec 2023 17:47:46 +0400 Subject: [PATCH] wip --- .../com/wavesplatform/mining/Miner.scala | 6 +- .../mining/microblocks/MicroBlockMiner.scala | 2 +- .../microblocks/MicroBlockMinerImpl.scala | 89 ++++++++----------- .../com/wavesplatform/network/messages.scala | 6 +- 4 files changed, 48 insertions(+), 55 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/mining/Miner.scala b/node/src/main/scala/com/wavesplatform/mining/Miner.scala index c02c414b39..5ef98f4f37 100644 --- a/node/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -309,8 +309,12 @@ class MinerImpl( }.uncancelable for { - _ <- waitBlockAppendedTask + elapsed <- waitBlockAppendedTask.timed.map(_._1) + newOffset = (offset - elapsed).max(Duration.Zero) + + _ <- Task(microBlockAttempt := SerialCancelable()).delayExecution(newOffset) result <- Task(forgeBlock(account)).executeOn(minerScheduler) + _ <- result match { case Right((block, totalConstraint)) => appendTask(block, totalConstraint) diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala index d53d1cf88b..5f895c14d6 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala @@ -17,7 +17,7 @@ trait MicroBlockMiner { account: KeyPair, accumulatedBlock: Block, restTotalConstraint: MiningConstraint, - lastMicroBlock: Long + prevMicroBlockTs: Long ): Task[Unit] } diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala index 871b27abd1..f6b0f764b5 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala @@ -45,15 +45,15 @@ class MicroBlockMinerImpl( account: KeyPair, accumulatedBlock: Block, restTotalConstraint: MiningConstraint, - lastMicroBlock: Long + prevMicroBlockTs: Long ): Task[Unit] = - generateOneMicroBlockTask(account, accumulatedBlock, restTotalConstraint, lastMicroBlock) + generateOneMicroBlockTask(account, accumulatedBlock, restTotalConstraint, prevMicroBlockTs) .flatMap { case res @ Success(newBlock, newConstraint) => Task.defer(generateMicroBlockSequence(account, newBlock, newConstraint, res.nanoTime)) case Retry => Task - .defer(generateMicroBlockSequence(account, accumulatedBlock, restTotalConstraint, lastMicroBlock)) + .defer(generateMicroBlockSequence(account, accumulatedBlock, restTotalConstraint, prevMicroBlockTs)) .delayExecution(1 second) case Stop => setDebugState(MinerDebugInfo.MiningBlocks) @@ -65,7 +65,7 @@ class MicroBlockMinerImpl( account: KeyPair, accumulatedBlock: Block, restTotalConstraint: MiningConstraint, - lastMicroBlock: Long + prevMicroBlockTs: Long ): Task[MicroBlockMiningResult] = { val packTask = Task.cancelable[(Option[Seq[Transaction]], MiningConstraint, Option[ByteStr])] { cb => @volatile var cancelled = false @@ -93,8 +93,8 @@ class MicroBlockMinerImpl( ) ) ) - log.trace(s"Finished pack for ${accumulatedBlock.id()}") val updatedTotalConstraint = updatedMdConstraint.head + log.trace(s"Finished pack for ${accumulatedBlock.id()}, updated total constraint: $updatedTotalConstraint") cb.onSuccess((unconfirmed, updatedTotalConstraint, stateHash)) } Task.eval { @@ -104,24 +104,25 @@ class MicroBlockMinerImpl( packTask.flatMap { case (Some(unconfirmed), updatedTotalConstraint, stateHash) if unconfirmed.nonEmpty => - val delay = { - val delay = System.nanoTime() - lastMicroBlock - val requiredDelay = settings.microBlockInterval.toNanos - if (delay >= requiredDelay) Duration.Zero else (requiredDelay - delay).nanos - } - for { - _ <- Task.now(if (delay > Duration.Zero) log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock")) - _ <- Task.sleep(delay) - r <- - if (blockchainUpdater.lastBlockId.forall(_ == accumulatedBlock.id())) { - log.trace(s"Generating microBlock for ${account.toAddress}, constraints: $updatedTotalConstraint") - appendAndBroadcastMicroBlock(account, accumulatedBlock, unconfirmed, updatedTotalConstraint, stateHash) - } else { - log.trace(s"Stopping generating microBlock for ${account.toAddress}, new key block was appended") - Task(Stop) - } - } yield r + blocks <- forgeBlocks(account, accumulatedBlock, unconfirmed, stateHash) + .leftWiden[Throwable] + .liftTo[Task] + (signedBlock, microBlock) = blocks + delay = { + val delay = System.nanoTime() - prevMicroBlockTs + val requiredDelay = settings.microBlockInterval.toNanos + if (delay >= requiredDelay) Duration.Zero else (requiredDelay - delay).nanos + } + _ <- + if (delay > Duration.Zero) { + log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock") + Task.sleep(delay) + } else Task.unit + _ <- appendMicroBlock(microBlock, account) + } yield + if (updatedTotalConstraint.isFull) Stop + else Success(signedBlock, updatedTotalConstraint) case (_, updatedTotalConstraint, _) => if (updatedTotalConstraint.isFull) { @@ -139,39 +140,27 @@ class MicroBlockMinerImpl( } } - private def appendAndBroadcastMicroBlock( - account: KeyPair, - block: Block, - transactions: Seq[Transaction], - constraint: MiningConstraint, - stateHash: Option[BlockId] - ): Task[MicroBlockMiningResult] = - for { - (signedBlock, microBlock) <- forgeBlocks(account, block, transactions, stateHash).leftWiden[Throwable].liftTo[Task] - blockId <- appendMicroBlock(microBlock) - _ = BlockStats.mined(microBlock, blockId) - _ <- broadcastMicroBlock(account, microBlock, blockId) - } yield - if (constraint.isFull) Stop - else Success(signedBlock, constraint) - - private def broadcastMicroBlock(account: KeyPair, microBlock: MicroBlock, blockId: BlockId): Task[Unit] = - Task(if (allChannels != null) allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference))) - - private def appendMicroBlock(microBlock: MicroBlock): Task[BlockId] = - MicroblockAppender(blockchainUpdater, utx, appenderScheduler)(microBlock, None) - .flatMap { - case Left(err) => Task.raiseError(MicroBlockAppendError(microBlock, err)) - case Right(v) => Task.now(v) - } + private def appendMicroBlock(microBlock: MicroBlock, account: KeyPair): Task[BlockId] = + MicroblockAppender(blockchainUpdater, utx, appenderScheduler)(microBlock, None).flatMap { + case Left(err) => Task.raiseError(MicroBlockAppendError(microBlock, err)) + case Right(blockId) => + Task.evalAsync { + BlockStats.mined(microBlock, blockId) + if (allChannels != null) { + allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference)) + } + blockId + } + }.uncancelable private def forgeBlocks( account: KeyPair, accumulatedBlock: Block, - unconfirmed: Seq[Transaction], + packedTxs: Seq[Transaction], stateHash: Option[ByteStr] ): Either[MicroBlockMiningError, (Block, MicroBlock)] = microBlockBuildTimeStats.measureSuccessful { + log.trace(s"Forging microBlock for ${account.toAddress}") for { signedBlock <- Block .buildAndSign( @@ -180,7 +169,7 @@ class MicroBlockMinerImpl( reference = accumulatedBlock.header.reference, baseTarget = accumulatedBlock.header.baseTarget, generationSignature = accumulatedBlock.header.generationSignature, - txs = accumulatedBlock.transactionData ++ unconfirmed, + txs = accumulatedBlock.transactionData ++ packedTxs, signer = account, featureVotes = accumulatedBlock.header.featureVotes, rewardVote = accumulatedBlock.header.rewardVote, @@ -189,7 +178,7 @@ class MicroBlockMinerImpl( ) .leftMap(BlockBuildError) microBlock <- MicroBlock - .buildAndSign(signedBlock.header.version, account, unconfirmed, accumulatedBlock.id(), signedBlock.signature, stateHash) + .buildAndSign(signedBlock.header.version, account, packedTxs, accumulatedBlock.id(), signedBlock.signature, stateHash) .leftMap(MicroBlockBuildError) } yield (signedBlock, microBlock) } diff --git a/node/src/main/scala/com/wavesplatform/network/messages.scala b/node/src/main/scala/com/wavesplatform/network/messages.scala index 896b906246..f5b09c7413 100644 --- a/node/src/main/scala/com/wavesplatform/network/messages.scala +++ b/node/src/main/scala/com/wavesplatform/network/messages.scala @@ -80,9 +80,9 @@ case class MicroBlockInv(sender: PublicKey, totalBlockId: ByteStr, reference: By } object MicroBlockInv { - def apply(sender: KeyPair, totalBlockRef: ByteStr, prevBlockRef: ByteStr): MicroBlockInv = { - val signature = crypto.sign(sender.privateKey, sender.toAddress.bytes ++ totalBlockRef.arr ++ prevBlockRef.arr) - new MicroBlockInv(sender.publicKey, totalBlockRef, prevBlockRef, signature) + def apply(sender: KeyPair, totalBlockId: ByteStr, prevBlockRef: ByteStr): MicroBlockInv = { + val signature = crypto.sign(sender.privateKey, sender.toAddress.bytes ++ totalBlockId.arr ++ prevBlockRef.arr) + new MicroBlockInv(sender.publicKey, totalBlockId, prevBlockRef, signature) } }