Skip to content

Commit

Permalink
Merge pull request #66 from HorizenOfficial/dev
Browse files Browse the repository at this point in the history
2.0.2 to main
paolocappelletti authored Jul 26, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents cf632ab + 7dc0542 commit 12a7f51
Showing 16 changed files with 663 additions and 41 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ lazy val commonSettings = Seq(
Wart.OptionPartial),
organization := "io.horizen",
organizationName := "Zen Blockchain Foundation",
version := "2.0.1",
version := "2.0.2",
licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")),
homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")),
pomExtra :=
9 changes: 8 additions & 1 deletion examples/src/test/resources/settings.conf
Original file line number Diff line number Diff line change
@@ -165,6 +165,13 @@ sparkz {
# Max penalty score peer can accumulate before being banned
penaltyScoreThreshold = 100

#########################
# Other settings #
#########################

# Enables transactions in the mempool
handlingTransactionsEnabled = true

}

ntp {
@@ -192,4 +199,4 @@ sparkz {
password = "cookies"
walletDir = "/tmp/scorex-test/data/wallet"
}
}
}
5 changes: 5 additions & 0 deletions release-notes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
2.0.2
---------
* P2p rate limitng feature - added tx rebroadcast when rate limiting is reenabled
* Seeder nodes support

