From b2cf983e97c648e3e817bdb6eeb927a21f0b4a13 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 31 Aug 2023 17:15:47 +0300 Subject: [PATCH 1/2] start connecting to nodes only after the Synchronizer is initialized --- .../core/network/NetworkController.scala | 24 ++++++++++++------- .../core/network/NodeViewSynchronizer.scala | 9 ++++++- .../NodeViewSynchronizerSpecification.scala | 4 +++- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 1bb5d51c..471e1eed 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -83,6 +83,7 @@ class NetworkController(settings: NetworkSettings, override def receive: Receive = bindingLogic orElse + startConnectingCommand orElse businessLogic orElse peerCommands orElse connectionEvents orElse @@ -92,8 +93,6 @@ class NetworkController(settings: NetworkSettings, private def bindingLogic: Receive = { case Bound(_) => log.info("Successfully bound to the port " + settings.bindAddress.getPort) - scheduleConnectionToPeer() - scheduleDroppingDeadConnections() case CommandFailed(_: Bind) => log.error("Network port " + settings.bindAddress.getPort + " already in use!") @@ -101,6 +100,13 @@ class NetworkController(settings: NetworkSettings, context stop self } + private def startConnectingCommand: Receive = { + case StartConnectingPeers => + log.info("Start connecting to peers") + scheduleConnectionToPeer() + scheduleDroppingDeadConnections() + } + private def networkTime(): Time = sparkzContext.timeProvider.time() private def businessLogic: Receive = { @@ -253,14 +259,14 @@ class NetworkController(settings: NetworkSettings, * Schedule a periodic connection to a random known peer */ private def scheduleConnectionToPeer(): Unit = { - context.system.scheduler.scheduleWithFixedDelay(5.seconds, tryNewConnectionAttemptDelay) { - () => { - if (canEstablishNewOutgoingConnection) { - log.trace(s"Looking for a new random connection") - connectionToPeer(connections, unconfirmedConnections) - } + val connectionTask: Runnable = () => { + if (canEstablishNewOutgoingConnection) { + log.trace(s"Looking for a new random connection") + connectionToPeer(connections, unconfirmedConnections) } } + context.system.scheduler.scheduleWithFixedDelay(5.seconds, tryNewConnectionAttemptDelay)(connectionTask) + connectionTask.run() } private def canEstablishNewOutgoingConnection: Boolean = { @@ -583,6 +589,8 @@ object NetworkController { case object GetConnectedPeers + case object StartConnectingPeers + /** * Get p2p network status */ diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index d2ad5181..98cb9370 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -6,7 +6,7 @@ import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, Modifi import sparkz.core.consensus.History._ import sparkz.core.consensus.{History, HistoryReader, SyncInfo} import sparkz.core.network.ModifiersStatus.Requested -import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} +import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork, StartConnectingPeers} import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages._ import sparkz.core.network.message._ import sparkz.core.network.peer.PenaltyType @@ -76,6 +76,8 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes // register as a handler for synchronization-specific types of messages val messageSpecs: Seq[MessageSpec[_]] = Seq(invSpec, requestModifierSpec, modifiersSpec, syncInfoSpec) networkControllerRef ! RegisterMessageSpecs(messageSpecs, self) + // trigger connecting to peers + networkControllerRef ! StartConnectingPeers // register as a listener for peers got connected (handshaked) or disconnected context.system.eventStream.subscribe(self, classOf[HandshakedPeer]) @@ -169,9 +171,13 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes } protected def sendSync(syncTracker: SyncTracker, history: HR): Unit = { + logger.debug("SYNC INFO: Trying to send sync info") val peers = statusTracker.peersToSyncWith() if (peers.nonEmpty) { + logger.debug(s"SYNC INFO: Sending sync info to peers $peers") networkControllerRef ! SendToNetwork(Message(syncInfoSpec, Right(history.syncInfo), None), SendToPeers(peers)) + } else { + logger.debug("SYNC INFO: No peers to send sync info to") } } @@ -181,6 +187,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes //sync info is coming from another node protected def processSync(syncInfo: SI, remote: ConnectedPeer): Unit = { + log.debug(s"SYNC INFO: Got sync info from $remote") historyReaderOpt match { case Some(historyReader) => val ext = historyReader.continuationIds(syncInfo, networkSettings.desiredInvObjects) diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala index 7cb4a937..1d447f0a 100644 --- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala +++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala @@ -3,7 +3,7 @@ package sparkz.core.network import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.TestProbe import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, TransactionsFromRemote} -import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} +import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork, StartConnectingPeers} import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, ChangedMempool, FailedTransaction, SuccessfulTransaction} import sparkz.core.network.message._ import sparkz.core.network.peer.PenaltyType @@ -291,7 +291,9 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa } } )) + networkControllerProbe.expectMsgType[RegisterMessageSpecs](5000.millis) + networkControllerProbe.expectMsgType[StartConnectingPeers.type](5000.millis) viewHolderProbe.expectMsgType[GetNodeViewChanges](5000.millis) (nodeViewSynchronizerRef, networkControllerProbe, viewHolderProbe) From 2340892dec038dd126f98731b68c378b98d27951 Mon Sep 17 00:00:00 2001 From: paolocappelletti Date: Fri, 1 Sep 2023 14:10:14 +0200 Subject: [PATCH 2/2] bump version and changelog --- build.sbt | 2 +- release-notes.md | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index dc780590..68cbc048 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.2", + version := "2.0.3", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := diff --git a/release-notes.md b/release-notes.md index f681d7c6..d7bcf8fb 100644 --- a/release-notes.md +++ b/release-notes.md @@ -1,3 +1,7 @@ +2.0.3 +--------- +* Fix in the handshake process - start connecting to nodes only after the Synchronizer is initialized + 2.0.2 --------- * P2p rate limitng feature - added tx rebroadcast when rate limiting is reenabled