From 9e47c1db6a45f7330bea63b50f3cebe5b0196b7b Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 22 Sep 2023 11:20:59 -0400 Subject: [PATCH 1/7] sim-lib: rename generate activity in preparation for refactor --- sim-lib/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 340b7764..53844c99 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -369,7 +369,8 @@ impl Simulation { // Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that // has been configured, passing in the channel that is used to notify data collection that events have been // generated. - self.generate_activity(event_sender, &mut tasks).await?; + self.dispatch_activity_producers(event_sender, &mut tasks) + .await?; if let Some(total_time) = self.total_time { let t = self.shutdown_trigger.clone(); @@ -433,7 +434,7 @@ impl Simulation { log::debug!("Simulator data collection set up."); } - async fn generate_activity( + async fn dispatch_activity_producers( &self, output_sender: Sender, tasks: &mut JoinSet<()>, From 0c8cba2d133083209528bc2b3c80fab310198653 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 22 Sep 2023 11:37:05 -0400 Subject: [PATCH 2/7] sim-lib: separate consumer and producer spin up into dedicated methods When we add random activity generation, we're going to want consumers for every node that we have execution on. Previously, we only ran consumers for nodes that are listed as the source in activity description. This change separates consumer and producer spin up and allows the caller to specify which nodes to run consumers for. 
--- sim-lib/src/lib.rs | 70 ++++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 30 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 53844c99..335a5a20 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -366,11 +366,21 @@ impl Simulation { let (event_sender, event_receiver) = channel(1); self.run_data_collection(event_receiver, &mut tasks); + // Create consumers for every source node that is listed in our activity. + let collecting_nodes = self + .activity + .iter() + .map(|activity| activity.source) + .collect(); + + let producer_senders = + self.dispatch_consumers(collecting_nodes, event_sender.clone(), &mut tasks); + // Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that // has been configured, passing in the channel that is used to notify data collection that events have been // generated. - self.dispatch_activity_producers(event_sender, &mut tasks) - .await?; + self.dispatch_activity_producers(producer_senders, &mut tasks) + .await; if let Some(total_time) = self.total_time { let t = self.shutdown_trigger.clone(); @@ -434,29 +444,26 @@ impl Simulation { log::debug!("Simulator data collection set up."); } - async fn dispatch_activity_producers( + /// Responsible for spinning up consumer tasks for each node specified in consuming_nodes. Assumes that validation + /// has already ensured that we have execution on every nodes listed in consuming_nodes. + fn dispatch_consumers( &self, + consuming_nodes: HashSet, output_sender: Sender, tasks: &mut JoinSet<()>, - ) -> Result<(), SimulationError> { - let shutdown = self.shutdown_trigger.clone(); - let listener = self.shutdown_listener.clone(); - - // Before we start the simulation, we'll spin up the infrastructure that we need to record data: - // We only need to spin up producers for nodes that are contained in our activity description, as there will be - // no events for nodes that are not source nodes. 
- let mut producer_channels = HashMap::new(); - - for (id, node) in self.nodes.iter().filter(|(pk, _)| { - self.activity - .iter() - .map(|a| a.source) - .collect::>() - .contains(pk) - }) { - // For each active node, we'll create a sender and receiver channel to produce and consumer - // events. We do not buffer channels as we expect events to clear quickly. + ) -> HashMap> { + let mut channels = HashMap::new(); + + for (id, node) in self + .nodes + .iter() + .filter(|(id, _)| consuming_nodes.contains(id)) + { + // For each node we have execution on, we'll create a sender and receiver channel to produce and consumer + // events and insert producer in our tracking map. We do not buffer channels as we expect events to clear + // quickly. let (sender, receiver) = channel(1); + channels.insert(*id, sender.clone()); // Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull // events from and the results sender to report the events it has triggered for further monitoring. @@ -464,25 +471,28 @@ impl Simulation { node.clone(), receiver, output_sender.clone(), - shutdown.clone(), + self.shutdown_trigger.clone(), )); - - // Add the producer channel to our map so that various activity descriptions can use it. We may have multiple - // activity descriptions that have the same source node. - producer_channels.insert(id, sender); } + channels + } + + /// Responsible for spinning up producers for a set of activity descriptions. 
+ async fn dispatch_activity_producers( + &self, + producer_channels: HashMap>, + tasks: &mut JoinSet<()>, + ) { for description in self.activity.iter() { let sender_chan = producer_channels.get(&description.source).unwrap(); tasks.spawn(produce_events( *description, sender_chan.clone(), - shutdown.clone(), - listener.clone(), + self.shutdown_trigger.clone(), + self.shutdown_listener.clone(), )); } - - Ok(()) } } From 36517f543e7bd7bc21d45250f7e748dabe17262d Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Mon, 25 Sep 2023 14:19:23 -0400 Subject: [PATCH 3/7] multi: surface listchannels on lightning node --- sim-lib/src/cln.rs | 40 ++++++++++++++++++++++++++++++++++++++-- sim-lib/src/lib.rs | 5 +++++ sim-lib/src/lnd.rs | 21 ++++++++++++++++++++- 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/sim-lib/src/cln.rs b/sim-lib/src/cln.rs index fd772b8f..dd4e75d3 100644 --- a/sim-lib/src/cln.rs +++ b/sim-lib/src/cln.rs @@ -2,8 +2,8 @@ use async_trait::async_trait; use bitcoin::secp256k1::PublicKey; use cln_grpc::pb::{ listpays_pays::ListpaysPaysStatus, node_client::NodeClient, Amount, GetinfoRequest, - GetinfoResponse, KeysendRequest, KeysendResponse, ListnodesRequest, ListpaysRequest, - ListpaysResponse, + GetinfoResponse, KeysendRequest, KeysendResponse, ListchannelsRequest, ListnodesRequest, + ListpaysRequest, ListpaysResponse, }; use lightning::ln::features::NodeFeatures; use lightning::ln::PaymentHash; @@ -85,6 +85,36 @@ impl ClnNode { }, }) } + + /// Fetch channels belonging to the local node, initiated locally if is_source is true, and initiated remotely if + /// is_source is false. Introduced as a helper function because CLN doesn't have a single API to list all of our + /// node's channels. 
+ async fn node_channels(&mut self, is_source: bool) -> Result, LightningError> { + let req = if is_source { + ListchannelsRequest { + source: Some(self.info.pubkey.serialize().to_vec()), + ..Default::default() + } + } else { + ListchannelsRequest { + destination: Some(self.info.pubkey.serialize().to_vec()), + ..Default::default() + } + }; + + let resp = self + .client + .list_channels(req) + .await + .map_err(|err| LightningError::ListChannelsError(err.to_string()))? + .into_inner(); + + Ok(resp + .channels + .into_iter() + .map(|channel| channel.amount_msat.unwrap_or_default().msat) + .collect()) + } } #[async_trait] @@ -198,6 +228,12 @@ impl LightningNode for ClnNode { )) } } + + async fn list_channels(&mut self) -> Result, LightningError> { + let mut node_channels = self.node_channels(true).await?; + node_channels.extend(self.node_channels(false).await?); + Ok(node_channels) + } } async fn reader(filename: &str) -> Result, Error> { diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 335a5a20..9c401966 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -101,6 +101,8 @@ pub enum LightningError { ValidationError(String), #[error("Permanent error: {0:?}")] PermanentError(String), + #[error("List channels error: {0}")] + ListChannelsError(String), } #[derive(Debug, Clone)] @@ -130,6 +132,9 @@ pub trait LightningNode { ) -> Result; /// Gets the list of features of a given node async fn get_node_features(&mut self, node: PublicKey) -> Result; + /// Lists all channels, at present only returns a vector of channel capacities in msat because no further + /// information is required. 
+ async fn list_channels(&mut self) -> Result, LightningError>; } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/sim-lib/src/lnd.rs b/sim-lib/src/lnd.rs index 50e5ca41..ef958807 100644 --- a/sim-lib/src/lnd.rs +++ b/sim-lib/src/lnd.rs @@ -10,7 +10,7 @@ use bitcoin::secp256k1::PublicKey; use lightning::ln::features::NodeFeatures; use lightning::ln::{PaymentHash, PaymentPreimage}; use tonic_lnd::lnrpc::{payment::PaymentStatus, GetInfoRequest, GetInfoResponse}; -use tonic_lnd::lnrpc::{NodeInfoRequest, PaymentFailureReason}; +use tonic_lnd::lnrpc::{ListChannelsRequest, NodeInfoRequest, PaymentFailureReason}; use tonic_lnd::routerrpc::TrackPaymentRequest; use tonic_lnd::tonic::Code::Unavailable; use tonic_lnd::tonic::Status; @@ -210,6 +210,25 @@ impl LightningNode for LndNode { )) } } + + async fn list_channels(&mut self) -> Result, LightningError> { + let channels = self + .client + .lightning() + .list_channels(ListChannelsRequest { + ..Default::default() + }) + .await + .map_err(|err| LightningError::ListChannelsError(err.to_string()))? + .into_inner(); + + // Capacity is returned in satoshis, so we convert to msat. + Ok(channels + .channels + .iter() + .map(|channel| 1000 * channel.capacity as u64) + .collect()) + } } fn string_to_payment_hash(hash: &str) -> Result { From f4a1a49146611e0bdd14c1d71c5904cd31cddac2 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Mon, 25 Sep 2023 11:19:24 -0400 Subject: [PATCH 4/7] multi: add network capacity based generator When we want to simulate random activity in large network, keeping track of our graph will be the most expensive part of the simulation. This is because we'll want to cache node capacities for easy lookup, and because a weighted index has a large memory footprint (N -1 u64s for our use). To minimize the cost of tracking these values, we use a single NetworkGraphView tracker that will provide expensive operations in a single location. 
--- Cargo.lock | 27 ++++++++++++ sim-lib/Cargo.toml | 1 + sim-lib/src/lib.rs | 9 ++++ sim-lib/src/random_activity.rs | 80 ++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+) create mode 100644 sim-lib/src/random_activity.rs diff --git a/Cargo.lock b/Cargo.lock index 68dd10d2..8296001a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -864,6 +864,12 @@ version = "0.2.148" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +[[package]] +name = "libm" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" + [[package]] name = "lightning" version = "0.0.116" @@ -966,6 +972,16 @@ dependencies = [ "libc", ] +[[package]] +name = "num-traits" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +dependencies = [ + "autocfg", + "libm", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -1295,6 +1311,16 @@ dependencies = [ "getrandom 0.2.10", ] +[[package]] +name = "rand_distr" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" +dependencies = [ + "num-traits", + "rand", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -1588,6 +1614,7 @@ dependencies = [ "log", "mpsc", "rand", + "rand_distr", "serde", "serde_json", "serde_millis", diff --git a/sim-lib/Cargo.toml b/sim-lib/Cargo.toml index 2b5f24b9..c4669c24 100644 --- a/sim-lib/Cargo.toml +++ b/sim-lib/Cargo.toml @@ -25,3 +25,4 @@ rand = "0.8.5" hex = "0.4.3" csv = "1.2.2" serde_millis = "0.1.1" +rand_distr = "0.4.3" diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 9c401966..9cd66469 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -19,6 +19,7 @@ use 
triggered::{Listener, Trigger}; pub mod cln; pub mod lnd; +mod random_activity; mod serializers; #[derive(Serialize, Deserialize, Debug, Clone)] @@ -80,6 +81,8 @@ pub enum SimulationError { CsvError(#[from] csv::Error), #[error("File Error")] FileError, + #[error("Random activity Error: {0}")] + RandomActivityError(String), } // Phase 2: Event Queue @@ -137,6 +140,12 @@ pub trait LightningNode { async fn list_channels(&mut self) -> Result, LightningError>; } +pub trait NetworkGenerator { + // sample_node_by_capacity randomly picks a node within the network weighted by its capacity deployed to the + // network in channels. It returns the node's public key and its capacity in millisatoshis. + fn sample_node_by_capacity(&self, source: PublicKey) -> (PublicKey, u64); +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PaymentResult { pub htlc_count: usize, diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs new file mode 100644 index 00000000..30539f76 --- /dev/null +++ b/sim-lib/src/random_activity.rs @@ -0,0 +1,80 @@ +use core::fmt; +use std::collections::HashMap; +use std::fmt::Display; + +use bitcoin::secp256k1::PublicKey; +use rand_distr::{Distribution, WeightedIndex}; + +use crate::{NetworkGenerator, SimulationError}; + +/// NetworkGraphView maintains a view of the network graph that can be used to pick nodes by their deployed liquidity +/// and track node capacity within the network. Tracking nodes in the network is memory-expensive, so we use a single +/// tracker for the whole network (in an unbounded environment, we'd make one _per_ node generating random activity, +/// which has a view of the full network except for itself). +pub struct NetworkGraphView { + node_picker: WeightedIndex, + nodes: Vec<(PublicKey, u64)>, +} + +impl NetworkGraphView { + // Creates a network view for the map of node public keys to capacity (in millisatoshis) provided. 
Returns an error + // if any node's capacity is zero (the node cannot receive), or there are not at least two nodes (one node can't + // send to itself). + pub fn new(node_capacities: HashMap) -> Result { + if node_capacities.len() < 2 { + return Err(SimulationError::RandomActivityError( + "at least two nodes required for activity generation".to_string(), + )); + } + + if node_capacities.values().any(|v| *v == 0) { + return Err(SimulationError::RandomActivityError( + "network generator created with zero capacity node".to_string(), + )); + } + + // To create a weighted index we're going to need a vector of nodes that we index and weights that are set + // by their deployed capacity. To efficiently store our view of nodes capacity, we're also going to store + // capacity along with the node pubkey because we query the two at the same time. Zero capacity nodes are + // rejected above because they have no chance of being selected (and won't be able to receive payments). + let nodes = node_capacities.iter().map(|(k, v)| (*k, *v)).collect(); + + let node_picker = WeightedIndex::new(node_capacities.into_values().collect::>()) + .map_err(|e| SimulationError::RandomActivityError(e.to_string()))?; + + Ok(NetworkGraphView { node_picker, nodes }) + } +} + +impl NetworkGenerator for NetworkGraphView { + /// Randomly samples the network for a node, weighted by capacity. Using a single graph view means that it's + /// possible for a source node to select itself. After sufficient retries, this is highly improbable (even with + /// very small graphs, or those with one node significantly more capitalized than others). + fn sample_node_by_capacity(&self, source: PublicKey) -> (PublicKey, u64) { + let mut rng = rand::thread_rng(); + + // While it's very unlikely that we can't pick a destination that is not our source, it's possible that there's + // a bug in our selection, so we track attempts to select a non-source node so that we can warn if this takes + // improbably long. 
+ let mut i = 1; + loop { + let index = self.node_picker.sample(&mut rng); + let destination = self.nodes[index]; + + if destination.0 != source { + return destination; + } + + if i % 50 == 0 { + log::warn!("Unable to select a destination for: {source} after {i} attempts. Please report a bug!") + } + i += 1 + } + } +} + +impl Display for NetworkGraphView { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "network graph view with: {} channels", self.nodes.len()) + } +} From 07181a5a9667ba34bba244131dea8da5c71e4cb1 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 22 Sep 2023 14:48:13 -0400 Subject: [PATCH 5/7] sim-lib: add activity generator trait and implementation --- sim-lib/src/lib.rs | 8 ++ sim-lib/src/random_activity.rs | 148 ++++++++++++++++++++++++++++++++- 2 files changed, 154 insertions(+), 2 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 9cd66469..46f3146d 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -146,6 +146,14 @@ pub trait NetworkGenerator { fn sample_node_by_capacity(&self, source: PublicKey) -> (PublicKey, u64); } +pub trait PaymentGenerator { + // Returns the number of seconds that a node should wait until firing its next payment. + fn next_payment_wait(&self) -> time::Duration; + + // Returns a payment amount based on the capacity of the sending and receiving node. 
+ fn payment_amount(&self, destination_capacity: u64) -> Result; +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PaymentResult { pub htlc_count: usize, diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs index 30539f76..a7508032 100644 --- a/sim-lib/src/random_activity.rs +++ b/sim-lib/src/random_activity.rs @@ -3,9 +3,13 @@ use std::collections::HashMap; use std::fmt::Display; use bitcoin::secp256k1::PublicKey; -use rand_distr::{Distribution, WeightedIndex}; +use rand_distr::{Distribution, Exp, LogNormal, WeightedIndex}; +use std::time::Duration; -use crate::{NetworkGenerator, SimulationError}; +use crate::{NetworkGenerator, PaymentGenerator, SimulationError}; + +const HOURS_PER_MONTH: u64 = 30 * 24; +const SECONDS_PER_MONTH: u64 = HOURS_PER_MONTH * 60 * 60; /// NetworkGraphView maintains a view of the network graph that can be used to pick nodes by their deployed liquidity /// and track node capacity within the network. Tracking nodes in the network is memory-expensive, so we use a single @@ -78,3 +82,143 @@ impl Display for NetworkGraphView { write!(f, "network graph view with: {} channels", self.nodes.len()) } } + +/// PaymentActivityGenerator manages generation of random payments for an individual node. For some multiplier of the +/// node's capacity, it will produce payments such that the node sends multiplier * capacity over a calendar month. +/// While the expected amount to be sent in a month and the mean payment amount are set, the generator will introduce +/// randomness both in the time between events and the variance of payment amounts sent to mimic more realistic +/// payment flows. +pub struct PaymentActivityGenerator { + multiplier: f64, + expected_payment_amt: u64, + source_capacity: u64, + event_dist: Exp, +} + +impl PaymentActivityGenerator { + /// Creates a new activity generator for a node, returning an error if the node has insufficient capacity deployed + /// for the expected payment amount provided. 
Capacity is defined as the sum of the channels that the node has + /// open in the network divided by two (so that capacity is not double counted with channel counterparties). + pub fn new( + source_capacity_msat: u64, + expected_payment_amt: u64, + multiplier: f64, + ) -> Result { + PaymentActivityGenerator::validate_capacity(source_capacity_msat, expected_payment_amt)?; + + // Lamda for the exponential distribution that we'll use to randomly time events is equal to the number of + // events that we expect to see within our set period. + let lamda = events_per_month(source_capacity_msat, multiplier, expected_payment_amt) + / (SECONDS_PER_MONTH as f64); + + let event_dist = + Exp::new(lamda).map_err(|e| SimulationError::RandomActivityError(e.to_string()))?; + + Ok(PaymentActivityGenerator { + multiplier, + expected_payment_amt, + source_capacity: source_capacity_msat, + event_dist, + }) + } + + /// Validates that the generator will be able to generate payment amounts based on the node's capacity and the + /// simulation's expected payment amount. + pub fn validate_capacity( + node_capacity_msat: u64, + expected_payment_amt: u64, + ) -> Result<(), SimulationError> { + // We will not be able to generate payments if the variance of sigma squared for our log normal distribution + // is < 0 (because we have to take a square root). + // + // Sigma squared is calculated as: 2(ln(payment_limit) - ln(expected_payment_amt)) + // Where: payment_limit = node_capacity_msat / 2. 
+ // + // Therefore we can only process payments if: 2(ln(payment_limit) - ln(expected_payment_amt)) >= 0 + // ln(payment_limit) >= ln(expected_payment_amt) + // e^(ln(payment_limit)) >= e^(ln(expected_payment_amt)) + // payment_limit >= expected_payment_amt + // node_capacity_msat / 2 >= expected_payment_amt + // node_capacity_msat >= 2 * expected_payment_amt + let min_required_capacity = 2 * expected_payment_amt; + if node_capacity_msat < min_required_capacity { + return Err(SimulationError::RandomActivityError(format!( + "node needs at least {} capacity (has: {}) to process expected payment amount: {}", + min_required_capacity, node_capacity_msat, expected_payment_amt + ))); + } + + Ok(()) + } +} + +/// Returns the number of events that the simulation expects the node to process per month based on its capacity, a +/// multiplier which expresses the capital efficiency of the network (how "much" it uses its deployed liquidity) and +/// the expected payment amount for the simulation. +/// +/// The total amount that we expect this node to send is capacity * multiplier, because the multiplier is the +/// expression of how many times a node sends its capacity within a month. For example: +/// - A multiplier of 0.5 indicates that the node processes half of its total channel capacity in sends in a month. +/// - A multiplier of 2 indicates that the node processes twice its total capacity in sends in a month. +/// +/// The number of sends that the simulation will dispatch for this node is simply the total amount that the node is +/// expected to send divided by the expected payment amount (how much we'll send on average) for the simulation. +fn events_per_month(source_capacity_msat: u64, multiplier: f64, expected_payment_amt: u64) -> f64 { + (source_capacity_msat as f64 * multiplier) / expected_payment_amt as f64 +} + +impl PaymentGenerator for PaymentActivityGenerator { + /// Returns the amount of time until the next payment should be scheduled for the node. 
+ fn next_payment_wait(&self) -> Duration { + let mut rng = rand::thread_rng(); + Duration::from_secs(self.event_dist.sample(&mut rng) as u64) + } + + /// Returns the payment amount for a payment to a node with the destination capacity provided. The expected value + /// for the payment is the simulation expected payment amount, and the variance is determined by the channel + /// capacity of the source and destination node. Variance is calculated such that 95% of payment amounts generated + /// will fall between the expected payment amount and 50% of the capacity of the node with the least channel + /// capacity. While the expected value of payments remains the same, scaling variance by node capacity means that + /// nodes with more deployed capital will see a larger range of payment values than those with smaller total + /// channel capacity. + fn payment_amount(&self, destination_capacity: u64) -> Result { + let payment_limit = std::cmp::min(self.source_capacity, destination_capacity) / 2; + + let ln_pmt_amt = (self.expected_payment_amt as f64).ln(); + let ln_limit = (payment_limit as f64).ln(); + + let mu = 2.0 * ln_pmt_amt - ln_limit; + let sigma_square = 2.0 * (ln_limit - ln_pmt_amt); + + if sigma_square < 0.0 { + return Err(SimulationError::RandomActivityError(format!( + "payment amount not possible for limit: {payment_limit}, sigma squared: {sigma_square}" + ))); + } + + let log_normal = LogNormal::new(mu, sigma_square.sqrt()) + .map_err(|e| SimulationError::RandomActivityError(e.to_string()))?; + + let mut rng = rand::thread_rng(); + Ok(log_normal.sample(&mut rng) as u64) + } +} + +impl Display for PaymentActivityGenerator { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let monthly_events = events_per_month( + self.source_capacity, + self.multiplier, + self.expected_payment_amt, + ); + + write!( + f, + "activity generator for capacity: {} with multiplier {}: {} payments per month ({} per hour)", + self.source_capacity, + self.multiplier, + 
monthly_events, + monthly_events / HOURS_PER_MONTH as f64 + ) + } +} From 59e8d4913ed020e0dfc763d1473e18de92dae558 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Wed, 4 Oct 2023 13:31:19 -0400 Subject: [PATCH 6/7] sim-lib: add dispatch of random activity generation --- sim-lib/src/lib.rs | 185 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 173 insertions(+), 12 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 46f3146d..3bc41e26 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -13,15 +13,22 @@ use thiserror::Error; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinSet; -use tokio::time; -use tokio::time::Duration; +use tokio::{select, time, time::Duration}; use triggered::{Listener, Trigger}; +use self::random_activity::{NetworkGraphView, PaymentActivityGenerator}; + pub mod cln; pub mod lnd; mod random_activity; mod serializers; +/// The default expected payment amount for the simulation, around ~$10 at the time of writing. +pub const EXPECTED_PAYMENT_AMOUNT: u64 = 3_800_000; + +/// The number of times over each node in the network sends its total deployed capacity in a calendar month. +pub const ACTIVITY_MULTIPLIER: f64 = 2.0; + #[derive(Serialize, Deserialize, Debug, Clone)] pub enum NodeConnection { #[serde(alias = "lnd", alias = "Lnd")] @@ -265,6 +272,11 @@ pub struct Simulation { total_time: Option, /// The number of activity results to batch before printing in CSV. print_batch_size: u32, + /// The expected payment size for the network. + expected_payment_msat: u64, + /// The number of times that the network sends its total capacity in a month of operation when generating random + /// activity. 
+ activity_multiplier: f64, } const DEFAULT_PRINT_BATCH_SIZE: u32 = 500; @@ -284,6 +296,8 @@ impl Simulation { shutdown_listener, total_time: total_time.map(|x| Duration::from_secs(x as u64)), print_batch_size: print_batch_size.unwrap_or(DEFAULT_PRINT_BATCH_SIZE), + expected_payment_msat: EXPECTED_PAYMENT_AMOUNT, + activity_multiplier: ACTIVITY_MULTIPLIER, } } @@ -388,21 +402,39 @@ impl Simulation { let (event_sender, event_receiver) = channel(1); self.run_data_collection(event_receiver, &mut tasks); - // Create consumers for every source node that is listed in our activity. - let collecting_nodes = self - .activity - .iter() - .map(|activity| activity.source) - .collect(); + // Create consumers for every source node when dealing with activity descriptions, or only for nodes with + // sufficient capacity if generating random activity. Since we have to query the capacity of every node + // in our network for picking random activity nodes, we cache this value here to be used later when we spin + // up producers. + let mut random_activity_nodes = HashMap::new(); + let collecting_nodes = if !self.activity.is_empty() { + self.activity + .iter() + .map(|activity| activity.source) + .collect() + } else { + random_activity_nodes.extend(self.random_activity_nodes().await?); + random_activity_nodes.keys().cloned().collect() + }; let producer_senders = self.dispatch_consumers(collecting_nodes, event_sender.clone(), &mut tasks); // Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that - // has been configured, passing in the channel that is used to notify data collection that events have been - // generated. - self.dispatch_activity_producers(producer_senders, &mut tasks) - .await; + // has been configured (if any), passing in the channel that is used to notify data collection that events have + // been generated. Alternatively, we'll generate random activity if there is no activity specified. 
+ if !self.activity.is_empty() { + self.dispatch_activity_producers(producer_senders, &mut tasks) + .await; + } else { + log::info!( + "Generating random activity with multiplier: {}, average payment amount: {}.", + self.activity_multiplier, + self.expected_payment_msat + ); + self.dispatch_random_producers(random_activity_nodes, producer_senders, &mut tasks) + .await?; + } if let Some(total_time) = self.total_time { let t = self.shutdown_trigger.clone(); @@ -466,6 +498,29 @@ impl Simulation { log::debug!("Simulator data collection set up."); } + /// Returns the list of nodes that are eligible for generating random activity on. This is the subset of nodes + /// that have sufficient capacity to generate payments of our expected payment amount. + async fn random_activity_nodes(&self) -> Result, SimulationError> { + // Collect capacity of each node from its view of its own channels. Total capacity is divided by two to + // avoid double counting capacity (as each node has a counterparty in the channel). + let mut node_capacities = HashMap::new(); + for (pk, node) in self.nodes.iter() { + let chan_capacity = node.lock().await.list_channels().await?.iter().sum::(); + + if let Err(e) = PaymentActivityGenerator::validate_capacity( + chan_capacity, + self.expected_payment_msat, + ) { + log::warn!("Node: {} not eligible for activity generation: {e}.", *pk); + continue; + } + + node_capacities.insert(*pk, chan_capacity / 2); + } + + Ok(node_capacities) + } + /// Responsible for spinning up consumer tasks for each node specified in consuming_nodes. Assumes that validation /// has already ensured that we have execution on every nodes listed in consuming_nodes. fn dispatch_consumers( @@ -516,6 +571,52 @@ impl Simulation { )); } } + + /// Responsible for spinning up producers for a set of activity descriptions. Requires that node capacities are + /// provided for each node represented in producer channels. 
+ async fn dispatch_random_producers( + &self, + node_capacities: HashMap, + producer_channels: HashMap>, + tasks: &mut JoinSet<()>, + ) -> Result<(), SimulationError> { + let network_generator = + Arc::new(Mutex::new(NetworkGraphView::new(node_capacities.clone())?)); + + log::info!( + "Created network generator: {}.", + network_generator.lock().await + ); + + for (pk, sender) in producer_channels.into_iter() { + let source_capacity = match node_capacities.get(&pk) { + Some(capacity) => *capacity, + None => { + return Err(SimulationError::RandomActivityError(format!( + "Random activity generator run for: {} with unknown capacity.", + pk + ))); + } + }; + + let node_generator = PaymentActivityGenerator::new( + source_capacity, + self.expected_payment_msat, + self.activity_multiplier, + )?; + + tasks.spawn(produce_random_events( + pk, + network_generator.clone(), + node_generator, + sender.clone(), + self.shutdown_trigger.clone(), + self.shutdown_listener.clone(), + )); + } + + Ok(()) + } } // consume_events processes events that are crated for a lightning node that we can execute events on. Any output @@ -637,6 +738,66 @@ async fn produce_events( shutdown.trigger(); } +async fn produce_random_events( + source: PublicKey, + network_generator: Arc>, + node_generator: A, + sender: Sender, + shutdown: Trigger, + listener: Listener, +) { + log::info!("Started random activity producer for {source}: {node_generator}."); + + loop { + let wait = node_generator.next_payment_wait(); + log::debug!("Next payment for {source} in {:?} seconds.", wait); + + select! { + biased; + _ = listener.clone() => { + log::debug!("Random activity generator for {source} received signal to shut down."); + break; + }, + // Wait until our time to next payment has elapsed then execute a random amount payment to a random + // destination. 
+ _ = time::sleep(wait) => { + let destination = network_generator.lock().await.sample_node_by_capacity(source); + + // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get + // a payment amount something has gone wrong (because we should have validated that we can always + // generate amounts), so we exit. + let amount = match node_generator.payment_amount(destination.1) { + Ok(amt) => { + if amt == 0 { + log::debug!("Skipping zero amount payment for {source} -> {}.", destination.0); + continue; + } + amt + }, + Err(e) => { + log::error!("Could not get amount for {source} -> {}: {e}. Please report a bug!", destination.0); + break; + }, + }; + + log::debug!("Generated random payment: {source} -> {}: {amount} msat.", destination.0); + + // Send the payment, exiting if we can no longer send to the consumer. + let event = SimulationEvent::SendPayment(destination.0, amount); + if let Err(e) = sender.send(event).await { + log::debug!( + "Stopped random producer for {amount}: {source} -> {}. Consumer error: {e}.", destination.0, + ); + break; + } + }, + } + } + + log::debug!("Stopped random activity producer {source}."); + shutdown.trigger(); +} + async fn consume_simulation_results( receiver: Receiver<(Payment, PaymentResult)>, listener: Listener, From 46566b2bb29c3a76499809ac72fb06c3c31401a5 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Mon, 25 Sep 2023 13:28:19 -0400 Subject: [PATCH 7/7] multi: allow empty activity description and validate random activity Allow parsing of empty activity descriptions to facilitate default run with random activity generation. We still use a vector in the Simulation struct to allow a path where we have a combination of random and specific activity. This commit also adds validation that we have at least two nodes when running random activity. 
This is required because destination nodes are chosen from within the nodes that we have execution on (to prevent liquidity draining and eventual death spirals). --- sim-lib/src/lib.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 3bc41e26..6806f8bc 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -62,7 +62,7 @@ pub struct ClnConnection { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Config { pub nodes: Vec, - pub activity: Vec, + pub activity: Option>, } #[derive(Debug, Clone, Copy, Serialize, Deserialize)] @@ -284,14 +284,14 @@ const DEFAULT_PRINT_BATCH_SIZE: u32 = 500; impl Simulation { pub fn new( nodes: HashMap>>, - activity: Vec, + activity: Option>, total_time: Option, print_batch_size: Option, ) -> Self { let (shutdown_trigger, shutdown_listener) = triggered::trigger(); Self { nodes, - activity, + activity: activity.unwrap_or_default(), shutdown_trigger, shutdown_listener, total_time: total_time.map(|x| Duration::from_secs(x as u64)), @@ -302,8 +302,15 @@ impl Simulation { } /// validate_activity validates that the user-provided activity description is achievable for the network that - /// we're working with. + /// we're working with. If no activity description is provided, then it ensures that we have configured a network + /// that is suitable for random activity generation. async fn validate_activity(&self) -> Result<(), LightningError> { + if self.activity.is_empty() && self.nodes.len() <= 1 { + return Err(LightningError::ValidationError( + "At least two nodes required for random activity generation.".to_string(), + )); + } + for payment_flow in self.activity.iter() { // We need every source node that is configured to execute some activity to be included in our set of // nodes so that we can execute events on it.