Skip to content

Commit

Permalink
Merge pull request #82 from HorizenOfficial/is/fix_knownPeers_sync
Browse files Browse the repository at this point in the history
SDK-1731 - node not able to get new peers other than the known one
  • Loading branch information
paolocappelletti authored Mar 5, 2024
2 parents 65dfd18 + 4edad88 commit abfc538
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 74 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/sparkz/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ class NetworkController(settings: NetworkSettings,

val peersAlreadyTriedFewTimeBefore = getPeersWeAlreadyTriedToConnectFewTimeAgo

val randomPeerF = peerManagerRef ? RandomPeerForConnectionExcluding(peersAddresses ++ peersAlreadyTriedFewTimeBefore)
val randomPeerF = peerManagerRef ? RandomPeerForConnectionExcluding(peersAddresses ++ peersAlreadyTriedFewTimeBefore, settings.onlyConnectToKnownPeers)
randomPeerF.mapTo[Option[PeerInfo]].foreach {
case Some(peerInfo) =>
peerInfo.peerSpec.address.foreach(address => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext:
// fill database with known peers
settings.knownPeers.foreach { address =>
if (!NetworkUtils.isSelf(address, settings.bindAddress, sparkzContext.externalNodeAddress)) {
knownPeers += address -> PeerDatabaseValue(address, PeerInfo.fromAddress(address), PeerConfidence.High)
knownPeers += address -> PeerDatabaseValue(address, PeerInfo.fromAddress(address), PeerConfidence.KnownPeer)
}
}

Expand Down Expand Up @@ -104,10 +104,7 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext:
}

override def allPeers: Map[InetSocketAddress, PeerDatabaseValue] =
if (settings.onlyConnectToKnownPeers)
knownPeers
else
knownPeers ++ bucketManager.getTriedPeers ++ bucketManager.getNewPeers
knownPeers ++ bucketManager.getTriedPeers ++ bucketManager.getNewPeers

override def blacklistedPeers: Seq[InetAddress] = blacklist
.collect { case (address, bannedTill) if checkBanned(address, bannedTill) =>
Expand Down Expand Up @@ -174,10 +171,7 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext:
}

override def randomPeersSubset: Map[InetSocketAddress, PeerDatabaseValue] =
if (settings.onlyConnectToKnownPeers)
knownPeers
else
knownPeers ++ bucketManager.getRandomPeers
knownPeers ++ bucketManager.getRandomPeers

override def updatePeer(peerDatabaseValue: PeerDatabaseValue): Unit = {
if (peerIsNotBlacklistedAndNotKnownPeer(peerDatabaseValue)) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/sparkz/core/network/peer/PeerDatabase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object PeerDatabase {
*/
object PeerConfidence extends Enumeration {
type PeerConfidence = Value
val Unknown, Low, Medium, High, Forger: Value = Value
val Unknown, Low, Medium, KnownPeer, Forger: Value = Value
}

case class PeerDatabaseValue(address: InetSocketAddress, peerInfo: PeerInfo, confidence: PeerConfidence) {
Expand Down
42 changes: 20 additions & 22 deletions src/main/scala/sparkz/core/network/peer/PeerManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sparkz.core.network.peer
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import sparkz.core.app.SparkzContext
import sparkz.core.network._
import sparkz.core.network.peer.PeerDatabase.PeerConfidence.PeerConfidence
import sparkz.core.network.peer.PeerDatabase.{PeerConfidence, PeerDatabaseValue}
import sparkz.core.settings.SparkzSettings
import sparkz.core.utils.NetworkUtils
Expand Down Expand Up @@ -174,9 +175,10 @@ object PeerManager {
blacklistedPeers: Seq[InetAddress],
sparkzContext: SparkzContext): Seq[PeerInfo] = {
val recentlySeenNonBlacklisted = peers.values.toSeq
.filter { p =>
(p.peerInfo.connectionType.isDefined || p.peerInfo.lastHandshake > 0) &&
!blacklistedPeers.contains(p.address.getAddress)
.filterNot(peer => blacklistedPeers.contains(peer.address.getAddress))
.filter { p => p.peerInfo.connectionType.isDefined ||
p.peerInfo.lastHandshake > 0 ||
p.confidence == PeerConfidence.KnownPeer
}
Random.shuffle(recentlySeenNonBlacklisted).take(howMany).map(_.peerInfo)
}
Expand All @@ -189,34 +191,30 @@ object PeerManager {
sparkzContext: SparkzContext): Map[InetSocketAddress, PeerInfo] = peers.map(p => p._1 -> p._2.peerInfo)
}

case class RandomPeerForConnectionExcluding(excludedPeers: Seq[Option[InetSocketAddress]]) extends GetPeers[Option[PeerInfo]] {
case class RandomPeerForConnectionExcluding(excludedPeers: Seq[Option[InetSocketAddress]], onlyKnownPeers: Boolean = false) extends GetPeers[Option[PeerInfo]] {
private val secureRandom = new SecureRandom()

override def choose(peers: Map[InetSocketAddress, PeerDatabaseValue],
blacklistedPeers: Seq[InetAddress],
sparkzContext: SparkzContext): Option[PeerInfo] = {
var response: Option[PeerInfo] = None
val candidates: Map[PeerConfidence, Seq[PeerDatabaseValue]] = peers.values.toSeq
.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _))
.groupBy(_.confidence)

val forgerPeers = peers.filter(_._2.confidence == PeerConfidence.Forger)
val forgerCandidates = forgerPeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq
val knownPeersCandidates = candidates.getOrElse(PeerConfidence.KnownPeer, Seq())
val forgerCandidates = candidates.getOrElse(PeerConfidence.Forger, Seq())

if (forgerCandidates.nonEmpty) {
response = Some(forgerCandidates(secureRandom.nextInt(forgerCandidates.size)).peerInfo)
if (onlyKnownPeers) {
Some(knownPeersCandidates(secureRandom.nextInt(knownPeersCandidates.size)).peerInfo)
} else {
val highConfidencePeers = peers.filter(_._2.confidence == PeerConfidence.High)
val highConfidenceCandidates = highConfidencePeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq

if (highConfidenceCandidates.nonEmpty) {
response = Some(highConfidenceCandidates(secureRandom.nextInt(highConfidenceCandidates.size)).peerInfo)
} else {
val candidates = peers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq

if (candidates.nonEmpty)
response = Some(candidates(secureRandom.nextInt(candidates.size)).peerInfo)
}
if (forgerCandidates.nonEmpty)
Some(forgerCandidates(secureRandom.nextInt(forgerCandidates.size)).peerInfo)
else if (knownPeersCandidates.nonEmpty)
Some(knownPeersCandidates(secureRandom.nextInt(knownPeersCandidates.size)).peerInfo)
else if (candidates.nonEmpty)
Some(candidates.values.flatten.toSeq(secureRandom.nextInt(candidates.size)).peerInfo)
else None
}

response
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,53 +277,13 @@ class InMemoryPeerDatabaseSpec extends NetworkTests with ObjectGenerators with B
db.addOrUpdateKnownPeer(peerDatabaseValueThree)

val allPeers = db.allPeers
allPeers.foreach(p => p._2.confidence shouldBe PeerConfidence.High)
allPeers.foreach(p => p._2.confidence shouldBe PeerConfidence.KnownPeer)
allPeers.contains(firstAddress) shouldBe true
allPeers.contains(secondAddress) shouldBe true
allPeers.contains(thirdAddress) shouldBe true
}
}

it should "only return knownPeers if the flag is set to true" in {
val firstAddress = new InetSocketAddress(10)
val secondAddress = new InetSocketAddress(11)
val thirdAddress = new InetSocketAddress(12)
val forthAddress = new InetSocketAddress(13)
val fifthAddress = new InetSocketAddress(14)
val sixthAddress = new InetSocketAddress(15)
val knownPeers = Seq(firstAddress, secondAddress, thirdAddress)

def withDbHavingKnownPeers(test: InMemoryPeerDatabase => Assertion): Assertion =
test(new InMemoryPeerDatabase(
settings.copy(network = settings.network.copy(penaltySafeInterval = 1.seconds, knownPeers = knownPeers, onlyConnectToKnownPeers = true)),
sparkzContext
))

withDbHavingKnownPeers { db =>
val extraPeerOne = PeerDatabaseValue(forthAddress, getPeerInfo(forthAddress), PeerConfidence.Unknown)
val extraPeerTwo = PeerDatabaseValue(fifthAddress, getPeerInfo(fifthAddress), PeerConfidence.Unknown)
val extraPeerThree = PeerDatabaseValue(sixthAddress, getPeerInfo(sixthAddress), PeerConfidence.Unknown)

db.addOrUpdateKnownPeer(extraPeerOne)
db.addOrUpdateKnownPeer(extraPeerTwo)
db.addOrUpdateKnownPeer(extraPeerThree)

val allPeers = db.allPeers
allPeers.size shouldBe 3
allPeers.foreach(p => p._2.confidence shouldBe PeerConfidence.High)
allPeers.contains(firstAddress) shouldBe true
allPeers.contains(secondAddress) shouldBe true
allPeers.contains(thirdAddress) shouldBe true

val randomPeersSubset = db.randomPeersSubset
randomPeersSubset.size shouldBe 3
randomPeersSubset.foreach(p => p._2.confidence shouldBe PeerConfidence.High)
randomPeersSubset.contains(firstAddress) shouldBe true
randomPeersSubset.contains(secondAddress) shouldBe true
randomPeersSubset.contains(thirdAddress) shouldBe true
}
}

it should "check blacklisted peers expiration and return only not expired" in {
withDb { db =>
db.blacklistedPeers shouldBe empty
Expand Down

0 comments on commit abfc538

Please sign in to comment.