diff --git a/build.sbt b/build.sbt index dc780590..68cbc048 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.2", + version := "2.0.3", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := diff --git a/release-notes.md b/release-notes.md index f681d7c6..d7bcf8fb 100644 --- a/release-notes.md +++ b/release-notes.md @@ -1,3 +1,7 @@ +2.0.3 +--------- +* Fix in the handshake process - start connecting to nodes only after the Synchronizer is initialized + 2.0.2 --------- * P2p rate limitng feature - added tx rebroadcast when rate limiting is reenabled diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 1bb5d51c..471e1eed 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -83,6 +83,7 @@ class NetworkController(settings: NetworkSettings, override def receive: Receive = bindingLogic orElse + startConnectingCommand orElse businessLogic orElse peerCommands orElse connectionEvents orElse @@ -92,8 +93,6 @@ class NetworkController(settings: NetworkSettings, private def bindingLogic: Receive = { case Bound(_) => log.info("Successfully bound to the port " + settings.bindAddress.getPort) - scheduleConnectionToPeer() - scheduleDroppingDeadConnections() case CommandFailed(_: Bind) => log.error("Network port " + settings.bindAddress.getPort + " already in use!") @@ -101,6 +100,13 @@ class NetworkController(settings: NetworkSettings, context stop self } + private def startConnectingCommand: Receive = { + case StartConnectingPeers => + log.info("Start connecting to peers") + scheduleConnectionToPeer() + scheduleDroppingDeadConnections() + } + private def networkTime(): Time = sparkzContext.timeProvider.time() private def businessLogic: Receive = { @@ -253,14 +259,14 @@ class NetworkController(settings: NetworkSettings, * Schedule a periodic connection to a random known peer */ private def scheduleConnectionToPeer(): Unit = { - context.system.scheduler.scheduleWithFixedDelay(5.seconds, tryNewConnectionAttemptDelay) { - () => { - if (canEstablishNewOutgoingConnection) { - log.trace(s"Looking for a new random connection") - connectionToPeer(connections, unconfirmedConnections) - } + val connectionTask: Runnable = () => { + if (canEstablishNewOutgoingConnection) { + log.trace(s"Looking for a new random connection") + connectionToPeer(connections, unconfirmedConnections) } } + context.system.scheduler.scheduleWithFixedDelay(5.seconds, tryNewConnectionAttemptDelay)(connectionTask) + connectionTask.run() } private def canEstablishNewOutgoingConnection: Boolean = { @@ -583,6 +589,8 @@ object NetworkController { case object GetConnectedPeers + case object StartConnectingPeers + /** * Get p2p network status */ diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index d2ad5181..98cb9370 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -6,7 +6,7 @@ import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, Modifi import sparkz.core.consensus.History._ import sparkz.core.consensus.{History, HistoryReader, SyncInfo} import sparkz.core.network.ModifiersStatus.Requested -import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} +import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork, StartConnectingPeers} import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages._ import sparkz.core.network.message._ import sparkz.core.network.peer.PenaltyType @@ -76,6 +76,8 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes // register as a handler for synchronization-specific types of messages val messageSpecs: Seq[MessageSpec[_]] = Seq(invSpec, requestModifierSpec, modifiersSpec, syncInfoSpec) networkControllerRef ! RegisterMessageSpecs(messageSpecs, self) + // trigger connecting to peers + networkControllerRef ! StartConnectingPeers // register as a listener for peers got connected (handshaked) or disconnected context.system.eventStream.subscribe(self, classOf[HandshakedPeer]) @@ -169,9 +171,13 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes } protected def sendSync(syncTracker: SyncTracker, history: HR): Unit = { + logger.debug("SYNC INFO: Trying to send sync info") val peers = statusTracker.peersToSyncWith() if (peers.nonEmpty) { + logger.debug(s"SYNC INFO: Sending sync info to peers $peers") networkControllerRef ! SendToNetwork(Message(syncInfoSpec, Right(history.syncInfo), None), SendToPeers(peers)) + } else { + logger.debug("SYNC INFO: No peers to send sync info to") } } @@ -181,6 +187,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes //sync info is coming from another node protected def processSync(syncInfo: SI, remote: ConnectedPeer): Unit = { + log.debug(s"SYNC INFO: Got sync info from $remote") historyReaderOpt match { case Some(historyReader) => val ext = historyReader.continuationIds(syncInfo, networkSettings.desiredInvObjects) diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala index 7cb4a937..1d447f0a 100644 --- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala +++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala @@ -3,7 +3,7 @@ package sparkz.core.network import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.TestProbe import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, TransactionsFromRemote} -import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} +import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork, StartConnectingPeers} import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, ChangedMempool, FailedTransaction, SuccessfulTransaction} import sparkz.core.network.message._ import sparkz.core.network.peer.PenaltyType @@ -291,7 +291,9 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa } } )) + networkControllerProbe.expectMsgType[RegisterMessageSpecs](5000.millis) + networkControllerProbe.expectMsgType[StartConnectingPeers.type](5000.millis) viewHolderProbe.expectMsgType[GetNodeViewChanges](5000.millis) (nodeViewSynchronizerRef, networkControllerProbe, viewHolderProbe)