Skip to content

Commit

Permalink
Merge pull request #61 from HorizenOfficial/dev
Browse files Browse the repository at this point in the history
2_0_1_final
  • Loading branch information
paolocappelletti authored Jun 23, 2023
2 parents 81eea41 + 65e4aea commit cf632ab
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 35 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ lazy val commonSettings = Seq(
Wart.OptionPartial),
organization := "io.horizen",
organizationName := "Zen Blockchain Foundation",
version := "2.0.0-RC11",
version := "2.0.1",
licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")),
homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")),
pomExtra :=
Expand Down
58 changes: 58 additions & 0 deletions ci/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# New version build functionality

As of 2022/06/27 CI/CD pipeline adds functionality to build and publish multiple `DIGIT.DIGIT.DIGIT-SNAPSHOT` versions of `zendoo-sc-crypotolib` package
with the help of set_version.sh script.

`set_version.sh` script is located under **ci/devtools** directory and automates preparation steps for building/releasing a new
version of the artifacts by setting the provided version for all the required dependencies across the configuration files.

---
## Prerequisites for publishing a package:
- Singed by GPG key commit and valid GitHub tag in the format of `DIGIT.DIGIT.DIGIT` or `DIGIT.DIGIT.DIGIT-SNAPSHOT`
- GitHub tag matching `${pom_version_of_package}"[0-9]*$` regex
- Your(a person who pushes a tag) GPG key being added to CI/CD pipeline build settings

Otherwise, the build process will run without entering the publishing stage.

`DIGIT.DIGIT.DIGIT-SNAPSHOT` package version can be built multiple times by adjusting GitHub tag name accordingly. For example:
```
GitHub tag = 1.1.1-SNAPSHOT can build 1.1.1-SNAPSHOT package
GitHub tag = 1.1.1-SNAPSHOT1 can build 1.1.1-SNAPSHOT package
GitHub tag = 1.1.1-SNAPSHOT2 can build 1.1.1-SNAPSHOT package
```
All SNAPSHOT packages are being pushed to a snapshot repository configured under pom.xml file:
```
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
```
and can be referred to inside the configuration files by providing the full version, that can be found inside nexus [repository](https://oss.sonatype.org/content/repositories/snapshots/io/horizen/)

---
## Usage
Before starting the build process use `set_version.sh` script if needed by providing two arguments in the following format:
```
./ci/devtools/set_version.sh --help
Usage: Provide OLD and NEW versions as the 1st and 2nd arguments respectively.
It has to match the following format:
DIGIT.DIGIT.DIGIT or DIGIT.DIGIT.DIGIT-SNAPSHOT
For example:
./set_version.sh 5.5.5 5.5.5-SNAPSHOT
./set_version.sh 5.5.5-SNAPSHOT 5.5.5
```
| Changes made by set_version.sh script need to be committed before the build. |
|------------------------------------------------------------------------------|

---
## How to refer
- Find all the existing versions of [2.0.1-SNAPSHOT package](https://oss.sonatype.org/content/repositories/snapshots/io/horizen/sparkz-core_2.13/2.0.1-SNAPSHOT/)
- Use the full version of SNAPSHOT package as a dependency in the following format for your project.
```
<dependency>
<groupId>io.horizen</groupId>
<artifactId>sparkz-core_2.12</artifactId>
<version>2.0.1-20230310.201529-1</version>
</dependency>
```
4 changes: 2 additions & 2 deletions ci/publish.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
set -eo pipefail
retval=0

if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}[0-9]*$ ]]; then
if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then
echo "" && echo "=== Publishing development release on Sonatype Nexus repository. Timestamp is: $(date '+%a %b %d %H:%M:%S %Z %Y') ===" && echo ""
sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publish
elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then
elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
echo "" && echo "=== Publishing production release on Maven repository. Timestamp is: $(date '+%Y-%m-%d %H:%M') ===" && echo ""
sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publishSigned sonatypeBundleRelease
else
Expand Down
9 changes: 5 additions & 4 deletions ci/setup_env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ function import_gpg_keys() {
# shellcheck disable=SC2207
declare -r my_arr=( $(echo "${@}" | tr " " "\n") )

for key in "${my_arr[@]}"; do
for key in "${my_arr[@]}"; do
echo "Importing key: ${key}"
gpg -v --batch --keyserver hkps://keys.openpgp.org --recv-keys "${key}" ||
gpg -v --batch --keyserver hkp://keyserver.ubuntu.com --recv-keys "${key}" ||
gpg -v --batch --keyserver hkp://pgp.mit.edu:80 --recv-keys "${key}" ||
gpg -v --batch --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys "${key}"
gpg -v --batch --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys "${key}" ||
echo -e "${key} can not be found on GPG key servers. Please upload it to at least one of the following GPG key servers:\nhttps://keys.openpgp.org/\nhttps://keyserver.ubuntu.com/\nhttps://pgp.mit.edu/"
done
}

