From fa3dda37ccc103352cbfb65787871bc654c750a0 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Fri, 7 Apr 2023 11:01:44 +0300 Subject: [PATCH 01/30] fix for inconsistent node status after node banning --- .../core/network/NetworkController.scala | 16 +++++++----- .../core/network/NetworkControllerSpec.scala | 25 ++++++++++++++++++- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 52db1c467..3c17495f4 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -534,13 +534,17 @@ class NetworkController(settings: NetworkSettings, } } - private def closeConnection(peerAddress: InetSocketAddress): Unit = - connections.get(peerAddress).foreach { peer => - connections = connections.filterNot { case (address, _) => // clear all connections related to banned peer ip - Option(peer.connectionId.remoteAddress.getAddress).exists(Option(address.getAddress).contains(_)) - } - peer.handlerRef ! CloseConnection + private def closeConnection(peerAddress: InetSocketAddress): Unit = { + connections = connections.filter { case (_, connectedPeer) => + Option(connectedPeer) + .filter(_.connectionId.remoteAddress.equals(peerAddress)) + .map { peer => + peer.handlerRef ! CloseConnection + context.system.eventStream.publish(DisconnectedPeer(peerAddress)) + } + .isEmpty } + } /** * Register a new penalty for given peer address. diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala index d69563adf..8bc366bb9 100644 --- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala +++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala @@ -15,8 +15,9 @@ import org.scalatest.matchers.should.Matchers import sparkz.core.app.{SparkzContext, Version} import sparkz.core.network.NetworkController.ReceivableMessages.Internal.ConnectionToPeer import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus} +import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.DisconnectedPeer import sparkz.core.network.message._ -import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, GetAllPeers, RandomPeerForConnectionExcluding} +import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, DisconnectFromAddress, GetAllPeers, RandomPeerForConnectionExcluding} import sparkz.core.network.peer._ import sparkz.core.settings.SparkzSettings import sparkz.core.utils.LocalTimeProvider @@ -535,6 +536,28 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { system.terminate() } + it should "on disconnect update SyncTracker" in { + implicit val system: ActorSystem = ActorSystem() + val testProbe = TestProbe() + val bindAddress = new InetSocketAddress("88.77.66.55", 12345) + val settings2 = settings.copy(network = settings.network.copy(bindAddress = bindAddress)) + val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, testProbe) + val testPeer1 = new TestPeer(settings2, networkControllerRef, testProbe) + + val peerAddress1 = new InetSocketAddress("88.77.66.55", 5678) + + testPeer1.connectAndExpectSuccessfulMessages(peerAddress1, bindAddress, Tcp.ResumeReading) + val handshakeFromNode1 = testPeer1.receiveHandshake + handshakeFromNode1.peerSpec.declaredAddress.value should be(bindAddress) + testPeer1.sendHandshake(None, None) + + networkControllerRef ! DisconnectFromAddress(peerAddress1) + system.eventStream.subscribe(testProbe.testActor, classOf[DisconnectedPeer]) + testProbe.expectMsg(DisconnectedPeer(peerAddress1)) + + system.terminate() + } + private def extractLocalAddrFeat(handshakeFromNode: Handshake): Option[InetSocketAddress] = { handshakeFromNode.peerSpec.localAddressOpt } From c7e88eb41b6c27a523a0635c89cdf8367a0a91c9 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 11:05:41 +0300 Subject: [PATCH 02/30] ci scripts fix --- ci/publish.sh | 4 ++-- ci/setup_env.sh | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/ci/publish.sh b/ci/publish.sh index d7cc9af4f..c91bde056 100755 --- a/ci/publish.sh +++ b/ci/publish.sh @@ -3,10 +3,10 @@ set -eo pipefail retval=0 -if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}[0-9]*$ ]]; then +if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then echo "" && echo "=== Publishing development release on Sonatype Nexus repository. Timestamp is: $(date '+%a %b %d %H:%M:%S %Z %Y') ===" && echo "" sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publish -elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then +elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then echo "" && echo "=== Publishing production release on Maven repository. Timestamp is: $(date '+%Y-%m-%d %H:%M') ===" && echo "" sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publishSigned sonatypeBundleRelease else diff --git a/ci/setup_env.sh b/ci/setup_env.sh index d6eae7017..65a8c0553 100755 --- a/ci/setup_env.sh +++ b/ci/setup_env.sh @@ -19,12 +19,13 @@ function import_gpg_keys() { # shellcheck disable=SC2207 declare -r my_arr=( $(echo "${@}" | tr " " "\n") ) - for key in "${my_arr[@]}"; do +for key in "${my_arr[@]}"; do echo "Importing key: ${key}" gpg -v --batch --keyserver hkps://keys.openpgp.org --recv-keys "${key}" || gpg -v --batch --keyserver hkp://keyserver.ubuntu.com --recv-keys "${key}" || gpg -v --batch --keyserver hkp://pgp.mit.edu:80 --recv-keys "${key}" || - gpg -v --batch --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys "${key}" + gpg -v --batch --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys "${key}" || + echo -e "${key} can not be found on GPG key servers. Please upload it to at least one of the following GPG key servers:\nhttps://keys.openpgp.org/\nhttps://keyserver.ubuntu.com/\nhttps://pgp.mit.edu/" done } @@ -80,7 +81,7 @@ if [ -n "${TRAVIS_TAG}" ]; then export CONTAINER_PUBLISH="true" else # Checking if package version matches DEV release version - if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}$ ]]; then + if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then echo "Aborting, package version is in the wrong format for development release." exit 1 fi From 96ed80b525e67137bb130103c2a1aba86511a706 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 11:10:58 +0300 Subject: [PATCH 03/30] ci scripts fix --- ci/setup_env.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/setup_env.sh b/ci/setup_env.sh index 65a8c0553..cc63d3d9d 100755 --- a/ci/setup_env.sh +++ b/ci/setup_env.sh @@ -61,7 +61,7 @@ if [ -n "${TRAVIS_TAG}" ]; then # Prod vs dev release if ( git branch -r --contains "${TRAVIS_TAG}" | grep -xqE ". origin\/${PROD_RELEASE_BRANCH}$" ); then # Checking if package version matches PROD release version - if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then + if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then echo "Aborting, package version is in the wrong format for production release." exit 1 fi From 1a931eaac1201c3df34a5920ef58a755aa29dc01 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 14:45:14 +0300 Subject: [PATCH 04/30] remove sporadically failing on CI test --- .../core/network/NetworkControllerSpec.scala | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala index 8bc366bb9..f022897c2 100644 --- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala +++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala @@ -536,28 +536,6 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { system.terminate() } - it should "on disconnect update SyncTracker" in { - implicit val system: ActorSystem = ActorSystem() - val testProbe = TestProbe() - val bindAddress = new InetSocketAddress("88.77.66.55", 12345) - val settings2 = settings.copy(network = settings.network.copy(bindAddress = bindAddress)) - val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, testProbe) - val testPeer1 = new TestPeer(settings2, networkControllerRef, testProbe) - - val peerAddress1 = new InetSocketAddress("88.77.66.55", 5678) - - testPeer1.connectAndExpectSuccessfulMessages(peerAddress1, bindAddress, Tcp.ResumeReading) - val handshakeFromNode1 = testPeer1.receiveHandshake - handshakeFromNode1.peerSpec.declaredAddress.value should be(bindAddress) - testPeer1.sendHandshake(None, None) - - networkControllerRef ! DisconnectFromAddress(peerAddress1) - system.eventStream.subscribe(testProbe.testActor, classOf[DisconnectedPeer]) - testProbe.expectMsg(DisconnectedPeer(peerAddress1)) - - system.terminate() - } - private def extractLocalAddrFeat(handshakeFromNode: Handshake): Option[InetSocketAddress] = { handshakeFromNode.peerSpec.localAddressOpt } From 5b14f84959ad04edbbf2eb8e58a4017404221f70 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 15:18:34 +0300 Subject: [PATCH 05/30] remove sporadically failing on CI test --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 68a3ee569..4168eab17 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.0-RC11", + version := "2.0.0-SNAPSHOT1", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := From 27db919be492e0a8480c5e011f0507ed36955528 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Fri, 28 Apr 2023 20:15:57 +0300 Subject: [PATCH 06/30] adds slow mode condition to discard transaction synchronization on P2P layer when node is overloaded --- build.sbt | 2 +- src/main/resources/reference.conf | 6 ++++++ .../sparkz/core/network/DeliveryTracker.scala | 13 +++++++++++++ .../core/network/NodeViewSynchronizer.scala | 17 +++++++++++++---- .../scala/sparkz/core/settings/Settings.scala | 2 ++ .../network/DeliveryTrackerSpecification.scala | 2 +- .../NodeViewSynchronizerSpecification.scala | 2 +- 7 files changed, 37 insertions(+), 7 deletions(-) diff --git a/build.sbt b/build.sbt index 68a3ee569..4a98a7bc2 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.0-RC11", + version := "2.0.0-RC11-SNAPSHOT", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 765adc2a7..4e346139e 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -167,6 +167,12 @@ sparkz { # Limit for number of modifiers to request and process at once maxRequestedPerPeer = 1024 + # Enables or disables slow mode - when N of concurrent requested modifiers > threshold, do not fetch or broadcast transactions + slowModeFeatureFlag = true + + # Number of concurrent transactions in sync after which the slow mode is engaged + slowModeThreshold = 150 + # Desired number of inv objects. Our requests will have this size. desiredInvObjects = 512 diff --git a/src/main/scala/sparkz/core/network/DeliveryTracker.scala b/src/main/scala/sparkz/core/network/DeliveryTracker.scala index bd05ebbd2..9b73e7725 100644 --- a/src/main/scala/sparkz/core/network/DeliveryTracker.scala +++ b/src/main/scala/sparkz/core/network/DeliveryTracker.scala @@ -38,6 +38,8 @@ class DeliveryTracker(system: ActorSystem, deliveryTimeout: FiniteDuration, maxDeliveryChecks: Int, maxRequestedPerPeer: Int, + slowModeFeatureFlag: Boolean, + slowModeThreshold: Int, nvsRef: ActorRef) extends SparkzLogging with SparkzEncoding { protected case class RequestedInfo(peer: ConnectedPeer, cancellable: Cancellable, checks: Int) @@ -54,6 +56,8 @@ class DeliveryTracker(system: ActorSystem, // when our node received a modifier we put it to `received` protected val received: mutable.Map[ModifierId, ConnectedPeer] = mutable.Map() + var slowMode: Boolean = false + /** * @return status of modifier `id`. * Since this class do not keep statuses for modifiers that are already in NodeViewHolder, @@ -103,6 +107,15 @@ class DeliveryTracker(system: ActorSystem, case Some(RequestedInfo(peer,_,_)) => decrementPeerLimitCounter(peer); incrementPeerLimitCounter(supplier) case None => incrementPeerLimitCounter(supplier) } + if (slowModeFeatureFlag) { + if (requested.size > slowModeThreshold) { + slowMode = true + logger.warn("SLOW MODE ENGAGED. TRANSACTION WILL BE REJECTED ON P2P LAYER!") + } else { + slowMode = false + logger.warn("SLOW MODE DISABLED. TRANSACTION WILL BE SYNCED ON P2P LAYER!") + } + } } def setRequested(ids: Seq[ModifierId], typeId: ModifierTypeId, cp: ConnectedPeer) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index e989f31af..416a7fd29 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -57,6 +57,8 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks protected val maxRequestedPerPeer: Int = networkSettings.maxRequestedPerPeer + protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeFeatureFlag + protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeThreshold protected val invSpec = new InvSpec(networkSettings.maxInvObjects) protected val requestModifierSpec = new RequestModifierSpec(networkSettings.maxInvObjects) protected val modifiersSpec = new ModifiersSpec(networkSettings.maxModifiersSpecMessageSize) @@ -68,7 +70,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes case (_: ModifiersSpec, data: ModifiersData, remote) => modifiersFromRemote(data, remote) } - protected val deliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, self) + protected val deliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, slowModeFeatureFlag, slowModeThreshold, self) protected val statusTracker = new SyncTracker(self, context, networkSettings, timeProvider) protected var historyReaderOpt: Option[HR] = None @@ -98,8 +100,12 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes private def readersOpt: Option[(HR, MR)] = historyReaderOpt.flatMap(h => mempoolReaderOpt.map(mp => (h, mp))) protected def broadcastModifierInv[M <: NodeViewModifier](m: M): Unit = { - val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None) - networkControllerRef ! SendToNetwork(msg, Broadcast) + m.modifierTypeId match { + case Transaction.ModifierTypeId if deliveryTracker.slowMode => // will not broadcast due to the high load + case _ => + val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None) + networkControllerRef ! SendToNetwork(msg, Broadcast) + } } protected def viewHolderEvents: Receive = { @@ -224,7 +230,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes val modifierTypeId = invData.typeId val newModifierIds = (modifierTypeId match { case Transaction.ModifierTypeId => - invData.ids.filter(mid => deliveryTracker.status(mid, mempool) == ModifiersStatus.Unknown) + if (deliveryTracker.slowMode) + Seq() // do not request transactions due to the high load + else + invData.ids.filter(mid => deliveryTracker.status(mid, mempool) == ModifiersStatus.Unknown) case _ => invData.ids.filter(mid => deliveryTracker.status(mid, history) == ModifiersStatus.Unknown) }) diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala index e5a612a78..1c35ac10d 100644 --- a/src/main/scala/sparkz/core/settings/Settings.scala +++ b/src/main/scala/sparkz/core/settings/Settings.scala @@ -31,6 +31,8 @@ case class NetworkSettings(nodeName: String, maxDeliveryChecks: Int, penalizeNonDelivery: Boolean, maxRequestedPerPeer: Int, + slowModeFeatureFlag: Boolean, + slowModeThreshold: Int, appVersion: String, agentName: String, maxModifiersSpecMessageSize: Int, diff --git a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala index 6ebcd53c3..0ca113642 100644 --- a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala +++ b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala @@ -159,7 +159,7 @@ class DeliveryTrackerSpecification extends AnyPropSpec val probe = TestProbe("p")(system) implicit val nvsStub: ActorRef = probe.testActor val dt = FiniteDuration(3, MINUTES) - new DeliveryTracker(system, deliveryTimeout = dt, maxDeliveryChecks = 2, maxRequestedPerPeer = 3, nvsStub) + new DeliveryTracker(system, deliveryTimeout = dt, maxDeliveryChecks = 2, maxRequestedPerPeer = 3, false, 0, nvsStub) } } diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala index 19bc251e1..ff005efe4 100644 --- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala +++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala @@ -83,7 +83,7 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa timeProvider, modifierSerializers ) { - override val deliveryTracker: DeliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, self) { + override val deliveryTracker: DeliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, false, 0, self) { override def status(modifierId: ModifierId): ModifiersStatus = ModifiersStatus.Requested } } From e19b09cb7e411f5c3c9868b17143cefbbb97c853 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Tue, 2 May 2023 14:52:05 +0300 Subject: [PATCH 07/30] fix compilation issues --- src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index 416a7fd29..e74200aa7 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -58,7 +58,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks protected val maxRequestedPerPeer: Int = networkSettings.maxRequestedPerPeer protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeFeatureFlag - protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeThreshold + protected val slowModeThreshold: Int = networkSettings.slowModeThreshold protected val invSpec = new InvSpec(networkSettings.maxInvObjects) protected val requestModifierSpec = new RequestModifierSpec(networkSettings.maxInvObjects) protected val modifiersSpec = new ModifiersSpec(networkSettings.maxModifiersSpecMessageSize) From 990a75bc83cff0c6a2373a86c22d546e6a18ad36 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Tue, 2 May 2023 23:58:58 +0300 Subject: [PATCH 08/30] signed commit --- src/main/resources/reference.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 4e346139e..563aadfa2 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -170,7 +170,7 @@ sparkz { # Enables or disables slow mode - when N of concurrent requested modifiers > threshold, do not fetch or broadcast transactions slowModeFeatureFlag = true - # Number of concurrent transactions in sync after which the slow mode is engaged + # Number of concurrent transactions in sync after which the slow mode is enabled slowModeThreshold = 150 # Desired number of inv objects. Our requests will have this size. From e185dea2d98b7d79e97961d59cbb8f150d7dfdac Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Wed, 3 May 2023 00:09:38 +0300 Subject: [PATCH 09/30] ci GPG key server error --- ci/setup_env.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ci/setup_env.sh b/ci/setup_env.sh index d6eae7017..6e60f96d2 100755 --- a/ci/setup_env.sh +++ b/ci/setup_env.sh @@ -24,7 +24,8 @@ function import_gpg_keys() { gpg -v --batch --keyserver hkps://keys.openpgp.org --recv-keys "${key}" || gpg -v --batch --keyserver hkp://keyserver.ubuntu.com --recv-keys "${key}" || gpg -v --batch --keyserver hkp://pgp.mit.edu:80 --recv-keys "${key}" || - gpg -v --batch --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys "${key}" + gpg -v --batch --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys "${key}" || + echo -e "${key} can not be found on GPG key servers. Please upload it to at least one of the following GPG key servers:\nhttps://keys.openpgp.org/\nhttps://keyserver.ubuntu.com/\nhttps://pgp.mit.edu/" done } From cf9149a3a799de291daa9333eb79605a53367f54 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 4 May 2023 11:41:26 +0300 Subject: [PATCH 10/30] RC12-SNAPSHOT version --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 4a98a7bc2..eaef3d5d6 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.0-RC11-SNAPSHOT", + version := "2.0.0-RC12-SNAPSHOT", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := From bb127e6bb641a5a65ccd437cb59f528f97a24527 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Mon, 15 May 2023 10:55:19 +0300 Subject: [PATCH 11/30] RC12-SNAPSHOT2 version + refactoring P2P rate limiting to be based on the node performance --- build.sbt | 2 +- src/main/resources/reference.conf | 8 +-- .../sparkz/core/network/DeliveryTracker.scala | 44 ++++++++++------ .../core/network/NodeViewSynchronizer.scala | 5 +- .../scala/sparkz/core/settings/Settings.scala | 2 +- .../DeliveryTrackerSpecification.scala | 50 +++++++++++++++++++ 6 files changed, 87 insertions(+), 24 deletions(-) diff --git a/build.sbt b/build.sbt index eaef3d5d6..24ee3c152 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.0-RC12-SNAPSHOT", + version := "2.0.0-RC12-SNAPSHOT2", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 563aadfa2..28756d3fe 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -167,11 +167,11 @@ sparkz { # Limit for number of modifiers to request and process at once maxRequestedPerPeer = 1024 - # Enables or disables slow mode - when N of concurrent requested modifiers > threshold, do not fetch or broadcast transactions - slowModeFeatureFlag = true + # Enables or disables slow mode - ignoring requesting or broadcasting new transaction when node is overloaded + slowModeFeatureFlag = false - # Number of concurrent transactions in sync after which the slow mode is enabled - slowModeThreshold = 150 + # Threshold of average time it takes node to apply a new modifier, after which we consider it overloaded and start throttling + slowModeThresholdMs = 2000 # Desired number of inv objects. Our requests will have this size. desiredInvObjects = 512 diff --git a/src/main/scala/sparkz/core/network/DeliveryTracker.scala b/src/main/scala/sparkz/core/network/DeliveryTracker.scala index 9b73e7725..3b96985ba 100644 --- a/src/main/scala/sparkz/core/network/DeliveryTracker.scala +++ b/src/main/scala/sparkz/core/network/DeliveryTracker.scala @@ -8,6 +8,7 @@ import sparkz.util.SparkzEncoding import sparkz.core.{ModifierTypeId, NodeViewModifier} import sparkz.util.{ModifierId, SparkzLogging} +import java.util.concurrent.TimeUnit import scala.collection.mutable import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration @@ -39,7 +40,7 @@ class DeliveryTracker(system: ActorSystem, maxDeliveryChecks: Int, maxRequestedPerPeer: Int, slowModeFeatureFlag: Boolean, - slowModeThreshold: Int, + slowModeThresholdMs: Long, nvsRef: ActorRef) extends SparkzLogging with SparkzEncoding { protected case class RequestedInfo(peer: ConnectedPeer, cancellable: Cancellable, checks: Int) @@ -54,8 +55,9 @@ class DeliveryTracker(system: ActorSystem, protected val invalid: mutable.HashSet[ModifierId] = mutable.HashSet() // when our node received a modifier we put it to `received` - protected val received: mutable.Map[ModifierId, ConnectedPeer] = mutable.Map() + protected val received: mutable.Map[ModifierId, (ConnectedPeer, Long)] = mutable.Map() + private var averageProcessingTimeMs: Long = 0 var slowMode: Boolean = false /** @@ -103,19 +105,10 @@ class DeliveryTracker(system: ActorSystem, require(isCorrectTransition(status(id), Requested), s"Illegal status transition: ${status(id)} -> Requested") val cancellable = system.scheduler.scheduleOnce(deliveryTimeout, nvsRef, CheckDelivery(supplier, typeId, id)) requested.put(id, RequestedInfo(supplier, cancellable, checksDone)) match { - case Some(RequestedInfo(peer,_,_)) if supplier.connectionId == peer.connectionId => //we already had this modifier, it is counted - case Some(RequestedInfo(peer,_,_)) => decrementPeerLimitCounter(peer); incrementPeerLimitCounter(supplier) + case Some(RequestedInfo(peer, _, _)) if supplier.connectionId == peer.connectionId => //we already had this modifier, it is counted + case Some(RequestedInfo(peer, _, _)) => decrementPeerLimitCounter(peer); incrementPeerLimitCounter(supplier) case None => incrementPeerLimitCounter(supplier) } - if (slowModeFeatureFlag) { - if (requested.size > slowModeThreshold) { - slowMode = true - logger.warn("SLOW MODE ENGAGED. TRANSACTION WILL BE REJECTED ON P2P LAYER!") - } else { - slowMode = false - logger.warn("SLOW MODE DISABLED. TRANSACTION WILL BE SYNCED ON P2P LAYER!") - } - } } def setRequested(ids: Seq[ModifierId], typeId: ModifierTypeId, cp: ConnectedPeer) @@ -140,6 +133,10 @@ class DeliveryTracker(system: ActorSystem, .map(decrementPeerLimitCounter) case Received => received.remove(modifierId) + .collect { case (peer, timestamp) => + updateProcessingTime(timestamp) + peer + } case _ => None } @@ -179,12 +176,12 @@ class DeliveryTracker(system: ActorSystem, def setReceived(id: ModifierId, sender: ConnectedPeer): Unit = tryWithLogging { val oldStatus: ModifiersStatus = status(id) - require(isCorrectTransition(oldStatus, Invalid), s"Illegal status transition: $oldStatus -> Received") + require(isCorrectTransition(oldStatus, Received), s"Illegal status transition: $oldStatus -> Received") if (oldStatus != Received) { requested(id).cancellable.cancel() requested.remove(id) decrementPeerLimitCounter(sender) - received.put(id, sender) + received.put(id, (sender, System.nanoTime())) } } @@ -194,7 +191,7 @@ class DeliveryTracker(system: ActorSystem, case Requested => requested.get(id).map(_.peer) case Received => - received.get(id) + received.get(id).map(_._1) case _ => None } @@ -247,6 +244,7 @@ class DeliveryTracker(system: ActorSystem, .map(decrementPeerLimitCounter) case Received => received.remove(id) + .foreach(peer_timestamp => updateProcessingTime(peer_timestamp._2)) case _ => () } @@ -263,4 +261,18 @@ class DeliveryTracker(system: ActorSystem, log.warn("Unexpected error", e) Failure(e) } + + private def updateProcessingTime(startTime: Long): Unit = { + if (slowModeFeatureFlag) { + val elapsedMs: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) + averageProcessingTimeMs = (averageProcessingTimeMs * 0.9).toLong + (elapsedMs * 0.1).toLong + if (averageProcessingTimeMs > slowModeThresholdMs && !slowMode) { + slowMode = true + logger.warn("Slow mode enabled on P2P layer due to high load. Transactions won't be requested or broadcasted.") + } else if (averageProcessingTimeMs < slowModeThresholdMs && slowMode) { + slowMode = false + logger.warn("Slow mode disabled on P2P layer. Transactions will be requested or broadcasted.") + } + } + } } diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index e74200aa7..4af202120 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -58,7 +58,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks protected val maxRequestedPerPeer: Int = networkSettings.maxRequestedPerPeer protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeFeatureFlag - protected val slowModeThreshold: Int = networkSettings.slowModeThreshold + protected val slowModeThresholdMs: Long = networkSettings.slowModeThresholdMs protected val invSpec = new InvSpec(networkSettings.maxInvObjects) protected val requestModifierSpec = new RequestModifierSpec(networkSettings.maxInvObjects) protected val modifiersSpec = new ModifiersSpec(networkSettings.maxModifiersSpecMessageSize) @@ -70,7 +70,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes case (_: ModifiersSpec, data: ModifiersData, remote) => modifiersFromRemote(data, remote) } - protected val deliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, slowModeFeatureFlag, slowModeThreshold, self) + protected val deliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, slowModeFeatureFlag, slowModeThresholdMs, self) protected val statusTracker = new SyncTracker(self, context, networkSettings, timeProvider) protected var historyReaderOpt: Option[HR] = None @@ -284,6 +284,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes case Some(serializer: SparkzSerializer[TX]@unchecked) if typeId == Transaction.ModifierTypeId => // parse all transactions and send them to node view holder val parsed: Iterable[TX] = parseModifiers(requestedModifiers, serializer, remote) + parsed.foreach(tx => deliveryTracker.setReceived(tx.id, remote)) viewHolderRef ! TransactionsFromRemote(parsed) case Some(serializer: SparkzSerializer[PMOD]@unchecked) => diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala index 1c35ac10d..7714e5625 100644 --- a/src/main/scala/sparkz/core/settings/Settings.scala +++ b/src/main/scala/sparkz/core/settings/Settings.scala @@ -32,7 +32,7 @@ case class NetworkSettings(nodeName: String, penalizeNonDelivery: Boolean, maxRequestedPerPeer: Int, slowModeFeatureFlag: Boolean, - slowModeThreshold: Int, + slowModeThresholdMs: Long, appVersion: String, agentName: String, maxModifiersSpecMessageSize: Int, diff --git a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala index 0ca113642..919882d89 100644 --- a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala +++ b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala @@ -154,6 +154,56 @@ class DeliveryTrackerSpecification extends AnyPropSpec tracker.getPeerLimit(otherPeer) shouldBe 3 } + property("slow mode should be enabled when average processing time exceeds threshold") { + val system = ActorSystem() + val probe = TestProbe("p")(system) + implicit val nvsStub: ActorRef = probe.testActor + val dt = FiniteDuration(3, MINUTES) + val deliveryTracker = new DeliveryTracker( + system, + deliveryTimeout = dt, + maxDeliveryChecks = 2, + maxRequestedPerPeer = 3, + slowModeFeatureFlag = true, + slowModeThresholdMs = 100, + nvsRef = nvsStub) + deliveryTracker.slowMode shouldBe false + val modifiers = (1 to 10).map(int => bytesToId(Blake2b256(int+ ""))) + + deliveryTracker.setRequested(modifiers, mtid, cp) + modifiers.foreach(deliveryTracker.setReceived(_, cp)) + deliveryTracker.slowMode shouldBe false + Thread.sleep(200) + deliveryTracker.slowMode shouldBe false + modifiers.foreach(deliveryTracker.setHeld) + deliveryTracker.slowMode shouldBe true + } + + property("slow mode should depend on the feature flag") { + val system = ActorSystem() + val probe = TestProbe("p")(system) + implicit val nvsStub: ActorRef = probe.testActor + val dt = FiniteDuration(3, MINUTES) + val deliveryTracker = new DeliveryTracker( + system, + deliveryTimeout = dt, + maxDeliveryChecks = 2, + maxRequestedPerPeer = 3, + slowModeFeatureFlag = false, + slowModeThresholdMs = 10, + nvsRef = nvsStub) + deliveryTracker.slowMode shouldBe false + val modifiers = (1 to 10).map(int => bytesToId(Blake2b256(int+ ""))) + + deliveryTracker.setRequested(modifiers, mtid, cp) + modifiers.foreach(deliveryTracker.setReceived(_, cp)) + deliveryTracker.slowMode shouldBe false + Thread.sleep(200) + deliveryTracker.slowMode shouldBe false + modifiers.foreach(deliveryTracker.setHeld) + deliveryTracker.slowMode shouldBe false + } + private def genDeliveryTracker = { val system = ActorSystem() val probe = TestProbe("p")(system) From 6e51ac4516100726d4daca7657dfc800c245c9a7 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Tue, 16 May 2023 11:07:47 +0300 Subject: [PATCH 12/30] review fixes --- src/main/resources/reference.conf | 12 ++++++-- .../sparkz/core/network/DeliveryTracker.scala | 30 +++++++++++++++---- .../core/network/NodeViewSynchronizer.scala | 13 +++----- .../scala/sparkz/core/settings/Settings.scala | 2 ++ 4 files changed, 40 insertions(+), 17 deletions(-) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 28756d3fe..aa7addd94 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -167,12 +167,20 @@ sparkz { # Limit for number of modifiers to request and process at once maxRequestedPerPeer = 1024 - # Enables or disables slow mode - ignoring requesting or broadcasting new transaction when node is overloaded + # Enables or disables slow mode - ignoring requesting or broadcasting new transaction when node is overloaded. slowModeFeatureFlag = false - # Threshold of average time it takes node to apply a new modifier, after which we consider it overloaded and start throttling + # Threshold of average time it takes node to apply a new modifier(block or transaction), + # after which we consider it overloaded and start throttling. slowModeThresholdMs = 2000 + # Maximum number of modifiers that can be requested from remote peers, + # when the node is considered overloaded. Only affects transactions. + slowModeMaxRequested = 150 + + # The impact is single measurement has on a an average processing value. + slowModeMeasurementImpact = 0.1 + # Desired number of inv objects. Our requests will have this size. desiredInvObjects = 512 diff --git a/src/main/scala/sparkz/core/network/DeliveryTracker.scala b/src/main/scala/sparkz/core/network/DeliveryTracker.scala index 3b96985ba..f2ad10628 100644 --- a/src/main/scala/sparkz/core/network/DeliveryTracker.scala +++ b/src/main/scala/sparkz/core/network/DeliveryTracker.scala @@ -4,6 +4,7 @@ import akka.actor.{ActorRef, ActorSystem, Cancellable} import sparkz.core.consensus.ContainsModifiers import sparkz.core.network.ModifiersStatus._ import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.CheckDelivery +import sparkz.core.settings.NetworkSettings import sparkz.util.SparkzEncoding import sparkz.core.{ModifierTypeId, NodeViewModifier} import sparkz.util.{ModifierId, SparkzLogging} @@ -36,13 +37,17 @@ import scala.util.{Failure, Try} * and its methods should not be called from lambdas, Future, Future.map, etc. */ class DeliveryTracker(system: ActorSystem, - deliveryTimeout: FiniteDuration, - maxDeliveryChecks: Int, - maxRequestedPerPeer: Int, - slowModeFeatureFlag: Boolean, - slowModeThresholdMs: Long, + networkSettings: NetworkSettings, nvsRef: ActorRef) extends SparkzLogging with SparkzEncoding { + protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout + protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks + protected val maxRequestedPerPeer: Int = networkSettings.maxRequestedPerPeer + protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeFeatureFlag + protected val slowModeThresholdMs: Long = networkSettings.slowModeThresholdMs + protected val slowModeMaxRequested: Int = networkSettings.slowModeMaxRequested + protected val slowModeMeasurementImpact: Float = networkSettings.slowModeMeasurementImpact + protected case class RequestedInfo(peer: ConnectedPeer, cancellable: Cancellable, checks: Int) // when a remote peer is asked for a modifier we add the requested data to `requested` @@ -201,6 +206,19 @@ class DeliveryTracker(system: ActorSystem, maxRequestedPerPeer - peerLimits.getOrElse(peer, 0) } + /** + * Check if we have capacity to request more transactions from remote peers. + * In order to decide that node cannot request more transactions, all 3 conditions must be satisfied: + * - feature flag is enabled + * - current node is in slow mode - average time it takes to process a modifier is higher than a threshold + * - number of concurrently requested modifiers is bigger than a max allowed value + * + * @return + */ + def canRequestMoreTransactions: Boolean = { + !(slowModeFeatureFlag && slowMode && requested.size > slowModeMaxRequested) + } + private def incrementPeerLimitCounter(peer: ConnectedPeer): Unit = { peerLimits.get(peer) match { case Some(value) => peerLimits.put(peer, value + 1) @@ -265,7 +283,7 @@ class DeliveryTracker(system: ActorSystem, private def updateProcessingTime(startTime: Long): Unit = { if (slowModeFeatureFlag) { val elapsedMs: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) - averageProcessingTimeMs = (averageProcessingTimeMs * 0.9).toLong + (elapsedMs * 0.1).toLong + averageProcessingTimeMs = (averageProcessingTimeMs * (1 - slowModeMeasurementImpact)).toLong + (elapsedMs * slowModeMeasurementImpact).toLong if (averageProcessingTimeMs > slowModeThresholdMs && !slowMode) { slowMode = true logger.warn("Slow mode enabled on P2P layer due to high load. Transactions won't be requested or broadcasted.") diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index 4af202120..36d46ed1b 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -54,11 +54,6 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes modifierSerializers: Map[ModifierTypeId, SparkzSerializer[_ <: NodeViewModifier]])(implicit ec: ExecutionContext) extends Actor with Synchronizer with SparkzLogging with SparkzEncoding { - protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout - protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks - protected val maxRequestedPerPeer: Int = networkSettings.maxRequestedPerPeer - protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeFeatureFlag - protected val slowModeThresholdMs: Long = networkSettings.slowModeThresholdMs protected val invSpec = new InvSpec(networkSettings.maxInvObjects) protected val requestModifierSpec = new RequestModifierSpec(networkSettings.maxInvObjects) protected val modifiersSpec = new ModifiersSpec(networkSettings.maxModifiersSpecMessageSize) @@ -70,7 +65,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes case (_: ModifiersSpec, data: ModifiersData, remote) => modifiersFromRemote(data, remote) } - protected val deliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, slowModeFeatureFlag, slowModeThresholdMs, self) + protected val deliveryTracker = new DeliveryTracker(context.system, networkSettings, self) protected val statusTracker = new SyncTracker(self, context, networkSettings, timeProvider) protected var historyReaderOpt: Option[HR] = None @@ -230,10 +225,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes val modifierTypeId = invData.typeId val newModifierIds = (modifierTypeId match { case Transaction.ModifierTypeId => - if (deliveryTracker.slowMode) - Seq() // do not request transactions due to the high load - else + if (deliveryTracker.canRequestMoreTransactions) invData.ids.filter(mid => deliveryTracker.status(mid, mempool) == ModifiersStatus.Unknown) + else + Seq() // do not request transactions due to the high load case _ => invData.ids.filter(mid => deliveryTracker.status(mid, history) == ModifiersStatus.Unknown) }) diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala index 7714e5625..b4fd8a861 100644 --- a/src/main/scala/sparkz/core/settings/Settings.scala +++ b/src/main/scala/sparkz/core/settings/Settings.scala @@ -33,6 +33,8 @@ case class NetworkSettings(nodeName: String, maxRequestedPerPeer: Int, slowModeFeatureFlag: Boolean, slowModeThresholdMs: Long, + slowModeMaxRequested: Int, + slowModeMeasurementImpact: Float, appVersion: String, agentName: String, maxModifiersSpecMessageSize: Int, From bdcb71600eab3ce7a87cbb4b817a2d9e20ef44aa Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Tue, 16 May 2023 11:35:11 +0300 Subject: [PATCH 13/30] test fixes --- .../sparkz/core/network/DeliveryTracker.scala | 2 +- .../scala/sparkz/core/settings/Settings.scala | 2 +- .../DeliveryTrackerSpecification.scala | 43 +++++++++++++------ .../NodeViewSynchronizerSpecification.scala | 2 +- 4 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/main/scala/sparkz/core/network/DeliveryTracker.scala b/src/main/scala/sparkz/core/network/DeliveryTracker.scala index f2ad10628..d3ff6d615 100644 --- a/src/main/scala/sparkz/core/network/DeliveryTracker.scala +++ b/src/main/scala/sparkz/core/network/DeliveryTracker.scala @@ -46,7 +46,7 @@ class DeliveryTracker(system: ActorSystem, protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeFeatureFlag protected val slowModeThresholdMs: Long = networkSettings.slowModeThresholdMs protected val slowModeMaxRequested: Int = networkSettings.slowModeMaxRequested - protected val slowModeMeasurementImpact: Float = networkSettings.slowModeMeasurementImpact + protected val slowModeMeasurementImpact: Double = networkSettings.slowModeMeasurementImpact protected case class RequestedInfo(peer: ConnectedPeer, cancellable: Cancellable, checks: Int) diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala index b4fd8a861..0645da1c5 100644 --- a/src/main/scala/sparkz/core/settings/Settings.scala +++ b/src/main/scala/sparkz/core/settings/Settings.scala @@ -34,7 +34,7 @@ case class NetworkSettings(nodeName: String, slowModeFeatureFlag: Boolean, slowModeThresholdMs: Long, slowModeMaxRequested: Int, - slowModeMeasurementImpact: Float, + slowModeMeasurementImpact: Double, appVersion: String, agentName: String, maxModifiersSpecMessageSize: Int, diff --git a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala index 919882d89..c0d2007bc 100644 --- a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala +++ b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala @@ -2,6 +2,8 @@ package sparkz.core.network import akka.actor.{ActorRef, ActorSystem} import akka.testkit.TestProbe +import org.mockito.Mockito.when +import org.mockito.MockitoSugar.mock import org.scalatest.matchers.should.Matchers import org.scalatest.propspec.AnyPropSpec import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks @@ -9,9 +11,10 @@ import sparkz.ObjectGenerators import sparkz.core.consensus.ContainsModifiers import sparkz.core.network.ModifiersStatus._ import sparkz.core.serialization.SparkzSerializer -import sparkz.core.{PersistentNodeViewModifier, ModifierTypeId} +import sparkz.core.settings.NetworkSettings +import sparkz.core.{ModifierTypeId, PersistentNodeViewModifier} import sparkz.crypto.hash.Blake2b256 -import sparkz.util.{bytesToId, ModifierId} +import sparkz.util.{ModifierId, bytesToId} import scala.collection.concurrent.TrieMap import scala.concurrent.ExecutionContext.Implicits.global @@ -159,13 +162,16 @@ class DeliveryTrackerSpecification extends AnyPropSpec val probe = TestProbe("p")(system) implicit val nvsStub: ActorRef = probe.testActor val dt = FiniteDuration(3, MINUTES) + val networkSettings = mock[NetworkSettings] + when(networkSettings.deliveryTimeout).thenReturn(dt) + when(networkSettings.maxDeliveryChecks).thenReturn(2) + when(networkSettings.maxRequestedPerPeer).thenReturn(3) + when(networkSettings.slowModeFeatureFlag).thenReturn(true) + when(networkSettings.slowModeThresholdMs).thenReturn(100) + when(networkSettings.slowModeMeasurementImpact).thenReturn(0.1) val deliveryTracker = new DeliveryTracker( system, - deliveryTimeout = dt, - maxDeliveryChecks = 2, - maxRequestedPerPeer = 3, - slowModeFeatureFlag = true, - slowModeThresholdMs = 100, + networkSettings, nvsRef = nvsStub) deliveryTracker.slowMode shouldBe false val modifiers = (1 to 10).map(int => bytesToId(Blake2b256(int+ ""))) @@ -184,13 +190,15 @@ class DeliveryTrackerSpecification extends AnyPropSpec val probe = TestProbe("p")(system) implicit val nvsStub: ActorRef = probe.testActor val dt = FiniteDuration(3, MINUTES) + val networkSettings = mock[NetworkSettings] + when(networkSettings.deliveryTimeout).thenReturn(dt) + when(networkSettings.maxDeliveryChecks).thenReturn(2) + when(networkSettings.maxRequestedPerPeer).thenReturn(3) + when(networkSettings.slowModeFeatureFlag).thenReturn(false) + when(networkSettings.slowModeThresholdMs).thenReturn(100) val deliveryTracker = new DeliveryTracker( system, - deliveryTimeout = dt, - maxDeliveryChecks = 2, - maxRequestedPerPeer = 3, - slowModeFeatureFlag = false, - slowModeThresholdMs = 10, + networkSettings, nvsRef = nvsStub) deliveryTracker.slowMode shouldBe false val modifiers = (1 to 10).map(int => bytesToId(Blake2b256(int+ ""))) @@ -209,7 +217,16 @@ class DeliveryTrackerSpecification extends AnyPropSpec val probe = TestProbe("p")(system) implicit val nvsStub: ActorRef = probe.testActor val dt = FiniteDuration(3, MINUTES) - new DeliveryTracker(system, deliveryTimeout = dt, maxDeliveryChecks = 2, maxRequestedPerPeer = 3, false, 0, nvsStub) + val networkSettings = mock[NetworkSettings] + when(networkSettings.deliveryTimeout).thenReturn(dt) + when(networkSettings.maxDeliveryChecks).thenReturn(2) + when(networkSettings.maxRequestedPerPeer).thenReturn(3) + when(networkSettings.slowModeFeatureFlag).thenReturn(true) + when(networkSettings.slowModeThresholdMs).thenReturn(100) + new DeliveryTracker( + system, + networkSettings, + nvsRef = nvsStub) } } diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala index ff005efe4..aea418ad3 100644 --- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala +++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala @@ -83,7 +83,7 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa timeProvider, modifierSerializers ) { - override val deliveryTracker: DeliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, false, 0, self) { + override val deliveryTracker: DeliveryTracker = new DeliveryTracker(context.system, settings.network, self) { override def status(modifierId: ModifierId): ModifiersStatus = ModifiersStatus.Requested } } From a7f262059a207f70b8583610d226ae5d7d0eaeec Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Wed, 17 May 2023 16:05:09 +0300 Subject: [PATCH 14/30] review fixes --- build.sbt | 2 +- src/main/scala/sparkz/core/network/DeliveryTracker.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 24ee3c152..4cf92affc 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.0-RC12-SNAPSHOT2", + version := "2.0.1-SNAPSHOTX", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := diff --git a/src/main/scala/sparkz/core/network/DeliveryTracker.scala b/src/main/scala/sparkz/core/network/DeliveryTracker.scala index d3ff6d615..c9821d7ec 100644 --- a/src/main/scala/sparkz/core/network/DeliveryTracker.scala +++ b/src/main/scala/sparkz/core/network/DeliveryTracker.scala @@ -286,7 +286,7 @@ class DeliveryTracker(system: ActorSystem, averageProcessingTimeMs = (averageProcessingTimeMs * (1 - slowModeMeasurementImpact)).toLong + (elapsedMs * slowModeMeasurementImpact).toLong if (averageProcessingTimeMs > slowModeThresholdMs && !slowMode) { slowMode = true - logger.warn("Slow mode enabled on P2P layer due to high load. Transactions won't be requested or broadcasted.") + logger.warn("Slow mode enabled on P2P layer due to high load. Transactions won't be requested and tx broadcast will be limited.") } else if (averageProcessingTimeMs < slowModeThresholdMs && slowMode) { slowMode = false logger.warn("Slow mode disabled on P2P layer. Transactions will be requested or broadcasted.") From 8634aeb48322229d48979f0e0d48a529ce9d76db Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Wed, 17 May 2023 16:18:21 +0300 Subject: [PATCH 15/30] review fixes --- build.sbt | 2 +- src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 4cf92affc..b6db47fc7 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.1-SNAPSHOTX", + version := "2.0.1-SNAPSHOT1", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index 36d46ed1b..9ccd7fa10 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -96,7 +96,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes protected def broadcastModifierInv[M <: NodeViewModifier](m: M): Unit = { m.modifierTypeId match { - case Transaction.ModifierTypeId if deliveryTracker.slowMode => // will not broadcast due to the high load + case Transaction.ModifierTypeId if deliveryTracker.slowMode => // will not broadcast due to the high load case _ => val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None) networkControllerRef ! SendToNetwork(msg, Broadcast) From cc3cf32794d36c53b2c46f5fdb16f7c33be962a3 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Wed, 17 May 2023 17:23:22 +0300 Subject: [PATCH 16/30] review fixes --- ci/setup_env.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/setup_env.sh b/ci/setup_env.sh index 6e60f96d2..6c2b84958 100755 --- a/ci/setup_env.sh +++ b/ci/setup_env.sh @@ -61,7 +61,7 @@ if [ -n "${TRAVIS_TAG}" ]; then # Prod vs dev release if ( git branch -r --contains "${TRAVIS_TAG}" | grep -xqE ". origin\/${PROD_RELEASE_BRANCH}$" ); then # Checking if package version matches PROD release version - if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then + if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then echo "Aborting, package version is in the wrong format for production release." exit 1 fi From 1c4cbd1e9a8d159427d49d582b5f55882f488bf9 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Wed, 17 May 2023 17:45:26 +0300 Subject: [PATCH 17/30] ci version fix --- ci/setup_env.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/setup_env.sh b/ci/setup_env.sh index 6c2b84958..77dfdfffe 100755 --- a/ci/setup_env.sh +++ b/ci/setup_env.sh @@ -81,7 +81,7 @@ if [ -n "${TRAVIS_TAG}" ]; then export CONTAINER_PUBLISH="true" else # Checking if package version matches DEV release version - if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}$ ]]; then + if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}$ ]]; then echo "Aborting, package version is in the wrong format for development release." exit 1 fi From 5c1deccc55a41097d8b9815210d4a280f079c38d Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Wed, 17 May 2023 17:49:04 +0300 Subject: [PATCH 18/30] ci version fix --- ci/setup_env.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/setup_env.sh b/ci/setup_env.sh index 77dfdfffe..5604dbfc3 100755 --- a/ci/setup_env.sh +++ b/ci/setup_env.sh @@ -81,7 +81,7 @@ if [ -n "${TRAVIS_TAG}" ]; then export CONTAINER_PUBLISH="true" else # Checking if package version matches DEV release version - if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}$ ]]; then + if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT[0-9]+){1}$ ]]; then echo "Aborting, package version is in the wrong format for development release." exit 1 fi From 560526a5495a6b48bbe3c2aedcf8a201a3ef581f Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 11:00:06 +0300 Subject: [PATCH 19/30] ci version fix --- ci/publish.sh | 4 ++-- ci/setup_env.sh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ci/publish.sh b/ci/publish.sh index d7cc9af4f..c91bde056 100755 --- a/ci/publish.sh +++ b/ci/publish.sh @@ -3,10 +3,10 @@ set -eo pipefail retval=0 -if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}[0-9]*$ ]]; then +if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then echo "" && echo "=== Publishing development release on Sonatype Nexus repository. Timestamp is: $(date '+%a %b %d %H:%M:%S %Z %Y') ===" && echo "" sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publish -elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then +elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then echo "" && echo "=== Publishing production release on Maven repository. Timestamp is: $(date '+%Y-%m-%d %H:%M') ===" && echo "" sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publishSigned sonatypeBundleRelease else diff --git a/ci/setup_env.sh b/ci/setup_env.sh index 5604dbfc3..5e65d5693 100755 --- a/ci/setup_env.sh +++ b/ci/setup_env.sh @@ -81,7 +81,7 @@ if [ -n "${TRAVIS_TAG}" ]; then export CONTAINER_PUBLISH="true" else # Checking if package version matches DEV release version - if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT[0-9]+){1}$ ]]; then + if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then echo "Aborting, package version is in the wrong format for development release." exit 1 fi From 84a0d5404635b217824f99568bfd5bddf0cfa91a Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Fri, 7 Apr 2023 11:01:44 +0300 Subject: [PATCH 20/30] fix for inconsistent node status after node banning --- .../core/network/NetworkController.scala | 16 +++++++----- .../core/network/NetworkControllerSpec.scala | 25 ++++++++++++++++++- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 52db1c467..3c17495f4 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -534,13 +534,17 @@ class NetworkController(settings: NetworkSettings, } } - private def closeConnection(peerAddress: InetSocketAddress): Unit = - connections.get(peerAddress).foreach { peer => - connections = connections.filterNot { case (address, _) => // clear all connections related to banned peer ip - Option(peer.connectionId.remoteAddress.getAddress).exists(Option(address.getAddress).contains(_)) - } - peer.handlerRef ! CloseConnection + private def closeConnection(peerAddress: InetSocketAddress): Unit = { + connections = connections.filter { case (_, connectedPeer) => + Option(connectedPeer) + .filter(_.connectionId.remoteAddress.equals(peerAddress)) + .map { peer => + peer.handlerRef ! CloseConnection + context.system.eventStream.publish(DisconnectedPeer(peerAddress)) + } + .isEmpty } + } /** * Register a new penalty for given peer address. diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala index d69563adf..8bc366bb9 100644 --- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala +++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala @@ -15,8 +15,9 @@ import org.scalatest.matchers.should.Matchers import sparkz.core.app.{SparkzContext, Version} import sparkz.core.network.NetworkController.ReceivableMessages.Internal.ConnectionToPeer import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus} +import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.DisconnectedPeer import sparkz.core.network.message._ -import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, GetAllPeers, RandomPeerForConnectionExcluding} +import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, DisconnectFromAddress, GetAllPeers, RandomPeerForConnectionExcluding} import sparkz.core.network.peer._ import sparkz.core.settings.SparkzSettings import sparkz.core.utils.LocalTimeProvider @@ -535,6 +536,28 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { system.terminate() } + it should "on disconnect update SyncTracker" in { + implicit val system: ActorSystem = ActorSystem() + val testProbe = TestProbe() + val bindAddress = new InetSocketAddress("88.77.66.55", 12345) + val settings2 = settings.copy(network = settings.network.copy(bindAddress = bindAddress)) + val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, testProbe) + val testPeer1 = new TestPeer(settings2, networkControllerRef, testProbe) + + val peerAddress1 = new InetSocketAddress("88.77.66.55", 5678) + + testPeer1.connectAndExpectSuccessfulMessages(peerAddress1, bindAddress, Tcp.ResumeReading) + val handshakeFromNode1 = testPeer1.receiveHandshake + handshakeFromNode1.peerSpec.declaredAddress.value should be(bindAddress) + testPeer1.sendHandshake(None, None) + + networkControllerRef ! DisconnectFromAddress(peerAddress1) + system.eventStream.subscribe(testProbe.testActor, classOf[DisconnectedPeer]) + testProbe.expectMsg(DisconnectedPeer(peerAddress1)) + + system.terminate() + } + private def extractLocalAddrFeat(handshakeFromNode: Handshake): Option[InetSocketAddress] = { handshakeFromNode.peerSpec.localAddressOpt } From 159b019b3d719116a08e3aa24fc1cf434fe7400c Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 11:05:41 +0300 Subject: [PATCH 21/30] ci scripts fix --- ci/setup_env.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/setup_env.sh b/ci/setup_env.sh index 5e65d5693..cc63d3d9d 100755 --- a/ci/setup_env.sh +++ b/ci/setup_env.sh @@ -19,7 +19,7 @@ function import_gpg_keys() { # shellcheck disable=SC2207 declare -r my_arr=( $(echo "${@}" | tr " " "\n") ) - for key in "${my_arr[@]}"; do +for key in "${my_arr[@]}"; do echo "Importing key: ${key}" gpg -v --batch --keyserver hkps://keys.openpgp.org --recv-keys "${key}" || gpg -v --batch --keyserver hkp://keyserver.ubuntu.com --recv-keys "${key}" || From 6c0512ec3dff3fd8a86f9c9cf2df3e99fa4b38bf Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 11:10:58 +0300 Subject: [PATCH 22/30] ci scripts fix From 00e9d5612cdc3ed57f8c6a726f8a2dde17c20678 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 14:45:14 +0300 Subject: [PATCH 23/30] remove sporadically failing on CI test --- .../core/network/NetworkControllerSpec.scala | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala index 8bc366bb9..f022897c2 100644 --- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala +++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala @@ -536,28 +536,6 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { system.terminate() } - it should "on disconnect update SyncTracker" in { - implicit val system: ActorSystem = ActorSystem() - val testProbe = TestProbe() - val bindAddress = new InetSocketAddress("88.77.66.55", 12345) - val settings2 = settings.copy(network = settings.network.copy(bindAddress = bindAddress)) - val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, testProbe) - val testPeer1 = new TestPeer(settings2, networkControllerRef, testProbe) - - val peerAddress1 = new InetSocketAddress("88.77.66.55", 5678) - - testPeer1.connectAndExpectSuccessfulMessages(peerAddress1, bindAddress, Tcp.ResumeReading) - val handshakeFromNode1 = testPeer1.receiveHandshake - handshakeFromNode1.peerSpec.declaredAddress.value should be(bindAddress) - testPeer1.sendHandshake(None, None) - - networkControllerRef ! DisconnectFromAddress(peerAddress1) - system.eventStream.subscribe(testProbe.testActor, classOf[DisconnectedPeer]) - testProbe.expectMsg(DisconnectedPeer(peerAddress1)) - - system.terminate() - } - private def extractLocalAddrFeat(handshakeFromNode: Handshake): Option[InetSocketAddress] = { handshakeFromNode.peerSpec.localAddressOpt } From 07e557573059aee911421eecb45a5231be0254a4 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 16:58:46 +0300 Subject: [PATCH 24/30] increase too low wait time that fails on CI --- src/test/scala/sparkz/core/network/NetworkControllerSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala index f022897c2..2cd55441a 100644 --- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala +++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala @@ -523,7 +523,7 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { peerManagerProbe.expectMsg(RandomPeerForConnectionExcluding(Seq())) peerManagerProbe.reply(Some(getPeerInfo(peerAddressOne))) // Wait for the message to be received - Thread.sleep(2) + Thread.sleep(200) // Second attempt, discarding the peer we tried just before networkControllerRef ! ConnectionToPeer(emptyActiveConnections, emptyUnconfirmedConnections) From 389784e9ececbad8b1cc9bf66c9c831a882e8d55 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 18:06:57 +0300 Subject: [PATCH 25/30] version 2.0.1-SNAPSHOT --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index b6db47fc7..7fb4e0d79 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.1-SNAPSHOT1", + version := "2.0.1-SNAPSHOT", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := From 0e0ad987fc03e209798d7fee6df28b2cfde46ea3 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 18:12:13 +0300 Subject: [PATCH 26/30] revert ci script changes --- ci/publish.sh | 4 ++-- ci/setup_env.sh | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ci/publish.sh b/ci/publish.sh index c91bde056..d7cc9af4f 100755 --- a/ci/publish.sh +++ b/ci/publish.sh @@ -3,10 +3,10 @@ set -eo pipefail retval=0 -if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then +if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}[0-9]*$ ]]; then echo "" && echo "=== Publishing development release on Sonatype Nexus repository. Timestamp is: $(date '+%a %b %d %H:%M:%S %Z %Y') ===" && echo "" sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publish -elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then +elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then echo "" && echo "=== Publishing production release on Maven repository. Timestamp is: $(date '+%Y-%m-%d %H:%M') ===" && echo "" sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publishSigned sonatypeBundleRelease else diff --git a/ci/setup_env.sh b/ci/setup_env.sh index cc63d3d9d..f923f04e4 100755 --- a/ci/setup_env.sh +++ b/ci/setup_env.sh @@ -61,7 +61,7 @@ if [ -n "${TRAVIS_TAG}" ]; then # Prod vs dev release if ( git branch -r --contains "${TRAVIS_TAG}" | grep -xqE ". origin\/${PROD_RELEASE_BRANCH}$" ); then # Checking if package version matches PROD release version - if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then + if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then echo "Aborting, package version is in the wrong format for production release." exit 1 fi @@ -81,7 +81,7 @@ if [ -n "${TRAVIS_TAG}" ]; then export CONTAINER_PUBLISH="true" else # Checking if package version matches DEV release version - if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then + if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}$ ]]; then echo "Aborting, package version is in the wrong format for development release." exit 1 fi From 930fab6f04df4f0c50bdd18531bcd551233a9bbc Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 18 May 2023 18:30:38 +0300 Subject: [PATCH 27/30] ci readme --- ci/README.md | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 ci/README.md diff --git a/ci/README.md b/ci/README.md new file mode 100644 index 000000000..0933f5d2f --- /dev/null +++ b/ci/README.md @@ -0,0 +1,58 @@ +# New version build functionality + +As of 2022/06/27 CI/CD pipeline adds functionality to build and publish multiple `DIGIT.DIGIT.DIGIT-SNAPSHOT` versions of `zendoo-sc-crypotolib` package +with the help of set_version.sh script. + +`set_version.sh` script is located under **ci/devtools** directory and automates preparation steps for building/releasing a new +version of the artifacts by setting the provided version for all the required dependencies across the configuration files. + +--- +## Prerequisites for publishing a package: + - Singed by GPG key commit and valid GitHub tag in the format of `DIGIT.DIGIT.DIGIT` or `DIGIT.DIGIT.DIGIT-SNAPSHOT` + - GitHub tag matching `${pom_version_of_package}"[0-9]*$` regex + - Your(a person who pushes a tag) GPG key being added to CI/CD pipeline build settings + +Otherwise, the build process will run without entering the publishing stage. + +`DIGIT.DIGIT.DIGIT-SNAPSHOT` package version can be built multiple times by adjusting GitHub tag name accordingly. For example: +``` +GitHub tag = 1.1.1-SNAPSHOT can build 1.1.1-SNAPSHOT package +GitHub tag = 1.1.1-SNAPSHOT1 can build 1.1.1-SNAPSHOT package +GitHub tag = 1.1.1-SNAPSHOT2 can build 1.1.1-SNAPSHOT package +``` +All SNAPSHOT packages are being pushed to a snapshot repository configured under pom.xml file: +``` + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + +``` +and can be referred to inside the configuration files by providing the full version, that can be found inside nexus [repository](https://oss.sonatype.org/content/repositories/snapshots/io/horizen/) + +--- +## Usage +Before starting the build process use `set_version.sh` script if needed by providing two arguments in the following format: +``` + ./ci/devtools/set_version.sh --help + Usage: Provide OLD and NEW versions as the 1st and 2nd arguments respectively. + It has to match the following format: + DIGIT.DIGIT.DIGIT or DIGIT.DIGIT.DIGIT-SNAPSHOT + + For example: + ./set_version.sh 5.5.5 5.5.5-SNAPSHOT + ./set_version.sh 5.5.5-SNAPSHOT 5.5.5 +``` +| Changes made by set_version.sh script need to be committed before the build. | +|------------------------------------------------------------------------------| + +--- +## How to refer +- Find all the existing versions of [2.0.1-SNAPSHOT package](https://oss.sonatype.org/content/repositories/snapshots/io/horizen/sparkz-core_2.13/2.0.1-SNAPSHOT/) +- Use the full version of SNAPSHOT package as a dependency in the following format for your project. +``` + + io.horizen + sparkz-core_2.12 + 2.0.1-20230310.201529-1 + +``` \ No newline at end of file From d518c9caf16c5478b1365e76c1e67648081f00e1 Mon Sep 17 00:00:00 2001 From: MarcoOl94 Date: Thu, 1 Jun 2023 17:41:01 +0200 Subject: [PATCH 28/30] Added additional log --- src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index 9ccd7fa10..cb3a7a3ae 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -284,6 +284,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes case Some(serializer: SparkzSerializer[PMOD]@unchecked) => // parse all modifiers and put them to modifiers cache + log.info(s"Received block ids ${modifiers.keySet.map(encoder.encodeId).mkString(",")}") val parsed: Iterable[PMOD] = parseModifiers(requestedModifiers, serializer, remote) val valid: Iterable[PMOD] = parsed.filter(validateAndSetStatus(remote, _)) if (valid.nonEmpty) viewHolderRef ! ModifiersFromRemote[PMOD](valid) From 8b97154a96ecff917b1770d0afd094362c01b1fe Mon Sep 17 00:00:00 2001 From: paolocappelletti Date: Fri, 23 Jun 2023 11:57:37 +0200 Subject: [PATCH 29/30] update release notes --- release-notes.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/release-notes.md b/release-notes.md index 2405772b6..3a73330c9 100644 --- a/release-notes.md +++ b/release-notes.md @@ -1,3 +1,7 @@ +2.0.1 +--------- +* P2p rate limitng feature + 2.0.0-RC11 --------- * Changed library for the Bcrypt hashing algorithm and added additional unit tests From a7c4f011b5cbaf93f7ec4a996c781f7c3d456f04 Mon Sep 17 00:00:00 2001 From: paolocappelletti Date: Fri, 23 Jun 2023 18:06:17 +0200 Subject: [PATCH 30/30] 2.0.1 final --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 7fb4e0d79..bee84cd5d 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.1-SNAPSHOT", + version := "2.0.1", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra :=