Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.0.2 to main #66

Merged
merged 18 commits into from
Jul 26, 2023
Merged
Changes from 8 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion examples/src/test/resources/settings.conf
Original file line number Diff line number Diff line change
@@ -165,6 +165,13 @@ sparkz {
# Max penalty score peer can accumulate before being banned
penaltyScoreThreshold = 100

#########################
# Other settings #
#########################

# Enables transactions in the mempool
handlingTransactionsEnabled = true

}

ntp {
@@ -192,4 +199,4 @@ sparkz {
password = "cookies"
walletDir = "/tmp/scorex-test/data/wallet"
}
}
}
8 changes: 7 additions & 1 deletion src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -201,6 +201,12 @@ sparkz {
# Max penalty score peer can accumulate before being banned
penaltyScoreThreshold = 100

#########################
# Other settings #
#########################

# Enables transactions in the mempool
handlingTransactionsEnabled = true
}

ntp {
@@ -213,4 +219,4 @@ sparkz {
# server answer timeout
timeout = 30s
}
}
}
24 changes: 16 additions & 8 deletions src/main/scala/sparkz/core/NodeViewHolder.scala
Original file line number Diff line number Diff line change
@@ -149,18 +149,22 @@ trait NodeViewHolder[TX <: Transaction, PMOD <: PersistentNodeViewModifier]
//todo: this method causes delays in a block processing as it removes transactions from mempool and checks
//todo: validity of remaining transactions in a synchronous way. Do this job async!
protected def updateMemPool(blocksRemoved: Seq[PMOD], blocksApplied: Seq[PMOD], memPool: MP, state: MS): MP = {
val rolledBackTxs = blocksRemoved.flatMap(extractTransactions)
if (sparksSettings.network.handlingTransactionsEnabled){
val rolledBackTxs = blocksRemoved.flatMap(extractTransactions)

val appliedTxs = blocksApplied.flatMap(extractTransactions)
val appliedTxs = blocksApplied.flatMap(extractTransactions)

memPool.putWithoutCheck(rolledBackTxs).filter { tx =>
!appliedTxs.exists(t => t.id == tx.id) && {
state match {
case v: TransactionValidation[TX @unchecked] => v.validate(tx).isSuccess
case _ => true
memPool.putWithoutCheck(rolledBackTxs).filter { tx =>
!appliedTxs.exists(t => t.id == tx.id) && {
state match {
case v: TransactionValidation[TX @unchecked] => v.validate(tx).isSuccess
case _ => true
}
}
}
}
else
memPool
}

private def trimChainSuffix(suffix: IndexedSeq[PMOD], rollbackPoint: sparkz.util.ModifierId): IndexedSeq[PMOD] = {
@@ -344,7 +348,11 @@ trait NodeViewHolder[TX <: Transaction, PMOD <: PersistentNodeViewModifier]

protected def transactionsProcessing: Receive = {
case newTxs: NewTransactions[TX @unchecked] =>
newTxs.txs.foreach(txModify)
if (sparksSettings.network.handlingTransactionsEnabled)
newTxs.txs.foreach(txModify)
else
newTxs.txs.foreach(tx => context.system.eventStream.publish(
FailedTransaction(tx.id, new Exception("Transactions handling disabled"), immediateFailure = false)))
case EliminateTransactions(ids) =>
val updatedPool = memoryPool().filter(tx => !ids.contains(tx.id))
updateNodeView(updatedMempool = Some(updatedPool))
7 changes: 6 additions & 1 deletion src/main/scala/sparkz/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
@@ -374,7 +374,12 @@ class NetworkController(settings: NetworkSettings,
}
}
val isLocal = NetworkUtils.isLocalAddress(connectionId.remoteAddress.getAddress)
val mandatoryFeatures = sparkzContext.features :+ mySessionIdFeature
val mandatoryFeatures = if (settings.handlingTransactionsEnabled) {
sparkzContext.features :+ mySessionIdFeature
}
else {
sparkzContext.features :+ mySessionIdFeature :+ TransactionsDisabledPeerFeature()
}
val peerFeatures = if (isLocal) {
val la = new InetSocketAddress(connectionId.localAddress.getAddress, settings.bindAddress.getPort)
val localAddrFeature = LocalAddressPeerFeature(la)
28 changes: 19 additions & 9 deletions src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala
Original file line number Diff line number Diff line change
@@ -99,19 +99,26 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
case Transaction.ModifierTypeId if deliveryTracker.slowMode => // will not broadcast due to the high load
case _ =>
val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None)
networkControllerRef ! SendToNetwork(msg, Broadcast)
if (m.modifierTypeId == Transaction.ModifierTypeId)
networkControllerRef ! SendToNetwork(msg, BroadcastTransaction)
else
networkControllerRef ! SendToNetwork(msg, Broadcast)
}
}

protected def viewHolderEvents: Receive = {
case SuccessfulTransaction(tx) =>
deliveryTracker.setHeld(tx.id)
broadcastModifierInv(tx)
if (networkSettings.handlingTransactionsEnabled) {
deliveryTracker.setHeld(tx.id)
broadcastModifierInv(tx)
}

case FailedTransaction(id, _, immediateFailure) =>
val senderOpt = deliveryTracker.setInvalid(id)
// penalize sender only in case transaction was invalidated at first validation.
if (immediateFailure) senderOpt.foreach(penalizeMisbehavingPeer)
if (networkSettings.handlingTransactionsEnabled) {
val senderOpt = deliveryTracker.setInvalid(id)
// penalize sender only in case transaction was invalidated at first validation.
if (immediateFailure) senderOpt.foreach(penalizeMisbehavingPeer)
}

case SyntacticallySuccessfulModifier(mod) =>
deliveryTracker.setHeld(mod.id)
@@ -225,10 +232,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
val modifierTypeId = invData.typeId
val newModifierIds = (modifierTypeId match {
case Transaction.ModifierTypeId =>
if (deliveryTracker.canRequestMoreTransactions)
if (deliveryTracker.canRequestMoreTransactions && networkSettings.handlingTransactionsEnabled)
invData.ids.filter(mid => deliveryTracker.status(mid, mempool) == ModifiersStatus.Unknown)
else
Seq() // do not request transactions due to the high load
Seq.empty
case _ =>
invData.ids.filter(mid => deliveryTracker.status(mid, history) == ModifiersStatus.Unknown)
})
@@ -250,7 +257,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
readersOpt.foreach { readers =>
val objs: Seq[NodeViewModifier] = invData.typeId match {
case typeId: ModifierTypeId if typeId == Transaction.ModifierTypeId =>
readers._2.getAll(invData.ids)
if (networkSettings.handlingTransactionsEnabled)
readers._2.getAll(invData.ids)
else
Seq.empty
case _: ModifierTypeId =>
invData.ids.flatMap(id => readers._1.modifierById(id))
}
10 changes: 10 additions & 0 deletions src/main/scala/sparkz/core/network/SendingStrategy.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sparkz.core.network

import sparkz.core.network.peer.TransactionsDisabledPeerFeature

import java.security.SecureRandom

trait SendingStrategy {
@@ -21,6 +23,14 @@ case object Broadcast extends SendingStrategy {
override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = peers
}


case object BroadcastTransaction extends SendingStrategy {
override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = {
peers.filter(p => p.peerInfo.flatMap{
info => info.peerSpec.features.collectFirst {case f:TransactionsDisabledPeerFeature => true}}.isEmpty)
}
}

case class BroadcastExceptOf(exceptOf: Seq[ConnectedPeer]) extends SendingStrategy {
override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] =
peers.filterNot(exceptOf.contains)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package sparkz.core.network.peer

import sparkz.core.network.PeerFeature
import sparkz.core.network.PeerFeature.Id
import sparkz.core.network.message.Message
import sparkz.util.serialization._
import sparkz.core.serialization.SparkzSerializer

/**
* This peer feature allows to detect peers who don't support transactions
*/
case class TransactionsDisabledPeerFeature() extends PeerFeature {

override type M = TransactionsDisabledPeerFeature
override val featureId: Id = TransactionsDisabledPeerFeature.featureId

override def serializer: TransactionsDisabledPeerFeatureSerializer.type = TransactionsDisabledPeerFeatureSerializer

}

object TransactionsDisabledPeerFeature {

val featureId: Id = 4: Byte

}

object TransactionsDisabledPeerFeatureSerializer extends SparkzSerializer[TransactionsDisabledPeerFeature] {


override def parse(r: Reader): TransactionsDisabledPeerFeature = {
TransactionsDisabledPeerFeature()
}

override def serialize(obj: TransactionsDisabledPeerFeature, w: Writer): Unit = {}
}
3 changes: 2 additions & 1 deletion src/main/scala/sparkz/core/settings/Settings.scala
Original file line number Diff line number Diff line change
@@ -57,7 +57,8 @@ case class NetworkSettings(nodeName: String,
storageBackupDelay: FiniteDuration,
temporalBanDuration: FiniteDuration,
penaltySafeInterval: FiniteDuration,
penaltyScoreThreshold: Int)
penaltyScoreThreshold: Int,
handlingTransactionsEnabled: Boolean)

case class SparkzSettings(dataDir: File,
logDir: File,
7 changes: 7 additions & 0 deletions src/test/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -187,6 +187,13 @@ sparkz {
# Max penalty score peer can accumulate before being banned
penaltyScoreThreshold = 100

############################
# Synchronization settings #
############################

# Enable p2p transactions synchronization
syncTransactionsEnabled = true

}

ntp {
48 changes: 46 additions & 2 deletions src/test/scala/sparkz/core/network/NetworkControllerSpec.scala
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.DisconnectedP
import sparkz.core.network.message._
import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, DisconnectFromAddress, GetAllPeers, RandomPeerForConnectionExcluding}
import sparkz.core.network.peer._
import sparkz.core.serialization.SparkzSerializer
import sparkz.core.settings.SparkzSettings
import sparkz.core.utils.LocalTimeProvider

@@ -31,7 +32,8 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures {

import scala.concurrent.ExecutionContext.Implicits.global

private val featureSerializers = Map(LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer)
private val featureSerializers = Map[Byte, SparkzSerializer[_ <: PeerFeature]](LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer,
TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer)

"A NetworkController" should "send local address on handshake when peer and node address are in localhost" in {
implicit val system: ActorSystem = ActorSystem()
@@ -110,6 +112,47 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures {
system.terminate()
}

it should "not send TransactionsDisabledPeerFeature on handshake when handlingTransactionsEnabled is true" in {
implicit val system: ActorSystem = ActorSystem()

val tcpManagerProbe = TestProbe()
val (networkControllerRef: ActorRef, _) = createNetworkController(settings, tcpManagerProbe)
val testPeer = new TestPeer(settings, networkControllerRef, tcpManagerProbe)

val peerAddr = new InetSocketAddress("127.0.0.1", 5678)
val nodeAddr = new InetSocketAddress("127.0.0.1", settings.network.bindAddress.getPort)
testPeer.connectAndExpectSuccessfulMessages(peerAddr, nodeAddr, Tcp.ResumeReading)

val handshakeFromNode = testPeer.receiveHandshake
handshakeFromNode.peerSpec.features.exists{case f:TransactionsDisabledPeerFeature => true; case _ => false} shouldBe false

system.terminate()
}

it should "send TransactionsDisabledPeerFeature on handshake when handlingTransactionsEnabled is false" in {
implicit val system: ActorSystem = ActorSystem()

val tcpManagerProbe = TestProbe()
val networkSettings = settings.copy(
network = settings.network.copy(
handlingTransactionsEnabled = false
)
)

val (networkControllerRef: ActorRef, _) = createNetworkController(networkSettings, tcpManagerProbe)
val testPeer = new TestPeer(networkSettings, networkControllerRef, tcpManagerProbe)

val peerAddr = new InetSocketAddress("127.0.0.1", 5678)
val nodeAddr = new InetSocketAddress("127.0.0.1", networkSettings.network.bindAddress.getPort)
testPeer.connectAndExpectSuccessfulMessages(peerAddr, nodeAddr, Tcp.ResumeReading)

val handshakeFromNode = testPeer.receiveHandshake
handshakeFromNode.peerSpec.features.exists { case f: TransactionsDisabledPeerFeature => true; case _ => false } shouldBe true

system.terminate()
}


it should "send known public peers" in {
implicit val system: ActorSystem = ActorSystem()

@@ -582,7 +625,8 @@ class TestPeer(settings: SparkzSettings, networkControllerRef: ActorRef, tcpMana
(implicit ec: ExecutionContext) extends Matchers {

private val timeProvider = LocalTimeProvider
private val featureSerializers = Map(LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer)
private val featureSerializers = Map[Byte, SparkzSerializer[_ <: PeerFeature]](LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer,
TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer)
private val handshakeSerializer = new HandshakeSpec(featureSerializers, Int.MaxValue)
private val peersSpec = new PeersSpec(featureSerializers, settings.network.maxPeerSpecObjects)
private val messageSpecs = Seq(GetPeersSpec, peersSpec)
Loading