diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 5722194a..ff6fdae3 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -317,7 +317,7 @@ class NetworkController(settings: NetworkSettings, val peersAlreadyTriedFewTimeBefore = getPeersWeAlreadyTriedToConnectFewTimeAgo - val randomPeerF = peerManagerRef ? RandomPeerForConnectionExcluding(peersAddresses ++ peersAlreadyTriedFewTimeBefore) + val randomPeerF = peerManagerRef ? RandomPeerForConnectionExcluding(peersAddresses ++ peersAlreadyTriedFewTimeBefore, settings.onlyConnectToKnownPeers) randomPeerF.mapTo[Option[PeerInfo]].foreach { case Some(peerInfo) => peerInfo.peerSpec.address.foreach(address => { diff --git a/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala b/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala index b33ee300..2d1bc40c 100644 --- a/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala +++ b/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala @@ -50,7 +50,7 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext: // fill database with known peers settings.knownPeers.foreach { address => if (!NetworkUtils.isSelf(address, settings.bindAddress, sparkzContext.externalNodeAddress)) { - knownPeers += address -> PeerDatabaseValue(address, PeerInfo.fromAddress(address), PeerConfidence.High) + knownPeers += address -> PeerDatabaseValue(address, PeerInfo.fromAddress(address), PeerConfidence.KnownPeer) } } @@ -104,10 +104,7 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext: } override def allPeers: Map[InetSocketAddress, PeerDatabaseValue] = - if (settings.onlyConnectToKnownPeers) - knownPeers - else - knownPeers ++ bucketManager.getTriedPeers ++ bucketManager.getNewPeers + knownPeers ++ bucketManager.getTriedPeers ++ bucketManager.getNewPeers override def blacklistedPeers: Seq[InetAddress] = blacklist .collect { case (address, bannedTill) if checkBanned(address, bannedTill) => @@ -174,10 +171,7 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext: } override def randomPeersSubset: Map[InetSocketAddress, PeerDatabaseValue] = - if (settings.onlyConnectToKnownPeers) - knownPeers - else - knownPeers ++ bucketManager.getRandomPeers + knownPeers ++ bucketManager.getRandomPeers override def updatePeer(peerDatabaseValue: PeerDatabaseValue): Unit = { if (peerIsNotBlacklistedAndNotKnownPeer(peerDatabaseValue)) { diff --git a/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala b/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala index 24dd03fe..2d67ff88 100644 --- a/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala +++ b/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala @@ -45,7 +45,7 @@ object PeerDatabase { */ object PeerConfidence extends Enumeration { type PeerConfidence = Value - val Unknown, Low, Medium, High, Forger: Value = Value + val Unknown, Low, Medium, KnownPeer, Forger: Value = Value } case class PeerDatabaseValue(address: InetSocketAddress, peerInfo: PeerInfo, confidence: PeerConfidence) { diff --git a/src/main/scala/sparkz/core/network/peer/PeerManager.scala b/src/main/scala/sparkz/core/network/peer/PeerManager.scala index 29e0c966..fae036ce 100644 --- a/src/main/scala/sparkz/core/network/peer/PeerManager.scala +++ b/src/main/scala/sparkz/core/network/peer/PeerManager.scala @@ -3,6 +3,7 @@ package sparkz.core.network.peer import akka.actor.{Actor, ActorRef, ActorSystem, Props} import sparkz.core.app.SparkzContext import sparkz.core.network._ +import sparkz.core.network.peer.PeerDatabase.PeerConfidence.PeerConfidence import sparkz.core.network.peer.PeerDatabase.{PeerConfidence, PeerDatabaseValue} import sparkz.core.settings.SparkzSettings import sparkz.core.utils.NetworkUtils @@ -174,9 +175,10 @@ object PeerManager { blacklistedPeers: Seq[InetAddress], sparkzContext: SparkzContext): Seq[PeerInfo] = { val recentlySeenNonBlacklisted = peers.values.toSeq - .filter { p => - (p.peerInfo.connectionType.isDefined || p.peerInfo.lastHandshake > 0) && - !blacklistedPeers.contains(p.address.getAddress) + .filterNot(peer => blacklistedPeers.contains(peer.address.getAddress)) + .filter { p => p.peerInfo.connectionType.isDefined || + p.peerInfo.lastHandshake > 0 || + p.confidence == PeerConfidence.KnownPeer } Random.shuffle(recentlySeenNonBlacklisted).take(howMany).map(_.peerInfo) } @@ -189,34 +191,30 @@ object PeerManager { sparkzContext: SparkzContext): Map[InetSocketAddress, PeerInfo] = peers.map(p => p._1 -> p._2.peerInfo) } - case class RandomPeerForConnectionExcluding(excludedPeers: Seq[Option[InetSocketAddress]]) extends GetPeers[Option[PeerInfo]] { + case class RandomPeerForConnectionExcluding(excludedPeers: Seq[Option[InetSocketAddress]], onlyKnownPeers: Boolean = false) extends GetPeers[Option[PeerInfo]] { private val secureRandom = new SecureRandom() override def choose(peers: Map[InetSocketAddress, PeerDatabaseValue], blacklistedPeers: Seq[InetAddress], sparkzContext: SparkzContext): Option[PeerInfo] = { - var response: Option[PeerInfo] = None + val candidates: Map[PeerConfidence, Seq[PeerDatabaseValue]] = peers.values.toSeq + .filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)) + .groupBy(_.confidence) - val forgerPeers = peers.filter(_._2.confidence == PeerConfidence.Forger) - val forgerCandidates = forgerPeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq + val knownPeersCandidates = candidates.getOrElse(PeerConfidence.KnownPeer, Seq()) + val forgerCandidates = candidates.getOrElse(PeerConfidence.Forger, Seq()) - if (forgerCandidates.nonEmpty) { - response = Some(forgerCandidates(secureRandom.nextInt(forgerCandidates.size)).peerInfo) + if (onlyKnownPeers) { + Some(knownPeersCandidates(secureRandom.nextInt(knownPeersCandidates.size)).peerInfo) } else { - val highConfidencePeers = peers.filter(_._2.confidence == PeerConfidence.High) - val highConfidenceCandidates = highConfidencePeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - - if (highConfidenceCandidates.nonEmpty) { - response = Some(highConfidenceCandidates(secureRandom.nextInt(highConfidenceCandidates.size)).peerInfo) - } else { - val candidates = peers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - - if (candidates.nonEmpty) - response = Some(candidates(secureRandom.nextInt(candidates.size)).peerInfo) - } + if (forgerCandidates.nonEmpty) + Some(forgerCandidates(secureRandom.nextInt(forgerCandidates.size)).peerInfo) + else if (knownPeersCandidates.nonEmpty) + Some(knownPeersCandidates(secureRandom.nextInt(knownPeersCandidates.size)).peerInfo) + else if (candidates.nonEmpty) + Some(candidates.values.flatten.toSeq(secureRandom.nextInt(candidates.size)).peerInfo) + else None } - - response } } diff --git a/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala b/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala index 40a06f90..10b560d2 100644 --- a/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala +++ b/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala @@ -277,53 +277,13 @@ class InMemoryPeerDatabaseSpec extends NetworkTests with ObjectGenerators with B db.addOrUpdateKnownPeer(peerDatabaseValueThree) val allPeers = db.allPeers - allPeers.foreach(p => p._2.confidence shouldBe PeerConfidence.High) + allPeers.foreach(p => p._2.confidence shouldBe PeerConfidence.KnownPeer) allPeers.contains(firstAddress) shouldBe true allPeers.contains(secondAddress) shouldBe true allPeers.contains(thirdAddress) shouldBe true } } - it should "only return knownPeers if the flag is set to true" in { - val firstAddress = new InetSocketAddress(10) - val secondAddress = new InetSocketAddress(11) - val thirdAddress = new InetSocketAddress(12) - val forthAddress = new InetSocketAddress(13) - val fifthAddress = new InetSocketAddress(14) - val sixthAddress = new InetSocketAddress(15) - val knownPeers = Seq(firstAddress, secondAddress, thirdAddress) - - def withDbHavingKnownPeers(test: InMemoryPeerDatabase => Assertion): Assertion = - test(new InMemoryPeerDatabase( - settings.copy(network = settings.network.copy(penaltySafeInterval = 1.seconds, knownPeers = knownPeers, onlyConnectToKnownPeers = true)), - sparkzContext - )) - - withDbHavingKnownPeers { db => - val extraPeerOne = PeerDatabaseValue(forthAddress, getPeerInfo(forthAddress), PeerConfidence.Unknown) - val extraPeerTwo = PeerDatabaseValue(fifthAddress, getPeerInfo(fifthAddress), PeerConfidence.Unknown) - val extraPeerThree = PeerDatabaseValue(sixthAddress, getPeerInfo(sixthAddress), PeerConfidence.Unknown) - - db.addOrUpdateKnownPeer(extraPeerOne) - db.addOrUpdateKnownPeer(extraPeerTwo) - db.addOrUpdateKnownPeer(extraPeerThree) - - val allPeers = db.allPeers - allPeers.size shouldBe 3 - allPeers.foreach(p => p._2.confidence shouldBe PeerConfidence.High) - allPeers.contains(firstAddress) shouldBe true - allPeers.contains(secondAddress) shouldBe true - allPeers.contains(thirdAddress) shouldBe true - - val randomPeersSubset = db.randomPeersSubset - randomPeersSubset.size shouldBe 3 - randomPeersSubset.foreach(p => p._2.confidence shouldBe PeerConfidence.High) - randomPeersSubset.contains(firstAddress) shouldBe true - randomPeersSubset.contains(secondAddress) shouldBe true - randomPeersSubset.contains(thirdAddress) shouldBe true - } - } - it should "check blacklisted peers expiration and return only not expired" in { withDb { db => db.blacklistedPeers shouldBe empty