Skip to content

Commit

Permalink
Merge pull request #70 from HorizenOfficial/is/sync_issue_fix
Browse files Browse the repository at this point in the history
start connecting to nodes only after the Synchronizer is initialized
  • Loading branch information
paolocappelletti authored Sep 1, 2023
2 parents 12a7f51 + 2340892 commit 9c6dae6
Showing 5 changed files with 32 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ lazy val commonSettings = Seq(
Wart.OptionPartial),
organization := "io.horizen",
organizationName := "Zen Blockchain Foundation",
version := "2.0.2",
version := "2.0.3",
licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")),
homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")),
pomExtra :=
4 changes: 4 additions & 0 deletions release-notes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
2.0.3
---------
* Fix in the handshake process - start connecting to nodes only after the Synchronizer is initialized

2.0.2
---------
* P2p rate limitng feature - added tx rebroadcast when rate limiting is reenabled
24 changes: 16 additions & 8 deletions src/main/scala/sparkz/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
@@ -83,6 +83,7 @@ class NetworkController(settings: NetworkSettings,

override def receive: Receive =
bindingLogic orElse
startConnectingCommand orElse
businessLogic orElse
peerCommands orElse
connectionEvents orElse
@@ -92,15 +93,20 @@ class NetworkController(settings: NetworkSettings,
private def bindingLogic: Receive = {
case Bound(_) =>
log.info("Successfully bound to the port " + settings.bindAddress.getPort)
scheduleConnectionToPeer()
scheduleDroppingDeadConnections()

case CommandFailed(_: Bind) =>
log.error("Network port " + settings.bindAddress.getPort + " already in use!")
java.lang.System.exit(1) // Terminate node if port is in use
context stop self
}

private def startConnectingCommand: Receive = {
case StartConnectingPeers =>
log.info("Start connecting to peers")
scheduleConnectionToPeer()
scheduleDroppingDeadConnections()
}

private def networkTime(): Time = sparkzContext.timeProvider.time()

private def businessLogic: Receive = {
@@ -253,14 +259,14 @@ class NetworkController(settings: NetworkSettings,
* Schedule a periodic connection to a random known peer
*/
private def scheduleConnectionToPeer(): Unit = {
context.system.scheduler.scheduleWithFixedDelay(5.seconds, tryNewConnectionAttemptDelay) {
() => {
if (canEstablishNewOutgoingConnection) {
log.trace(s"Looking for a new random connection")
connectionToPeer(connections, unconfirmedConnections)
}
val connectionTask: Runnable = () => {
if (canEstablishNewOutgoingConnection) {
log.trace(s"Looking for a new random connection")
connectionToPeer(connections, unconfirmedConnections)
}
}
context.system.scheduler.scheduleWithFixedDelay(5.seconds, tryNewConnectionAttemptDelay)(connectionTask)
connectionTask.run()
}

private def canEstablishNewOutgoingConnection: Boolean = {
@@ -583,6 +589,8 @@ object NetworkController {

case object GetConnectedPeers

case object StartConnectingPeers

/**
* Get p2p network status
*/
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, Modifi
import sparkz.core.consensus.History._
import sparkz.core.consensus.{History, HistoryReader, SyncInfo}
import sparkz.core.network.ModifiersStatus.Requested
import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork}
import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork, StartConnectingPeers}
import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages._
import sparkz.core.network.message._
import sparkz.core.network.peer.PenaltyType
@@ -76,6 +76,8 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
// register as a handler for synchronization-specific types of messages
val messageSpecs: Seq[MessageSpec[_]] = Seq(invSpec, requestModifierSpec, modifiersSpec, syncInfoSpec)
networkControllerRef ! RegisterMessageSpecs(messageSpecs, self)
// trigger connecting to peers
networkControllerRef ! StartConnectingPeers

// register as a listener for peers got connected (handshaked) or disconnected
context.system.eventStream.subscribe(self, classOf[HandshakedPeer])
@@ -169,9 +171,13 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
}

protected def sendSync(syncTracker: SyncTracker, history: HR): Unit = {
logger.debug("SYNC INFO: Trying to send sync info")
val peers = statusTracker.peersToSyncWith()
if (peers.nonEmpty) {
logger.debug(s"SYNC INFO: Sending sync info to peers $peers")
networkControllerRef ! SendToNetwork(Message(syncInfoSpec, Right(history.syncInfo), None), SendToPeers(peers))
} else {
logger.debug("SYNC INFO: No peers to send sync info to")
}
}

@@ -181,6 +187,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes

//sync info is coming from another node
protected def processSync(syncInfo: SI, remote: ConnectedPeer): Unit = {
log.debug(s"SYNC INFO: Got sync info from $remote")
historyReaderOpt match {
case Some(historyReader) =>
val ext = historyReader.continuationIds(syncInfo, networkSettings.desiredInvObjects)
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ package sparkz.core.network
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.TestProbe
import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, TransactionsFromRemote}
import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork}
import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork, StartConnectingPeers}
import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, ChangedMempool, FailedTransaction, SuccessfulTransaction}
import sparkz.core.network.message._
import sparkz.core.network.peer.PenaltyType
@@ -291,7 +291,9 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa
}
}
))

networkControllerProbe.expectMsgType[RegisterMessageSpecs](5000.millis)
networkControllerProbe.expectMsgType[StartConnectingPeers.type](5000.millis)
viewHolderProbe.expectMsgType[GetNodeViewChanges](5000.millis)

(nodeViewSynchronizerRef, networkControllerProbe, viewHolderProbe)

0 comments on commit 9c6dae6

Please sign in to comment.