Expand Down Expand Up @@ -60,7 +61,7 @@ if [ -n "${TRAVIS_TAG}" ]; then
# Prod vs dev release
if ( git branch -r --contains "${TRAVIS_TAG}" | grep -xqE ". origin\/${PROD_RELEASE_BRANCH}$" ); then
# Checking if package version matches PROD release version
if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then
if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
echo "Aborting, package version is in the wrong format for production release."
exit 1
fi
Expand All @@ -80,7 +81,7 @@ if [ -n "${TRAVIS_TAG}" ]; then
export CONTAINER_PUBLISH="true"
else
# Checking if package version matches DEV release version
if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}$ ]]; then
if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then
echo "Aborting, package version is in the wrong format for development release."
exit 1
fi
Expand Down
4 changes: 4 additions & 0 deletions release-notes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
2.0.1
---------
* P2p rate limitng feature

2.0.0-RC11
---------
* Changed library for the Bcrypt hashing algorithm and added additional unit tests
Expand Down
14 changes: 14 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ sparkz {
# Limit for number of modifiers to request and process at once
maxRequestedPerPeer = 1024

# Enables or disables slow mode - ignoring requesting or broadcasting new transaction when node is overloaded.
slowModeFeatureFlag = false

# Threshold of average time it takes node to apply a new modifier(block or transaction),
# after which we consider it overloaded and start throttling.
slowModeThresholdMs = 2000

# Maximum number of modifiers that can be requested from remote peers,
# when the node is considered overloaded. Only affects transactions.
slowModeMaxRequested = 150

# The impact is single measurement has on a an average processing value.
slowModeMeasurementImpact = 0.1

# Desired number of inv objects. Our requests will have this size.
desiredInvObjects = 512

Expand Down
61 changes: 52 additions & 9 deletions src/main/scala/sparkz/core/network/DeliveryTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import akka.actor.{ActorRef, ActorSystem, Cancellable}
import sparkz.core.consensus.ContainsModifiers
import sparkz.core.network.ModifiersStatus._
import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.CheckDelivery
import sparkz.core.settings.NetworkSettings
import sparkz.util.SparkzEncoding
import sparkz.core.{ModifierTypeId, NodeViewModifier}
import sparkz.util.{ModifierId, SparkzLogging}

import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -35,11 +37,17 @@ import scala.util.{Failure, Try}
* and its methods should not be called from lambdas, Future, Future.map, etc.
*/
class DeliveryTracker(system: ActorSystem,
deliveryTimeout: FiniteDuration,
maxDeliveryChecks: Int,
maxRequestedPerPeer: Int,
networkSettings: NetworkSettings,
nvsRef: ActorRef) extends SparkzLogging with SparkzEncoding {

protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout
protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks
protected val maxRequestedPerPeer: Int = networkSettings.maxRequestedPerPeer
protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeFeatureFlag
protected val slowModeThresholdMs: Long = networkSettings.slowModeThresholdMs
protected val slowModeMaxRequested: Int = networkSettings.slowModeMaxRequested
protected val slowModeMeasurementImpact: Double = networkSettings.slowModeMeasurementImpact

protected case class RequestedInfo(peer: ConnectedPeer, cancellable: Cancellable, checks: Int)

// when a remote peer is asked for a modifier we add the requested data to `requested`
Expand All @@ -52,7 +60,10 @@ class DeliveryTracker(system: ActorSystem,
protected val invalid: mutable.HashSet[ModifierId] = mutable.HashSet()

// when our node received a modifier we put it to `received`
protected val received: mutable.Map[ModifierId, ConnectedPeer] = mutable.Map()
protected val received: mutable.Map[ModifierId, (ConnectedPeer, Long)] = mutable.Map()

private var averageProcessingTimeMs: Long = 0
var slowMode: Boolean = false

/**
* @return status of modifier `id`.
Expand Down Expand Up @@ -99,8 +110,8 @@ class DeliveryTracker(system: ActorSystem,
require(isCorrectTransition(status(id), Requested), s"Illegal status transition: ${status(id)} -> Requested")
val cancellable = system.scheduler.scheduleOnce(deliveryTimeout, nvsRef, CheckDelivery(supplier, typeId, id))
requested.put(id, RequestedInfo(supplier, cancellable, checksDone)) match {
case Some(RequestedInfo(peer,_,_)) if supplier.connectionId == peer.connectionId => //we already had this modifier, it is counted
case Some(RequestedInfo(peer,_,_)) => decrementPeerLimitCounter(peer); incrementPeerLimitCounter(supplier)
case Some(RequestedInfo(peer, _, _)) if supplier.connectionId == peer.connectionId => //we already had this modifier, it is counted
case Some(RequestedInfo(peer, _, _)) => decrementPeerLimitCounter(peer); incrementPeerLimitCounter(supplier)
case None => incrementPeerLimitCounter(supplier)
}
}
Expand All @@ -127,6 +138,10 @@ class DeliveryTracker(system: ActorSystem,
.map(decrementPeerLimitCounter)
case Received =>
received.remove(modifierId)
.collect { case (peer, timestamp) =>
updateProcessingTime(timestamp)
peer
}
case _ =>
None
}
Expand Down Expand Up @@ -166,12 +181,12 @@ class DeliveryTracker(system: ActorSystem,
def setReceived(id: ModifierId, sender: ConnectedPeer): Unit =
tryWithLogging {
val oldStatus: ModifiersStatus = status(id)
require(isCorrectTransition(oldStatus, Invalid), s"Illegal status transition: $oldStatus -> Received")
require(isCorrectTransition(oldStatus, Received), s"Illegal status transition: $oldStatus -> Received")
if (oldStatus != Received) {
requested(id).cancellable.cancel()
requested.remove(id)
decrementPeerLimitCounter(sender)
received.put(id, sender)
received.put(id, (sender, System.nanoTime()))
}
}

Expand All @@ -181,7 +196,7 @@ class DeliveryTracker(system: ActorSystem,
case Requested =>
requested.get(id).map(_.peer)
case Received =>
received.get(id)
received.get(id).map(_._1)
case _ =>
None
}
Expand All @@ -191,6 +206,19 @@ class DeliveryTracker(system: ActorSystem,
maxRequestedPerPeer - peerLimits.getOrElse(peer, 0)
}

/**
* Check if we have capacity to request more transactions from remote peers.
* In order to decide that node cannot request more transactions, all 3 conditions must be satisfied:
* - feature flag is enabled
* - current node is in slow mode - average time it takes to process a modifier is higher than a threshold
* - number of concurrently requested modifiers is bigger than a max allowed value
*
* @return
*/
def canRequestMoreTransactions: Boolean = {
!(slowModeFeatureFlag && slowMode && requested.size > slowModeMaxRequested)
}

private def incrementPeerLimitCounter(peer: ConnectedPeer): Unit = {
peerLimits.get(peer) match {
case Some(value) => peerLimits.put(peer, value + 1)
Expand Down Expand Up @@ -234,6 +262,7 @@ class DeliveryTracker(system: ActorSystem,
.map(decrementPeerLimitCounter)
case Received =>
received.remove(id)
.foreach(peer_timestamp => updateProcessingTime(peer_timestamp._2))
case _ =>
()
}
Expand All @@ -250,4 +279,18 @@ class DeliveryTracker(system: ActorSystem,
log.warn("Unexpected error", e)
Failure(e)
}

private def updateProcessingTime(startTime: Long): Unit = {
if (slowModeFeatureFlag) {
val elapsedMs: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
averageProcessingTimeMs = (averageProcessingTimeMs * (1 - slowModeMeasurementImpact)).toLong + (elapsedMs * slowModeMeasurementImpact).toLong
if (averageProcessingTimeMs > slowModeThresholdMs && !slowMode) {
slowMode = true
logger.warn("Slow mode enabled on P2P layer due to high load. Transactions won't be requested and tx broadcast will be limited.")
} else if (averageProcessingTimeMs < slowModeThresholdMs && slowMode) {
slowMode = false
logger.warn("Slow mode disabled on P2P layer. Transactions will be requested or broadcasted.")
}
}
}
}
16 changes: 10 additions & 6 deletions src/main/scala/sparkz/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,17 @@ class NetworkController(settings: NetworkSettings,
}
}

private def closeConnection(peerAddress: InetSocketAddress): Unit =
connections.get(peerAddress).foreach { peer =>
connections = connections.filterNot { case (address, _) => // clear all connections related to banned peer ip
Option(peer.connectionId.remoteAddress.getAddress).exists(Option(address.getAddress).contains(_))
}
peer.handlerRef ! CloseConnection
private def closeConnection(peerAddress: InetSocketAddress): Unit = {
connections = connections.filter { case (_, connectedPeer) =>
Option(connectedPeer)
.filter(_.connectionId.remoteAddress.equals(peerAddress))
.map { peer =>
peer.handlerRef ! CloseConnection
context.system.eventStream.publish(DisconnectedPeer(peerAddress))
}
.isEmpty
}
}

/**
* Register a new penalty for given peer address.
Expand Down
20 changes: 13 additions & 7 deletions src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
modifierSerializers: Map[ModifierTypeId, SparkzSerializer[_ <: NodeViewModifier]])(implicit ec: ExecutionContext)
extends Actor with Synchronizer with SparkzLogging with SparkzEncoding {

protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout
protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks
protected val maxRequestedPerPeer: Int = networkSettings.maxRequestedPerPeer
protected val invSpec = new InvSpec(networkSettings.maxInvObjects)
protected val requestModifierSpec = new RequestModifierSpec(networkSettings.maxInvObjects)
protected val modifiersSpec = new ModifiersSpec(networkSettings.maxModifiersSpecMessageSize)
Expand All @@ -68,7 +65,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
case (_: ModifiersSpec, data: ModifiersData, remote) => modifiersFromRemote(data, remote)
}

protected val deliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, self)
protected val deliveryTracker = new DeliveryTracker(context.system, networkSettings, self)
protected val statusTracker = new SyncTracker(self, context, networkSettings, timeProvider)

protected var historyReaderOpt: Option[HR] = None
Expand Down Expand Up @@ -98,8 +95,12 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
private def readersOpt: Option[(HR, MR)] = historyReaderOpt.flatMap(h => mempoolReaderOpt.map(mp => (h, mp)))

protected def broadcastModifierInv[M <: NodeViewModifier](m: M): Unit = {
val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None)
networkControllerRef ! SendToNetwork(msg, Broadcast)
m.modifierTypeId match {
case Transaction.ModifierTypeId if deliveryTracker.slowMode => // will not broadcast due to the high load
case _ =>
val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None)
networkControllerRef ! SendToNetwork(msg, Broadcast)
}
}

protected def viewHolderEvents: Receive = {
Expand Down Expand Up @@ -224,7 +225,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
val modifierTypeId = invData.typeId
val newModifierIds = (modifierTypeId match {
case Transaction.ModifierTypeId =>
invData.ids.filter(mid => deliveryTracker.status(mid, mempool) == ModifiersStatus.Unknown)
if (deliveryTracker.canRequestMoreTransactions)
invData.ids.filter(mid => deliveryTracker.status(mid, mempool) == ModifiersStatus.Unknown)
else
Seq() // do not request transactions due to the high load
case _ =>
invData.ids.filter(mid => deliveryTracker.status(mid, history) == ModifiersStatus.Unknown)
})
Expand Down Expand Up @@ -275,10 +279,12 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
case Some(serializer: SparkzSerializer[TX]@unchecked) if typeId == Transaction.ModifierTypeId =>
// parse all transactions and send them to node view holder
val parsed: Iterable[TX] = parseModifiers(requestedModifiers, serializer, remote)
parsed.foreach(tx => deliveryTracker.setReceived(tx.id, remote))
viewHolderRef ! TransactionsFromRemote(parsed)

case Some(serializer: SparkzSerializer[PMOD]@unchecked) =>
// parse all modifiers and put them to modifiers cache
log.info(s"Received block ids ${modifiers.keySet.map(encoder.encodeId).mkString(",")}")
val parsed: Iterable[PMOD] = parseModifiers(requestedModifiers, serializer, remote)
val valid: Iterable[PMOD] = parsed.filter(validateAndSetStatus(remote, _))
if (valid.nonEmpty) viewHolderRef ! ModifiersFromRemote[PMOD](valid)
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/sparkz/core/settings/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ case class NetworkSettings(nodeName: String,
maxDeliveryChecks: Int,
penalizeNonDelivery: Boolean,
maxRequestedPerPeer: Int,
slowModeFeatureFlag: Boolean,
slowModeThresholdMs: Long,
slowModeMaxRequested: Int,
slowModeMeasurementImpact: Double,
appVersion: String,
agentName: String,
maxModifiersSpecMessageSize: Int,
Expand Down
Loading

0 comments on commit cf632ab

Please sign in to comment.