2.0.1
---------
* P2p rate limitng feature
20 changes: 19 additions & 1 deletion src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -181,6 +181,18 @@ sparkz {
# The impact is single measurement has on a an average processing value.
slowModeMeasurementImpact = 0.1

# Enabled or disables rebroadcasting of modifiers after the slow mode is over.
rebroadcastEnabled = false

# Delay between rebroadcasting batches of modifiers.
rebroadcastDelay = 30s

# Maximum number of modifiers to keep in the rebroadcast queue.
rebroadcastQueueSize = 1024

# number of modifiers to re-broadcast every rebroadcastDelay
rebroadcastBatchSize = 75

# Desired number of inv objects. Our requests will have this size.
desiredInvObjects = 512

@@ -201,6 +213,12 @@ sparkz {
# Max penalty score peer can accumulate before being banned
penaltyScoreThreshold = 100

#########################
# Other settings #
#########################

# Enables transactions in the mempool
handlingTransactionsEnabled = true
}

ntp {
@@ -213,4 +231,4 @@ sparkz {
# server answer timeout
timeout = 30s
}
}
}
24 changes: 16 additions & 8 deletions src/main/scala/sparkz/core/NodeViewHolder.scala
Original file line number Diff line number Diff line change
@@ -149,18 +149,22 @@ trait NodeViewHolder[TX <: Transaction, PMOD <: PersistentNodeViewModifier]
//todo: this method causes delays in a block processing as it removes transactions from mempool and checks
//todo: validity of remaining transactions in a synchronous way. Do this job async!
protected def updateMemPool(blocksRemoved: Seq[PMOD], blocksApplied: Seq[PMOD], memPool: MP, state: MS): MP = {
val rolledBackTxs = blocksRemoved.flatMap(extractTransactions)
if (sparksSettings.network.handlingTransactionsEnabled){
val rolledBackTxs = blocksRemoved.flatMap(extractTransactions)

val appliedTxs = blocksApplied.flatMap(extractTransactions)
val appliedTxs = blocksApplied.flatMap(extractTransactions)

memPool.putWithoutCheck(rolledBackTxs).filter { tx =>
!appliedTxs.exists(t => t.id == tx.id) && {
state match {
case v: TransactionValidation[TX @unchecked] => v.validate(tx).isSuccess
case _ => true
memPool.putWithoutCheck(rolledBackTxs).filter { tx =>
!appliedTxs.exists(t => t.id == tx.id) && {
state match {
case v: TransactionValidation[TX @unchecked] => v.validate(tx).isSuccess
case _ => true
}
}
}
}
else
memPool
}

private def trimChainSuffix(suffix: IndexedSeq[PMOD], rollbackPoint: sparkz.util.ModifierId): IndexedSeq[PMOD] = {
@@ -344,7 +348,11 @@ trait NodeViewHolder[TX <: Transaction, PMOD <: PersistentNodeViewModifier]

protected def transactionsProcessing: Receive = {
case newTxs: NewTransactions[TX @unchecked] =>
newTxs.txs.foreach(txModify)
if (sparksSettings.network.handlingTransactionsEnabled)
newTxs.txs.foreach(txModify)
else
newTxs.txs.foreach(tx => context.system.eventStream.publish(
FailedTransaction(tx.id, new Exception("Transactions handling disabled"), immediateFailure = false)))
case EliminateTransactions(ids) =>
val updatedPool = memoryPool().filter(tx => !ids.contains(tx.id))
updateNodeView(updatedMempool = Some(updatedPool))
50 changes: 42 additions & 8 deletions src/main/scala/sparkz/core/network/DeliveryTracker.scala
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ package sparkz.core.network
import akka.actor.{ActorRef, ActorSystem, Cancellable}
import sparkz.core.consensus.ContainsModifiers
import sparkz.core.network.ModifiersStatus._
import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.CheckDelivery
import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.{CheckDelivery, TransactionRebroadcast}
import sparkz.core.settings.NetworkSettings
import sparkz.util.SparkzEncoding
import sparkz.core.{ModifierTypeId, NodeViewModifier}
@@ -38,7 +38,7 @@ import scala.util.{Failure, Try}
*/
class DeliveryTracker(system: ActorSystem,
networkSettings: NetworkSettings,
nvsRef: ActorRef) extends SparkzLogging with SparkzEncoding {
nvsRef: ActorRef)(implicit ec: ExecutionContext) extends SparkzLogging with SparkzEncoding {

protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout
protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks
@@ -47,6 +47,10 @@ class DeliveryTracker(system: ActorSystem,
protected val slowModeThresholdMs: Long = networkSettings.slowModeThresholdMs
protected val slowModeMaxRequested: Int = networkSettings.slowModeMaxRequested
protected val slowModeMeasurementImpact: Double = networkSettings.slowModeMeasurementImpact
protected val rebroadcastEnabled: Boolean = networkSettings.rebroadcastEnabled
protected val rebroadcastDelay: FiniteDuration = networkSettings.rebroadcastDelay
protected val rebroadcastQueueSize: Int = networkSettings.rebroadcastQueueSize
protected val rebroadcastBatchSize: Int = networkSettings.rebroadcastBatchSize

protected case class RequestedInfo(peer: ConnectedPeer, cancellable: Cancellable, checks: Int)

@@ -62,6 +66,11 @@ class DeliveryTracker(system: ActorSystem,
// when our node received a modifier we put it to `received`
protected val received: mutable.Map[ModifierId, (ConnectedPeer, Long)] = mutable.Map()

// when the modifier is created but not broadcasted, we put it to `rebroadcastQueue`
protected val rebroadcastQueue: mutable.Queue[ModifierId] = mutable.Queue()

private var transactionRebroadcastEvent: Cancellable = Cancellable.alreadyCancelled

private var averageProcessingTimeMs: Long = 0
var slowMode: Boolean = false

@@ -92,8 +101,7 @@ class DeliveryTracker(system: ActorSystem,
*
* @return `true` if number of checks was not exceed, `false` otherwise
*/
def onStillWaiting(cp: ConnectedPeer, modifierTypeId: ModifierTypeId, modifierId: ModifierId)
(implicit ec: ExecutionContext): Try[Unit] =
def onStillWaiting(cp: ConnectedPeer, modifierTypeId: ModifierTypeId, modifierId: ModifierId): Try[Unit] =
tryWithLogging {
val checks = requested(modifierId).checks + 1
setUnknown(modifierId)
@@ -104,8 +112,7 @@ class DeliveryTracker(system: ActorSystem,
/**
* Set status of modifier with id `id` to `Requested`
*/
private def setRequested(id: ModifierId, typeId: ModifierTypeId, supplier: ConnectedPeer, checksDone: Int = 0)
(implicit ec: ExecutionContext): Unit =
private def setRequested(id: ModifierId, typeId: ModifierTypeId, supplier: ConnectedPeer, checksDone: Int = 0): Unit =
tryWithLogging {
require(isCorrectTransition(status(id), Requested), s"Illegal status transition: ${status(id)} -> Requested")
val cancellable = system.scheduler.scheduleOnce(deliveryTimeout, nvsRef, CheckDelivery(supplier, typeId, id))
@@ -116,8 +123,8 @@ class DeliveryTracker(system: ActorSystem,
}
}

def setRequested(ids: Seq[ModifierId], typeId: ModifierTypeId, cp: ConnectedPeer)
(implicit ec: ExecutionContext): Unit = ids.foreach(setRequested(_, typeId, cp))
def setRequested(ids: Seq[ModifierId], typeId: ModifierTypeId, cp: ConnectedPeer): Unit =
ids.foreach(setRequested(_, typeId, cp))

/**
* Modified with id `id` is permanently invalid - set its status to `Invalid`
@@ -219,6 +226,32 @@ class DeliveryTracker(system: ActorSystem,
!(slowModeFeatureFlag && slowMode && requested.size > slowModeMaxRequested)
}

def putInRebroadcastQueue(modifierId: ModifierId): Unit = {
if (rebroadcastEnabled) {
rebroadcastQueue.enqueue(modifierId)
while (rebroadcastQueue.size > rebroadcastQueueSize) {
rebroadcastQueue.dequeue()
}
}
}

def getRebroadcastModifiers: Seq[ModifierId] = {
rebroadcastQueue.take(rebroadcastBatchSize).toSeq
.map(_ => rebroadcastQueue.dequeue())
}

/**
* Schedule rebroadcast of transactions if rebroadcast is enabled and there are transactions to rebroadcast.
* If there is already scheduled rebroadcast, cancel it and schedule new one, it means that during the delay node
* entered slow mode again.
*/
def scheduleRebroadcastIfNeeded(): Unit = {
if (rebroadcastEnabled && rebroadcastQueue.nonEmpty) {
transactionRebroadcastEvent.cancel()
transactionRebroadcastEvent = system.scheduler.scheduleOnce(rebroadcastDelay, nvsRef, TransactionRebroadcast)
}
}

private def incrementPeerLimitCounter(peer: ConnectedPeer): Unit = {
peerLimits.get(peer) match {
case Some(value) => peerLimits.put(peer, value + 1)
@@ -290,6 +323,7 @@ class DeliveryTracker(system: ActorSystem,
} else if (averageProcessingTimeMs < slowModeThresholdMs && slowMode) {
slowMode = false
logger.warn("Slow mode disabled on P2P layer. Transactions will be requested or broadcasted.")
scheduleRebroadcastIfNeeded()
}
}
}
7 changes: 6 additions & 1 deletion src/main/scala/sparkz/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
@@ -374,7 +374,12 @@ class NetworkController(settings: NetworkSettings,
}
}
val isLocal = NetworkUtils.isLocalAddress(connectionId.remoteAddress.getAddress)
val mandatoryFeatures = sparkzContext.features :+ mySessionIdFeature
val mandatoryFeatures = if (settings.handlingTransactionsEnabled) {
sparkzContext.features :+ mySessionIdFeature
}
else {
sparkzContext.features :+ mySessionIdFeature :+ TransactionsDisabledPeerFeature()
}
val peerFeatures = if (isLocal) {
val la = new InetSocketAddress(connectionId.localAddress.getAddress, settings.bindAddress.getPort)
val localAddrFeature = LocalAddressPeerFeature(la)
Loading

0 comments on commit 12a7f51

Please sign in to comment.