diff --git a/build.sbt b/build.sbt index bee84cd5d..dc7805904 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.1", + version := "2.0.2", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := diff --git a/examples/src/test/resources/settings.conf b/examples/src/test/resources/settings.conf index 13bacc4a0..19c596481 100644 --- a/examples/src/test/resources/settings.conf +++ b/examples/src/test/resources/settings.conf @@ -165,6 +165,13 @@ sparkz { # Max penalty score peer can accumulate before being banned penaltyScoreThreshold = 100 + ######################### + # Other settings # + ######################### + + # Enables transactions in the mempool + handlingTransactionsEnabled = true + } ntp { @@ -192,4 +199,4 @@ sparkz { password = "cookies" walletDir = "/tmp/scorex-test/data/wallet" } -} \ No newline at end of file +} diff --git a/release-notes.md b/release-notes.md index 3a73330c9..f681d7c62 100644 --- a/release-notes.md +++ b/release-notes.md @@ -1,3 +1,8 @@ +2.0.2 +--------- +* P2p rate limitng feature - added tx rebroadcast when rate limiting is reenabled +* Seeder nodes support + 2.0.1 --------- * P2p rate limitng feature diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index aa7addd94..9332f588b 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -181,6 +181,18 @@ sparkz { # The impact is single measurement has on a an average processing value. slowModeMeasurementImpact = 0.1 + # Enabled or disables rebroadcasting of modifiers after the slow mode is over. + rebroadcastEnabled = false + + # Delay between rebroadcasting batches of modifiers. + rebroadcastDelay = 30s + + # Maximum number of modifiers to keep in the rebroadcast queue. + rebroadcastQueueSize = 1024 + + # number of modifiers to re-broadcast every rebroadcastDelay + rebroadcastBatchSize = 75 + # Desired number of inv objects. Our requests will have this size. desiredInvObjects = 512 @@ -201,6 +213,12 @@ sparkz { # Max penalty score peer can accumulate before being banned penaltyScoreThreshold = 100 + ######################### + # Other settings # + ######################### + + # Enables transactions in the mempool + handlingTransactionsEnabled = true } ntp { @@ -213,4 +231,4 @@ sparkz { # server answer timeout timeout = 30s } -} \ No newline at end of file +} diff --git a/src/main/scala/sparkz/core/NodeViewHolder.scala b/src/main/scala/sparkz/core/NodeViewHolder.scala index 9cc18909e..d7ff8d778 100644 --- a/src/main/scala/sparkz/core/NodeViewHolder.scala +++ b/src/main/scala/sparkz/core/NodeViewHolder.scala @@ -149,18 +149,22 @@ trait NodeViewHolder[TX <: Transaction, PMOD <: PersistentNodeViewModifier] //todo: this method causes delays in a block processing as it removes transactions from mempool and checks //todo: validity of remaining transactions in a synchronous way. Do this job async! protected def updateMemPool(blocksRemoved: Seq[PMOD], blocksApplied: Seq[PMOD], memPool: MP, state: MS): MP = { - val rolledBackTxs = blocksRemoved.flatMap(extractTransactions) + if (sparksSettings.network.handlingTransactionsEnabled){ + val rolledBackTxs = blocksRemoved.flatMap(extractTransactions) - val appliedTxs = blocksApplied.flatMap(extractTransactions) + val appliedTxs = blocksApplied.flatMap(extractTransactions) - memPool.putWithoutCheck(rolledBackTxs).filter { tx => - !appliedTxs.exists(t => t.id == tx.id) && { - state match { - case v: TransactionValidation[TX @unchecked] => v.validate(tx).isSuccess - case _ => true + memPool.putWithoutCheck(rolledBackTxs).filter { tx => + !appliedTxs.exists(t => t.id == tx.id) && { + state match { + case v: TransactionValidation[TX @unchecked] => v.validate(tx).isSuccess + case _ => true + } } } } + else + memPool } private def trimChainSuffix(suffix: IndexedSeq[PMOD], rollbackPoint: sparkz.util.ModifierId): IndexedSeq[PMOD] = { @@ -344,7 +348,11 @@ trait NodeViewHolder[TX <: Transaction, PMOD <: PersistentNodeViewModifier] protected def transactionsProcessing: Receive = { case newTxs: NewTransactions[TX @unchecked] => - newTxs.txs.foreach(txModify) + if (sparksSettings.network.handlingTransactionsEnabled) + newTxs.txs.foreach(txModify) + else + newTxs.txs.foreach(tx => context.system.eventStream.publish( + FailedTransaction(tx.id, new Exception("Transactions handling disabled"), immediateFailure = false))) case EliminateTransactions(ids) => val updatedPool = memoryPool().filter(tx => !ids.contains(tx.id)) updateNodeView(updatedMempool = Some(updatedPool)) diff --git a/src/main/scala/sparkz/core/network/DeliveryTracker.scala b/src/main/scala/sparkz/core/network/DeliveryTracker.scala index c9821d7ec..943a94cd8 100644 --- a/src/main/scala/sparkz/core/network/DeliveryTracker.scala +++ b/src/main/scala/sparkz/core/network/DeliveryTracker.scala @@ -3,7 +3,7 @@ package sparkz.core.network import akka.actor.{ActorRef, ActorSystem, Cancellable} import sparkz.core.consensus.ContainsModifiers import sparkz.core.network.ModifiersStatus._ -import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.CheckDelivery +import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.{CheckDelivery, TransactionRebroadcast} import sparkz.core.settings.NetworkSettings import sparkz.util.SparkzEncoding import sparkz.core.{ModifierTypeId, NodeViewModifier} @@ -38,7 +38,7 @@ import scala.util.{Failure, Try} */ class DeliveryTracker(system: ActorSystem, networkSettings: NetworkSettings, - nvsRef: ActorRef) extends SparkzLogging with SparkzEncoding { + nvsRef: ActorRef)(implicit ec: ExecutionContext) extends SparkzLogging with SparkzEncoding { protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks @@ -47,6 +47,10 @@ class DeliveryTracker(system: ActorSystem, protected val slowModeThresholdMs: Long = networkSettings.slowModeThresholdMs protected val slowModeMaxRequested: Int = networkSettings.slowModeMaxRequested protected val slowModeMeasurementImpact: Double = networkSettings.slowModeMeasurementImpact + protected val rebroadcastEnabled: Boolean = networkSettings.rebroadcastEnabled + protected val rebroadcastDelay: FiniteDuration = networkSettings.rebroadcastDelay + protected val rebroadcastQueueSize: Int = networkSettings.rebroadcastQueueSize + protected val rebroadcastBatchSize: Int = networkSettings.rebroadcastBatchSize protected case class RequestedInfo(peer: ConnectedPeer, cancellable: Cancellable, checks: Int) @@ -62,6 +66,11 @@ class DeliveryTracker(system: ActorSystem, // when our node received a modifier we put it to `received` protected val received: mutable.Map[ModifierId, (ConnectedPeer, Long)] = mutable.Map() + // when the modifier is created but not broadcasted, we put it to `rebroadcastQueue` + protected val rebroadcastQueue: mutable.Queue[ModifierId] = mutable.Queue() + + private var transactionRebroadcastEvent: Cancellable = Cancellable.alreadyCancelled + private var averageProcessingTimeMs: Long = 0 var slowMode: Boolean = false @@ -92,8 +101,7 @@ class DeliveryTracker(system: ActorSystem, * * @return `true` if number of checks was not exceed, `false` otherwise */ - def onStillWaiting(cp: ConnectedPeer, modifierTypeId: ModifierTypeId, modifierId: ModifierId) - (implicit ec: ExecutionContext): Try[Unit] = + def onStillWaiting(cp: ConnectedPeer, modifierTypeId: ModifierTypeId, modifierId: ModifierId): Try[Unit] = tryWithLogging { val checks = requested(modifierId).checks + 1 setUnknown(modifierId) @@ -104,8 +112,7 @@ class DeliveryTracker(system: ActorSystem, /** * Set status of modifier with id `id` to `Requested` */ - private def setRequested(id: ModifierId, typeId: ModifierTypeId, supplier: ConnectedPeer, checksDone: Int = 0) - (implicit ec: ExecutionContext): Unit = + private def setRequested(id: ModifierId, typeId: ModifierTypeId, supplier: ConnectedPeer, checksDone: Int = 0): Unit = tryWithLogging { require(isCorrectTransition(status(id), Requested), s"Illegal status transition: ${status(id)} -> Requested") val cancellable = system.scheduler.scheduleOnce(deliveryTimeout, nvsRef, CheckDelivery(supplier, typeId, id)) @@ -116,8 +123,8 @@ class DeliveryTracker(system: ActorSystem, } } - def setRequested(ids: Seq[ModifierId], typeId: ModifierTypeId, cp: ConnectedPeer) - (implicit ec: ExecutionContext): Unit = ids.foreach(setRequested(_, typeId, cp)) + def setRequested(ids: Seq[ModifierId], typeId: ModifierTypeId, cp: ConnectedPeer): Unit = + ids.foreach(setRequested(_, typeId, cp)) /** * Modified with id `id` is permanently invalid - set its status to `Invalid` @@ -219,6 +226,32 @@ class DeliveryTracker(system: ActorSystem, !(slowModeFeatureFlag && slowMode && requested.size > slowModeMaxRequested) } + def putInRebroadcastQueue(modifierId: ModifierId): Unit = { + if (rebroadcastEnabled) { + rebroadcastQueue.enqueue(modifierId) + while (rebroadcastQueue.size > rebroadcastQueueSize) { + rebroadcastQueue.dequeue() + } + } + } + + def getRebroadcastModifiers: Seq[ModifierId] = { + rebroadcastQueue.take(rebroadcastBatchSize).toSeq + .map(_ => rebroadcastQueue.dequeue()) + } + + /** + * Schedule rebroadcast of transactions if rebroadcast is enabled and there are transactions to rebroadcast. + * If there is already scheduled rebroadcast, cancel it and schedule new one, it means that during the delay node + * entered slow mode again. + */ + def scheduleRebroadcastIfNeeded(): Unit = { + if (rebroadcastEnabled && rebroadcastQueue.nonEmpty) { + transactionRebroadcastEvent.cancel() + transactionRebroadcastEvent = system.scheduler.scheduleOnce(rebroadcastDelay, nvsRef, TransactionRebroadcast) + } + } + private def incrementPeerLimitCounter(peer: ConnectedPeer): Unit = { peerLimits.get(peer) match { case Some(value) => peerLimits.put(peer, value + 1) @@ -290,6 +323,7 @@ class DeliveryTracker(system: ActorSystem, } else if (averageProcessingTimeMs < slowModeThresholdMs && slowMode) { slowMode = false logger.warn("Slow mode disabled on P2P layer. Transactions will be requested or broadcasted.") + scheduleRebroadcastIfNeeded() } } } diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 3c17495f4..1bb5d51cd 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -374,7 +374,12 @@ class NetworkController(settings: NetworkSettings, } } val isLocal = NetworkUtils.isLocalAddress(connectionId.remoteAddress.getAddress) - val mandatoryFeatures = sparkzContext.features :+ mySessionIdFeature + val mandatoryFeatures = if (settings.handlingTransactionsEnabled) { + sparkzContext.features :+ mySessionIdFeature + } + else { + sparkzContext.features :+ mySessionIdFeature :+ TransactionsDisabledPeerFeature() + } val peerFeatures = if (isLocal) { val la = new InetSocketAddress(connectionId.localAddress.getAddress, settings.bindAddress.getPort) val localAddrFeature = LocalAddressPeerFeature(la) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index cb3a7a3ae..d2ad5181f 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -23,6 +23,7 @@ import sparkz.util.{ModifierId, SparkzEncoding, SparkzLogging} import java.nio.ByteBuffer import scala.annotation.{nowarn, tailrec} +import scala.collection.mutable import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.language.postfixOps @@ -96,22 +97,30 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes protected def broadcastModifierInv[M <: NodeViewModifier](m: M): Unit = { m.modifierTypeId match { - case Transaction.ModifierTypeId if deliveryTracker.slowMode => // will not broadcast due to the high load + case Transaction.ModifierTypeId if deliveryTracker.slowMode => + deliveryTracker.putInRebroadcastQueue(m.id) case _ => val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None) - networkControllerRef ! SendToNetwork(msg, Broadcast) + if (m.modifierTypeId == Transaction.ModifierTypeId) + networkControllerRef ! SendToNetwork(msg, BroadcastTransaction) + else + networkControllerRef ! SendToNetwork(msg, Broadcast) } } protected def viewHolderEvents: Receive = { case SuccessfulTransaction(tx) => - deliveryTracker.setHeld(tx.id) - broadcastModifierInv(tx) + if (networkSettings.handlingTransactionsEnabled) { + deliveryTracker.setHeld(tx.id) + broadcastModifierInv(tx) + } case FailedTransaction(id, _, immediateFailure) => - val senderOpt = deliveryTracker.setInvalid(id) - // penalize sender only in case transaction was invalidated at first validation. - if (immediateFailure) senderOpt.foreach(penalizeMisbehavingPeer) + if (networkSettings.handlingTransactionsEnabled) { + val senderOpt = deliveryTracker.setInvalid(id) + // penalize sender only in case transaction was invalidated at first validation. + if (immediateFailure) senderOpt.foreach(penalizeMisbehavingPeer) + } case SyntacticallySuccessfulModifier(mod) => deliveryTracker.setHeld(mod.id) @@ -225,10 +234,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes val modifierTypeId = invData.typeId val newModifierIds = (modifierTypeId match { case Transaction.ModifierTypeId => - if (deliveryTracker.canRequestMoreTransactions) + if (deliveryTracker.canRequestMoreTransactions && networkSettings.handlingTransactionsEnabled) invData.ids.filter(mid => deliveryTracker.status(mid, mempool) == ModifiersStatus.Unknown) else - Seq() // do not request transactions due to the high load + Seq.empty case _ => invData.ids.filter(mid => deliveryTracker.status(mid, history) == ModifiersStatus.Unknown) }) @@ -250,7 +259,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes readersOpt.foreach { readers => val objs: Seq[NodeViewModifier] = invData.typeId match { case typeId: ModifierTypeId if typeId == Transaction.ModifierTypeId => - readers._2.getAll(invData.ids) + if (networkSettings.handlingTransactionsEnabled) + readers._2.getAll(invData.ids) + else + Seq.empty case _: ModifierTypeId => invData.ids.flatMap(id => readers._1.modifierById(id)) } @@ -437,6 +449,18 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes } } + protected def transactionRebroadcast: Receive = { + case TransactionRebroadcast => + val mods = deliveryTracker.getRebroadcastModifiers + mempoolReaderOpt match { + case Some(mempool) => + mempool.getAll(ids = mods).foreach { tx =>broadcastModifierInv(tx) } + case None => + log.warn(s"Trying to rebroadcast while readers are not ready $mempoolReaderOpt") + } + deliveryTracker.scheduleRebroadcastIfNeeded() + } + override def receive: Receive = processDataFromPeer orElse getLocalSyncInfo orElse @@ -444,7 +468,8 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes responseFromLocal orElse viewHolderEvents orElse peerManagerEvents orElse - checkDelivery orElse { + checkDelivery orElse + transactionRebroadcast orElse { case a: Any => log.error("Strange input: " + a) } @@ -467,6 +492,8 @@ object NodeViewSynchronizer { // getLocalSyncInfo messages case object SendLocalSyncInfo + case object TransactionRebroadcast + case class ResponseFromLocal[M <: NodeViewModifier](source: ConnectedPeer, modifierTypeId: ModifierTypeId, localObjects: Seq[M]) /** @@ -586,3 +613,43 @@ object NodeViewSynchronizerRef { system.actorOf(props[TX, SI, SIS, PMOD, HR, MR](networkControllerRef, viewHolderRef, syncInfoSpec, networkSettings, timeProvider, modifierSerializers), name) } + +object Test extends App { + + test() + + protected val rebroadcastQueue: mutable.Queue[String] = mutable.Queue() + + def test(): Unit = { + putInRebroadcastQueue("1") + putInRebroadcastQueue("2") + putInRebroadcastQueue("3") + putInRebroadcastQueue("4") + putInRebroadcastQueue("5") + putInRebroadcastQueue("6") + putInRebroadcastQueue("7") + putInRebroadcastQueue("8") + putInRebroadcastQueue("9") + putInRebroadcastQueue("10") + + println(s"get modifiers = $getRebroadcastModifiers") + + println(s"queue = $rebroadcastQueue") + + println(s"get modifiers = $getRebroadcastModifiers") + + println(s"queue = $rebroadcastQueue") + } + + def putInRebroadcastQueue(modifierId: String): Unit = { + rebroadcastQueue.enqueue(modifierId) + } + + def getRebroadcastModifiers: Seq[String] = { + val mods = rebroadcastQueue.take(5).toSeq + rebroadcastQueue.drop(5) + mods + } + + +} \ No newline at end of file diff --git a/src/main/scala/sparkz/core/network/SendingStrategy.scala b/src/main/scala/sparkz/core/network/SendingStrategy.scala index 2d0de8206..4d228b032 100644 --- a/src/main/scala/sparkz/core/network/SendingStrategy.scala +++ b/src/main/scala/sparkz/core/network/SendingStrategy.scala @@ -1,5 +1,7 @@ package sparkz.core.network +import sparkz.core.network.peer.TransactionsDisabledPeerFeature + import java.security.SecureRandom trait SendingStrategy { @@ -21,6 +23,14 @@ case object Broadcast extends SendingStrategy { override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = peers } + +case object BroadcastTransaction extends SendingStrategy { + override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = { + peers.filter(p => p.peerInfo.flatMap{ + info => info.peerSpec.features.collectFirst {case f:TransactionsDisabledPeerFeature => true}}.isEmpty) + } +} + case class BroadcastExceptOf(exceptOf: Seq[ConnectedPeer]) extends SendingStrategy { override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = peers.filterNot(exceptOf.contains) diff --git a/src/main/scala/sparkz/core/network/peer/TransactionsDisabledPeerFeature.scala b/src/main/scala/sparkz/core/network/peer/TransactionsDisabledPeerFeature.scala new file mode 100644 index 000000000..5c8050cb4 --- /dev/null +++ b/src/main/scala/sparkz/core/network/peer/TransactionsDisabledPeerFeature.scala @@ -0,0 +1,35 @@ +package sparkz.core.network.peer + +import sparkz.core.network.PeerFeature +import sparkz.core.network.PeerFeature.Id +import sparkz.core.network.message.Message +import sparkz.util.serialization._ +import sparkz.core.serialization.SparkzSerializer + +/** + * This peer feature allows to detect peers who don't support transactions + */ +case class TransactionsDisabledPeerFeature() extends PeerFeature { + + override type M = TransactionsDisabledPeerFeature + override val featureId: Id = TransactionsDisabledPeerFeature.featureId + + override def serializer: TransactionsDisabledPeerFeatureSerializer.type = TransactionsDisabledPeerFeatureSerializer + +} + +object TransactionsDisabledPeerFeature { + + val featureId: Id = 4: Byte + +} + +object TransactionsDisabledPeerFeatureSerializer extends SparkzSerializer[TransactionsDisabledPeerFeature] { + + + override def parse(r: Reader): TransactionsDisabledPeerFeature = { + TransactionsDisabledPeerFeature() + } + + override def serialize(obj: TransactionsDisabledPeerFeature, w: Writer): Unit = {} +} diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala index 0645da1c5..5e9939831 100644 --- a/src/main/scala/sparkz/core/settings/Settings.scala +++ b/src/main/scala/sparkz/core/settings/Settings.scala @@ -35,6 +35,10 @@ case class NetworkSettings(nodeName: String, slowModeThresholdMs: Long, slowModeMaxRequested: Int, slowModeMeasurementImpact: Double, + rebroadcastEnabled: Boolean, + rebroadcastDelay: FiniteDuration, + rebroadcastQueueSize: Int, + rebroadcastBatchSize: Int, appVersion: String, agentName: String, maxModifiersSpecMessageSize: Int, @@ -57,7 +61,8 @@ case class NetworkSettings(nodeName: String, storageBackupDelay: FiniteDuration, temporalBanDuration: FiniteDuration, penaltySafeInterval: FiniteDuration, - penaltyScoreThreshold: Int) + penaltyScoreThreshold: Int, + handlingTransactionsEnabled: Boolean) case class SparkzSettings(dataDir: File, logDir: File, diff --git a/src/test/resources/reference.conf b/src/test/resources/reference.conf index 1ee826b33..fe05fa5d8 100644 --- a/src/test/resources/reference.conf +++ b/src/test/resources/reference.conf @@ -187,6 +187,13 @@ sparkz { # Max penalty score peer can accumulate before being banned penaltyScoreThreshold = 100 + ############################ + # Synchronization settings # + ############################ + + # Enable p2p transactions synchronization + syncTransactionsEnabled = true + } ntp { diff --git a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala index c0d2007bc..b5279438e 100644 --- a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala +++ b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala @@ -10,6 +10,7 @@ import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks import sparkz.ObjectGenerators import sparkz.core.consensus.ContainsModifiers import sparkz.core.network.ModifiersStatus._ +import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.TransactionRebroadcast import sparkz.core.serialization.SparkzSerializer import sparkz.core.settings.NetworkSettings import sparkz.core.{ModifierTypeId, PersistentNodeViewModifier} @@ -212,6 +213,117 @@ class DeliveryTrackerSpecification extends AnyPropSpec deliveryTracker.slowMode shouldBe false } + property(" should schedule an event to rebroadcast modifiers") { + val system = ActorSystem() + val probe = TestProbe("p")(system) + implicit val nvsStub: ActorRef = probe.testActor + val dt = FiniteDuration(3, MINUTES) + val networkSettings = mock[NetworkSettings] + when(networkSettings.deliveryTimeout).thenReturn(dt) + when(networkSettings.maxDeliveryChecks).thenReturn(2) + when(networkSettings.maxRequestedPerPeer).thenReturn(3) + when(networkSettings.slowModeFeatureFlag).thenReturn(true) + when(networkSettings.slowModeThresholdMs).thenReturn(100) + when(networkSettings.slowModeMeasurementImpact).thenReturn(0.1) + when(networkSettings.rebroadcastEnabled).thenReturn(true) + when(networkSettings.rebroadcastDelay).thenReturn(Duration.fromNanos(1000)) + when(networkSettings.rebroadcastBatchSize).thenReturn(5) + when(networkSettings.rebroadcastQueueSize).thenReturn(1000) + + val deliveryTracker = new DeliveryTracker( + system, + networkSettings, + nvsRef = nvsStub) + deliveryTracker.slowMode shouldBe false + val modifiersBatch1 = (1 to 10).map(int => bytesToId(Blake2b256(int+ ""))) + val throttledModifiers = (1 to 10).map(int => bytesToId(Blake2b256(int+ ""))) + + // engage slow mode + deliveryTracker.setRequested(modifiersBatch1, mtid, cp) + modifiersBatch1.foreach(deliveryTracker.setReceived(_, cp)) + Thread.sleep(200) + modifiersBatch1.foreach(deliveryTracker.setHeld) + deliveryTracker.slowMode shouldBe true + + // send throttled transactions + throttledModifiers.foreach(deliveryTracker.putInRebroadcastQueue) + + // end slow mode + deliveryTracker.setRequested(modifiersBatch1, mtid, cp) + modifiersBatch1.foreach(deliveryTracker.setReceived(_, cp)) + modifiersBatch1.foreach(deliveryTracker.setHeld) + deliveryTracker.slowMode shouldBe false + + // assert rebroadcast logic + probe.expectMsg(TransactionRebroadcast) + val batch1 = deliveryTracker.getRebroadcastModifiers + batch1 should ( + have size 5 and + contain theSameElementsAs throttledModifiers.take(5) + ) + + deliveryTracker.scheduleRebroadcastIfNeeded() + probe.expectMsg(TransactionRebroadcast) + val batch2 = deliveryTracker.getRebroadcastModifiers + batch2 should ( + have size 5 and + contain theSameElementsAs throttledModifiers.drop(5) + ) + + deliveryTracker.scheduleRebroadcastIfNeeded() + probe.expectNoMessage(200.millis) + deliveryTracker.getRebroadcastModifiers should have size 0 + } + + property("should not rebroadcast modifiers if disabled") { + val system = ActorSystem() + val probe = TestProbe("p")(system) + implicit val nvsStub: ActorRef = probe.testActor + val dt = FiniteDuration(3, MINUTES) + val networkSettings = mock[NetworkSettings] + when(networkSettings.deliveryTimeout).thenReturn(dt) + when(networkSettings.maxDeliveryChecks).thenReturn(2) + when(networkSettings.maxRequestedPerPeer).thenReturn(3) + when(networkSettings.slowModeFeatureFlag).thenReturn(true) + when(networkSettings.slowModeThresholdMs).thenReturn(100) + when(networkSettings.slowModeMeasurementImpact).thenReturn(0.1) + when(networkSettings.rebroadcastEnabled).thenReturn(false) + when(networkSettings.rebroadcastDelay).thenReturn(Duration.fromNanos(1000)) + when(networkSettings.rebroadcastBatchSize).thenReturn(5) + when(networkSettings.rebroadcastQueueSize).thenReturn(1000) + + val deliveryTracker = new DeliveryTracker( + system, + networkSettings, + nvsRef = nvsStub) + deliveryTracker.slowMode shouldBe false + val modifiersBatch1 = (1 to 10).map(int => bytesToId(Blake2b256(int+ ""))) + val throttledModifiers = (1 to 10).map(int => bytesToId(Blake2b256(int+ ""))) + + // engage slow mode + deliveryTracker.setRequested(modifiersBatch1, mtid, cp) + modifiersBatch1.foreach(deliveryTracker.setReceived(_, cp)) + Thread.sleep(200) + modifiersBatch1.foreach(deliveryTracker.setHeld) + deliveryTracker.slowMode shouldBe true + + // send throttled transactions + throttledModifiers.foreach(deliveryTracker.putInRebroadcastQueue) + + // end slow mode + deliveryTracker.setRequested(modifiersBatch1, mtid, cp) + modifiersBatch1.foreach(deliveryTracker.setReceived(_, cp)) + modifiersBatch1.foreach(deliveryTracker.setHeld) + deliveryTracker.slowMode shouldBe false + + // assert rebroadcast logic + probe.expectNoMessage(200.millis) + deliveryTracker.getRebroadcastModifiers should have size 0 + deliveryTracker.scheduleRebroadcastIfNeeded() + probe.expectNoMessage(200.millis) + } + + private def genDeliveryTracker = { val system = ActorSystem() val probe = TestProbe("p")(system) diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala index 2cd55441a..f3dc238d8 100644 --- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala +++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala @@ -19,6 +19,7 @@ import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.DisconnectedP import sparkz.core.network.message._ import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, DisconnectFromAddress, GetAllPeers, RandomPeerForConnectionExcluding} import sparkz.core.network.peer._ +import sparkz.core.serialization.SparkzSerializer import sparkz.core.settings.SparkzSettings import sparkz.core.utils.LocalTimeProvider @@ -31,7 +32,8 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { import scala.concurrent.ExecutionContext.Implicits.global - private val featureSerializers = Map(LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer) + private val featureSerializers = Map[Byte, SparkzSerializer[_ <: PeerFeature]](LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer, + TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer) "A NetworkController" should "send local address on handshake when peer and node address are in localhost" in { implicit val system: ActorSystem = ActorSystem() @@ -110,6 +112,47 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { system.terminate() } + it should "not send TransactionsDisabledPeerFeature on handshake when handlingTransactionsEnabled is true" in { + implicit val system: ActorSystem = ActorSystem() + + val tcpManagerProbe = TestProbe() + val (networkControllerRef: ActorRef, _) = createNetworkController(settings, tcpManagerProbe) + val testPeer = new TestPeer(settings, networkControllerRef, tcpManagerProbe) + + val peerAddr = new InetSocketAddress("127.0.0.1", 5678) + val nodeAddr = new InetSocketAddress("127.0.0.1", settings.network.bindAddress.getPort) + testPeer.connectAndExpectSuccessfulMessages(peerAddr, nodeAddr, Tcp.ResumeReading) + + val handshakeFromNode = testPeer.receiveHandshake + handshakeFromNode.peerSpec.features.exists{case f:TransactionsDisabledPeerFeature => true; case _ => false} shouldBe false + + system.terminate() + } + + it should "send TransactionsDisabledPeerFeature on handshake when handlingTransactionsEnabled is false" in { + implicit val system: ActorSystem = ActorSystem() + + val tcpManagerProbe = TestProbe() + val networkSettings = settings.copy( + network = settings.network.copy( + handlingTransactionsEnabled = false + ) + ) + + val (networkControllerRef: ActorRef, _) = createNetworkController(networkSettings, tcpManagerProbe) + val testPeer = new TestPeer(networkSettings, networkControllerRef, tcpManagerProbe) + + val peerAddr = new InetSocketAddress("127.0.0.1", 5678) + val nodeAddr = new InetSocketAddress("127.0.0.1", networkSettings.network.bindAddress.getPort) + testPeer.connectAndExpectSuccessfulMessages(peerAddr, nodeAddr, Tcp.ResumeReading) + + val handshakeFromNode = testPeer.receiveHandshake + handshakeFromNode.peerSpec.features.exists { case f: TransactionsDisabledPeerFeature => true; case _ => false } shouldBe true + + system.terminate() + } + + it should "send known public peers" in { implicit val system: ActorSystem = ActorSystem() @@ -582,7 +625,8 @@ class TestPeer(settings: SparkzSettings, networkControllerRef: ActorRef, tcpMana (implicit ec: ExecutionContext) extends Matchers { private val timeProvider = LocalTimeProvider - private val featureSerializers = Map(LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer) + private val featureSerializers = Map[Byte, SparkzSerializer[_ <: PeerFeature]](LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer, + TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer) private val handshakeSerializer = new HandshakeSpec(featureSerializers, Int.MaxValue) private val peersSpec = new PeersSpec(featureSerializers, settings.network.maxPeerSpecObjects) private val messageSpecs = Seq(GetPeersSpec, peersSpec) diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala index aea418ad3..7cb4a937d 100644 --- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala +++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala @@ -3,8 +3,10 @@ package sparkz.core.network import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.TestProbe import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, TransactionsFromRemote} -import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs} +import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} +import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, ChangedMempool, FailedTransaction, SuccessfulTransaction} import sparkz.core.network.message._ +import sparkz.core.network.peer.PenaltyType import sparkz.core.network.peer.PenaltyType.MisbehaviorPenalty import sparkz.core.serialization.SparkzSerializer import sparkz.core.settings.SparkzSettings @@ -23,10 +25,14 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa implicit val actorSystem: ActorSystem = ActorSystem() implicit val executionContext: ExecutionContext = actorSystem.dispatchers.lookup("sparkz.executionContext") private val modifiersSpec = new ModifiersSpec(1024 * 1024) + private val requestModifierSpec = new RequestModifierSpec(settings.network.maxInvObjects) + private val invSpec = new InvSpec(settings.network.maxInvObjects) private val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(settings) - private val peer = ConnectedPeer(ConnectionId(new InetSocketAddress(10), new InetSocketAddress(11), Incoming), TestProbe().ref, 0L, None) - - private val messageSerializer = new MessageSerializer(Seq(modifiersSpec), settings.network.magicBytes, settings.network.messageLengthBytesLimit) + private val peerProbe = TestProbe() + private val peer = ConnectedPeer(ConnectionId(new InetSocketAddress(10), new InetSocketAddress(11), Incoming), peerProbe.ref, 0L, None) + private val messageSerializer = new MessageSerializer(Seq(modifiersSpec, invSpec, requestModifierSpec), + settings.network.magicBytes, + settings.network.messageLengthBytesLimit) "NodeViewSynchronizer" should "not penalize peer for sending valid modifiers" in { @@ -47,6 +53,203 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa networkController.expectMsg(PenalizePeer(peer.connectionId.remoteAddress, MisbehaviorPenalty)) } + it should "request transactions if handlingTransactionsEnabled is true" in { + + val networkSettings = settings.copy( + network = settings.network.copy( + handlingTransactionsEnabled = true + ) + ) + val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(networkSettings) + setupHistoryAndMempoolReaders(synchronizer) + + val transaction = TestTransaction(1, 1) + val invMsg = Message(invSpec, Right(InvData(Transaction.ModifierTypeId, Seq(transaction.id ))), Some(peer)) + + synchronizer ! roundTrip(invMsg) + + viewHolder.expectNoMessage() + networkController.expectNoMessage() + peerProbe.receiveOne(0.seconds) match { + case Message(mod: MessageSpec[_], data: Right[_, _], _) => + assert(mod.messageCode == RequestModifierSpec.MessageCode) + assert(data == Right(InvData(Transaction.ModifierTypeId, Vector(transaction.id)))) + case _ => fail("Wrong message received by peer connection handler") + } + } + + it should "not request transactions if handlingTransactionsEnabled is false" in { + + val networkSettings = settings.copy( + network = settings.network.copy( + handlingTransactionsEnabled = false + ) + ) + val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(networkSettings) + + setupHistoryAndMempoolReaders(synchronizer) + + val transaction = TestTransaction(1, 1) + val invMsg = Message(invSpec, Right(InvData(Transaction.ModifierTypeId, Seq(transaction.id))), Some(peer)) + + synchronizer ! roundTrip(invMsg) + viewHolder.expectNoMessage() + networkController.expectNoMessage() + peerProbe.expectNoMessage() + } + + + it should "return transactions if handlingTransactionsEnabled is true" in { + + val networkSettings = settings.copy( + network = settings.network.copy( + handlingTransactionsEnabled = true + ) + ) + val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(networkSettings) + setupHistoryAndMempoolReaders(synchronizer) + + val transaction = TestTransaction(1, 1) + + val mempool = new TestMempool { + override def modifierById(modifierId: ModifierId): Option[TestTransaction] = None + override def getAll(ids: Seq[ModifierId]): Seq[TestTransaction] = Seq(transaction) + } + + synchronizer ! ChangedMempool(mempool) + + val invMsg = Message(requestModifierSpec, Right(InvData(Transaction.ModifierTypeId, Seq(transaction.id ))), Some(peer)) + + synchronizer ! roundTrip(invMsg) + + viewHolder.expectNoMessage() + networkController.expectNoMessage() + peerProbe.receiveOne(0.seconds) match { + case Message(mod: MessageSpec[_], _: Either[_, ModifiersData], _) => + assert(mod.messageCode == ModifiersSpec.MessageCode) + case p => fail(s"Wrong message received by peer connection handler $p") + } + } + + it should "not return transactions if handlingTransactionsEnabled is false" in { + + val networkSettings = settings.copy( + network = settings.network.copy( + handlingTransactionsEnabled = false + ) + ) + val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(networkSettings) + setupHistoryAndMempoolReaders(synchronizer) + + val transaction = TestTransaction(1, 1) + + val mempool = new TestMempool { + override def modifierById(modifierId: ModifierId): Option[TestTransaction] = None + override def getAll(ids: Seq[ModifierId]): Seq[TestTransaction] = Seq(transaction) + } + + synchronizer ! ChangedMempool(mempool) + + val invMsg = Message(requestModifierSpec, Right(InvData(Transaction.ModifierTypeId, Seq(transaction.id))), Some(peer)) + + synchronizer ! roundTrip(invMsg) + + viewHolder.expectNoMessage() + networkController.expectNoMessage() + peerProbe.expectNoMessage() + } + + it should "broadcast transactions when SuccessfulTransaction if handlingTransactionsEnabled is true" in { + + val networkSettings = settings.copy( + network = settings.network.copy( + handlingTransactionsEnabled = true + ) + ) + val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(networkSettings) + + val transaction = TestTransaction(1, 1) + synchronizer ! SuccessfulTransaction(transaction) + + viewHolder.expectNoMessage() + peerProbe.expectNoMessage() + networkController.receiveOne(0.seconds) match { + + case SendToNetwork(msg: Message[_], _: SendingStrategy) => + assert(msg.spec.messageCode == InvSpec.MessageCode) + assert(msg.data == Success(InvData(Transaction.ModifierTypeId, Seq(transaction.id)))) + case p => fail(s"Wrong message received by peer connection handler: $p") + } + + } + + it should "not broadcast transactions when SuccessfulTransaction if handlingTransactionsEnabled is false" in { + + val networkSettings = settings.copy( + network = settings.network.copy( + handlingTransactionsEnabled = false + ) + ) + val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(networkSettings) + + val transaction = TestTransaction(1, 1) + synchronizer ! SuccessfulTransaction(transaction) + + viewHolder.expectNoMessage() + peerProbe.expectNoMessage() + networkController.expectNoMessage() + } + + it should "penalize peer when FailedTransaction if handlingTransactionsEnabled is true" in { + + val networkSettings = settings.copy( + network = settings.network.copy( + handlingTransactionsEnabled = true + ) + ) + val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(networkSettings) + + val transaction = TestTransaction(1, 1) + synchronizer ! FailedTransaction(transaction.id, new Exception(), true) + + viewHolder.expectNoMessage() + peerProbe.expectNoMessage() + networkController.receiveOne(0.seconds) match { + case PenalizePeer(remoteAddress: InetSocketAddress, penalty: PenaltyType) => + assert(remoteAddress == peer.connectionId.remoteAddress) + assert(penalty == PenaltyType.MisbehaviorPenalty) + case p => fail(s"Wrong message received by peer connection handler $p") + } + } + + it should "not penalize peer when FailedTransaction if handlingTransactionsEnabled is false" in { + + val networkSettings = settings.copy( + network = settings.network.copy( + handlingTransactionsEnabled = false + ) + ) + val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(networkSettings) + + val transaction = TestTransaction(1, 1) + synchronizer ! FailedTransaction(transaction.id, new Exception(), true) + + viewHolder.expectNoMessage() + peerProbe.expectNoMessage() + networkController.expectNoMessage() + } + + def setupHistoryAndMempoolReaders(synchronizer: ActorRef): Unit = { + val history = new TestHistory + val mempool = new TestMempool { + override def modifierById(modifierId: ModifierId): Option[TestTransaction] = None + } + + synchronizer ! ChangedHistory(history) + synchronizer ! ChangedMempool(mempool) + + } + def roundTrip(msg: Message[_]): Message[_] = { messageSerializer.deserialize(messageSerializer.serialize(msg), msg.source) match { case Failure(e) => throw e @@ -66,9 +269,7 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa Map(Transaction.ModifierTypeId -> TestTransactionSerializer) val nodeViewSynchronizerRef = actorSystem.actorOf(Props( - new NodeViewSynchronizer - [ - TestTransaction, + new NodeViewSynchronizer[TestTransaction, TestSyncInfo, TestSyncInfoMessageSpec.type, TestModifier, @@ -85,6 +286,8 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa ) { override val deliveryTracker: DeliveryTracker = new DeliveryTracker(context.system, settings.network, self) { override def status(modifierId: ModifierId): ModifiersStatus = ModifiersStatus.Requested + override private[network] def clearStatusForModifier(id: ModifierId, oldStatus: ModifiersStatus): Unit = {} + override def setInvalid(modifierId: ModifierId): Option[ConnectedPeer] = Some(peer) } } )) diff --git a/src/test/scala/sparkz/core/network/peer/SendingStrategySpec.scala b/src/test/scala/sparkz/core/network/peer/SendingStrategySpec.scala new file mode 100644 index 000000000..f3640b8d4 --- /dev/null +++ b/src/test/scala/sparkz/core/network/peer/SendingStrategySpec.scala @@ -0,0 +1,62 @@ +package sparkz.core.network.peer + +import org.scalatest.BeforeAndAfter +import sparkz.ObjectGenerators +import sparkz.core.network._ + +import java.net.InetSocketAddress + +@SuppressWarnings(Array( + "org.wartremover.warts.Null", + "org.wartremover.warts.OptionPartial" +)) +class SendingStrategySpec extends NetworkTests with ObjectGenerators with BeforeAndAfter { + + private val peerAddress1 = new InetSocketAddress("1.1.1.1", 27017) + private val peerAddress2 = new InetSocketAddress("2.2.2.2", 27017) + private val peerAddress3 = new InetSocketAddress("3.3.3.3", 27017) + private val peerAddress4 = new InetSocketAddress("4.4.4.4", 27017) + + + "BroadcastTransaction" should "return empty list if input list is empty" in { + BroadcastTransaction.choose(Seq.empty) shouldBe Seq.empty + } + + it should "filter out peers with TransactionsDisabledPeerFeature" in { + + val peer1: ConnectedPeer = ConnectedPeer(ConnectionId(peerAddress1, peerAddress2, Outgoing),null, 1L, None) + val peer2: ConnectedPeer = ConnectedPeer(ConnectionId(peerAddress3, peerAddress1, Incoming),null, 1L, Some(getPeerInfo(peerAddress1,None,Seq.empty ))) + val peer3: ConnectedPeer = ConnectedPeer(ConnectionId(peerAddress2, peerAddress1, Incoming),null, 1L, Some(getPeerInfo(peerAddress1,None, + List[PeerFeature](LocalAddressPeerFeature(peerAddress1),TransactionsDisabledPeerFeature())))) + val peer4: ConnectedPeer = ConnectedPeer(ConnectionId(peerAddress1, peerAddress4, Outgoing), null, 1L, Some(getPeerInfo(peerAddress1, None, + List(LocalAddressPeerFeature(peerAddress1))))) + + val listOfPeers = List(peer1,peer2,peer3,peer4) + val result = BroadcastTransaction.choose(listOfPeers) + result.length shouldBe 3 + result.contains(peer1) shouldBe true + result.contains(peer2) shouldBe true + result.contains(peer4) shouldBe true + result.contains(peer3) shouldBe false + } + + "Broadcast" should "not filter out peers with TransactionsDisabledPeerFeature" in { + + val peer1: ConnectedPeer = ConnectedPeer(ConnectionId(peerAddress1, peerAddress2, Outgoing), null, 1L, None) + val peer2: ConnectedPeer = ConnectedPeer(ConnectionId(peerAddress3, peerAddress1, Incoming), null, 1L, + Some(getPeerInfo(peerAddress1, None, Seq.empty))) + val peer3: ConnectedPeer = ConnectedPeer(ConnectionId(peerAddress2, peerAddress1, Incoming), null, 1L, + Some(getPeerInfo(peerAddress1, None,List[PeerFeature](LocalAddressPeerFeature(peerAddress1), TransactionsDisabledPeerFeature())))) + val peer4: ConnectedPeer = ConnectedPeer(ConnectionId(peerAddress1, peerAddress4, Outgoing), null, 1L, + Some(getPeerInfo(peerAddress1, None,List(LocalAddressPeerFeature(peerAddress1))))) + + val listOfPeers = List(peer1, peer2, peer3, peer4) + val result = Broadcast.choose(listOfPeers) + result.length shouldBe 4 + result.contains(peer1) shouldBe true + result.contains(peer2) shouldBe true + result.contains(peer3) shouldBe true + result.contains(peer4) shouldBe true + } + +} \ No newline at end of file