From 99d398dd036b0016f67b4a6c7815bf523121657b Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 25 Sep 2023 20:51:11 +0000 Subject: [PATCH 1/6] sim-cli: simplifies main.rs by deduplicating the node creation Node creation logic was mostly duplicated, creates a node based on LightningNode instead of on its implementations to reduce the code replication. I'd be nice the be able to do this with a different container than Arc>, since that will save us from having to acquire the mutex after creating `node`. However, I don't know of any other container that is is send and which size is known at compile time. We may need to ask this to some elder rust wizards --- sim-cli/src/main.rs | 38 ++++++++++++++------------------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs index 7bbcb5c2..e7c1d56a 100644 --- a/sim-cli/src/main.rs +++ b/sim-cli/src/main.rs @@ -40,33 +40,23 @@ async fn main() -> anyhow::Result<()> { let mut clients: HashMap>> = HashMap::new(); for connection in nodes { - // TODO: We should simplify this into two minimal branches plus shared logging and inserting into the list - match connection { - NodeConnection::LND(c) => { - let node_id = c.id; - let lnd = LndNode::new(c).await?; + // TODO: Feels like there should be a better way of doing this without having to Arc>> it at this time. + // Box sort of works, but we won't know the size of the dyn LightningNode at compile time so the compiler will + // scream at us when trying to create the Arc> later on while adding the node to the clients map + let node: Arc> = match connection { + NodeConnection::LND(c) => Arc::new(Mutex::new(LndNode::new(c).await?)), + NodeConnection::CLN(c) => Arc::new(Mutex::new(ClnNode::new(c).await?)), + }; - log::info!( - "Connected to {} - Node ID: {}.", - lnd.get_info().alias, - lnd.get_info().pubkey - ); + let node_info = node.lock().await.get_info().clone(); - clients.insert(node_id, Arc::new(Mutex::new(lnd))); - } - NodeConnection::CLN(c) => { - let node_id = c.id; - let cln = ClnNode::new(c).await?; + log::info!( + "Connected to {} - Node ID: {}.", + node_info.alias, + node_info.pubkey + ); - log::info!( - "Connected to {} - Node ID: {}.", - cln.get_info().alias, - cln.get_info().pubkey - ); - - clients.insert(node_id, Arc::new(Mutex::new(cln))); - } - } + clients.insert(node_info.pubkey, node); } let sim = Simulation::new(clients, activity, cli.total_time, cli.print_batch_size); From 204772a6662376852d4c4f8308caad9695fbd5f5 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Tue, 26 Sep 2023 10:46:46 -0400 Subject: [PATCH 2/6] sim-lib: refactors lib to place connection info to its respective node file --- sim-lib/src/cln.rs | 17 ++++++++++++++--- sim-lib/src/lib.rs | 26 ++------------------------ sim-lib/src/lnd.rs | 15 ++++++++++++--- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/sim-lib/src/cln.rs b/sim-lib/src/cln.rs index de173980..f5875760 100644 --- a/sim-lib/src/cln.rs +++ b/sim-lib/src/cln.rs @@ -9,15 +9,26 @@ use cln_grpc::pb::{ use lightning::ln::features::NodeFeatures; use lightning::ln::PaymentHash; +use serde::{Deserialize, Serialize}; use tokio::fs::File; use tokio::io::{AsyncReadExt, Error}; use tokio::time::{self, Duration}; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity}; use triggered::Listener; -use crate::{ - ClnConnection, LightningError, LightningNode, NodeInfo, PaymentOutcome, PaymentResult, -}; +use crate::{serializers, LightningError, LightningNode, NodeInfo, PaymentOutcome, PaymentResult}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ClnConnection { + pub id: PublicKey, + pub address: String, + #[serde(deserialize_with = "serializers::deserialize_path")] + pub ca_cert: String, + #[serde(deserialize_with = "serializers::deserialize_path")] + pub client_cert: String, + #[serde(deserialize_with = "serializers::deserialize_path")] + pub client_key: String, +} pub struct ClnNode { pub client: NodeClient, diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index ce6f302b..a8b2348f 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -33,31 +33,9 @@ pub const ACTIVITY_MULTIPLIER: f64 = 2.0; #[derive(Serialize, Deserialize, Debug, Clone)] pub enum NodeConnection { #[serde(alias = "lnd", alias = "Lnd")] - LND(LndConnection), + LND(lnd::LndConnection), #[serde(alias = "cln", alias = "Cln")] - CLN(ClnConnection), -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct LndConnection { - pub id: PublicKey, - pub address: String, - #[serde(deserialize_with = "serializers::deserialize_path")] - pub macaroon: String, - #[serde(deserialize_with = "serializers::deserialize_path")] - pub cert: String, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ClnConnection { - pub id: PublicKey, - pub address: String, - #[serde(deserialize_with = "serializers::deserialize_path")] - pub ca_cert: String, - #[serde(deserialize_with = "serializers::deserialize_path")] - pub client_cert: String, - #[serde(deserialize_with = "serializers::deserialize_path")] - pub client_key: String, + CLN(cln::ClnConnection), } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/sim-lib/src/lnd.rs b/sim-lib/src/lnd.rs index 61532920..0b50556b 100644 --- a/sim-lib/src/lnd.rs +++ b/sim-lib/src/lnd.rs @@ -1,15 +1,14 @@ use std::collections::HashSet; use std::{collections::HashMap, str::FromStr}; -use crate::{ - LightningError, LightningNode, LndConnection, NodeInfo, PaymentOutcome, PaymentResult, -}; +use crate::{serializers, LightningError, LightningNode, NodeInfo, PaymentOutcome, PaymentResult}; use async_trait::async_trait; use bitcoin::hashes::{sha256, Hash}; use bitcoin::secp256k1::PublicKey; use bitcoin::Network; use lightning::ln::features::NodeFeatures; use lightning::ln::{PaymentHash, PaymentPreimage}; +use serde::{Deserialize, Serialize}; use tonic_lnd::lnrpc::{payment::PaymentStatus, GetInfoRequest, GetInfoResponse}; use tonic_lnd::lnrpc::{ListChannelsRequest, NodeInfoRequest, PaymentFailureReason}; use tonic_lnd::routerrpc::TrackPaymentRequest; @@ -21,6 +20,16 @@ use triggered::Listener; const KEYSEND_KEY: u64 = 5482373484; const SEND_PAYMENT_TIMEOUT_SECS: i32 = 300; +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LndConnection { + pub id: PublicKey, + pub address: String, + #[serde(deserialize_with = "serializers::deserialize_path")] + pub macaroon: String, + #[serde(deserialize_with = "serializers::deserialize_path")] + pub cert: String, +} + pub struct LndNode { client: Client, info: NodeInfo, From ff1ddd56944962f637493b77ed553eb5429d7a6a Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 15 Sep 2023 16:19:28 -0400 Subject: [PATCH 3/6] sim-all: Allows aliases within node definition in the config file Crates a new enum, `NodeId`, which is used to parse the node identifier from the config file to either a `PublicKey` or an `Alias`. Internally, we will identify nodes as `PublicKey`s, so we add do further checks when creating {Cln, Lnd}Connection. Moreover, we make sure that the provided variant matches the one from the backend. In case of a `PublicKey`, if the assertion fails the code will return a `ValidationError`. In the case of an `Alias`, it will simply log a warning. The rationale for not failing if the provided alias does not match the one from the backend is that we want to allow user to identify nodes using whatever name they rather (alice/bob/carol/..., n1, n2, n3, ...), so there is not strong requirement for aliases to match (but it feels right to let the user know, just in case). Finally, if an identifier is used more than once, the simulation will also fail with a `ValidationError`, independently of the identifier variant. --- sim-cli/src/main.rs | 20 +++++++++++++++++++- sim-lib/src/cln.rs | 38 +++++++++++++++++++++++--------------- sim-lib/src/lib.rs | 26 ++++++++++++++++++++++++++ sim-lib/src/lnd.rs | 25 ++++++++++++++++--------- sim-lib/src/serializers.rs | 31 ++++++++++++++++++++++++++++++- 5 files changed, 114 insertions(+), 26 deletions(-) diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs index e7c1d56a..a1ab5baa 100644 --- a/sim-cli/src/main.rs +++ b/sim-cli/src/main.rs @@ -6,7 +6,9 @@ use tokio::sync::Mutex; use clap::Parser; use log::LevelFilter; -use sim_lib::{cln::ClnNode, lnd::LndNode, Config, LightningNode, NodeConnection, Simulation}; +use sim_lib::{ + cln::ClnNode, lnd::LndNode, Config, LightningError, LightningNode, NodeConnection, Simulation, +}; use simple_logger::SimpleLogger; #[derive(Parser)] @@ -38,6 +40,7 @@ async fn main() -> anyhow::Result<()> { let Config { nodes, activity } = serde_json::from_str(&config_str)?; let mut clients: HashMap>> = HashMap::new(); + let mut alias_node_map = HashMap::new(); for connection in nodes { // TODO: Feels like there should be a better way of doing this without having to Arc>> it at this time. @@ -56,6 +59,21 @@ async fn main() -> anyhow::Result<()> { node_info.pubkey ); + if clients.contains_key(&node_info.pubkey) { + anyhow::bail!(LightningError::ValidationError(format!( + "duplicated node: {}.", + node_info.pubkey + ))); + } + + if alias_node_map.contains_key(&node_info.alias) { + anyhow::bail!(LightningError::ValidationError(format!( + "duplicated node: {}.", + node_info.alias + ))); + } + + alias_node_map.insert(node_info.alias.clone(), node_info.pubkey); clients.insert(node_info.pubkey, node); } diff --git a/sim-lib/src/cln.rs b/sim-lib/src/cln.rs index f5875760..f0d27a91 100644 --- a/sim-lib/src/cln.rs +++ b/sim-lib/src/cln.rs @@ -3,8 +3,8 @@ use bitcoin::secp256k1::PublicKey; use bitcoin::Network; use cln_grpc::pb::{ listpays_pays::ListpaysPaysStatus, node_client::NodeClient, Amount, GetinfoRequest, - GetinfoResponse, KeysendRequest, KeysendResponse, ListchannelsRequest, ListnodesRequest, - ListpaysRequest, ListpaysResponse, + KeysendRequest, KeysendResponse, ListchannelsRequest, ListnodesRequest, ListpaysRequest, + ListpaysResponse, }; use lightning::ln::features::NodeFeatures; use lightning::ln::PaymentHash; @@ -16,11 +16,14 @@ use tokio::time::{self, Duration}; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity}; use triggered::Listener; -use crate::{serializers, LightningError, LightningNode, NodeInfo, PaymentOutcome, PaymentResult}; +use crate::{ + serializers, LightningError, LightningNode, NodeId, NodeInfo, PaymentOutcome, PaymentResult, +}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ClnConnection { - pub id: PublicKey, + #[serde(with = "serializers::serde_node_id")] + pub id: NodeId, pub address: String, #[serde(deserialize_with = "serializers::deserialize_path")] pub ca_cert: String, @@ -67,16 +70,22 @@ impl ClnNode { })?, ); - let GetinfoResponse { - id, - alias, - our_features, - .. - } = client + let (id, mut alias, our_features) = client .getinfo(GetinfoRequest {}) .await - .map_err(|err| LightningError::GetInfoError(err.to_string()))? - .into_inner(); + .map(|r| { + let inner = r.into_inner(); + ( + inner.id, + inner.alias.unwrap_or_default(), + inner.our_features, + ) + }) + .map_err(|err| LightningError::GetInfoError(err.to_string()))?; + + let pubkey = PublicKey::from_slice(&id) + .map_err(|err| LightningError::GetInfoError(err.to_string()))?; + connection.id.validate(&pubkey, &mut alias)?; //FIXME: our_features is returning None, but it should not :S let features = if let Some(features) = our_features { @@ -88,10 +97,9 @@ impl ClnNode { Ok(Self { client, info: NodeInfo { - pubkey: PublicKey::from_slice(&id) - .map_err(|err| LightningError::GetInfoError(err.to_string()))?, + pubkey, features, - alias: alias.unwrap_or("".to_string()), + alias, }, }) } diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index a8b2348f..7155e135 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -38,6 +38,32 @@ pub enum NodeConnection { CLN(cln::ClnConnection), } +#[derive(Serialize, Debug, Clone)] +pub enum NodeId { + PublicKey(PublicKey), + Alias(String), +} + +impl NodeId { + pub fn validate(&self, node_id: &PublicKey, alias: &mut String) -> Result<(), LightningError> { + match self { + crate::NodeId::PublicKey(pk) => { + if pk != node_id { + return Err(LightningError::ValidationError(format!( + "the provided node id does not match the one returned by the backend ({} != {}).", pk, node_id))); + } + } + crate::NodeId::Alias(a) => { + if alias != a { + log::warn!("The provided alias does not match the one returned by the backend ({} != {}).", a, alias) + } + *alias = a.to_string(); + } + } + Ok(()) + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Config { pub nodes: Vec, diff --git a/sim-lib/src/lnd.rs b/sim-lib/src/lnd.rs index 0b50556b..8fda8bdf 100644 --- a/sim-lib/src/lnd.rs +++ b/sim-lib/src/lnd.rs @@ -1,7 +1,9 @@ use std::collections::HashSet; use std::{collections::HashMap, str::FromStr}; -use crate::{serializers, LightningError, LightningNode, NodeInfo, PaymentOutcome, PaymentResult}; +use crate::{ + serializers, LightningError, LightningNode, NodeId, NodeInfo, PaymentOutcome, PaymentResult, +}; use async_trait::async_trait; use bitcoin::hashes::{sha256, Hash}; use bitcoin::secp256k1::PublicKey; @@ -22,7 +24,8 @@ const SEND_PAYMENT_TIMEOUT_SECS: i32 = 300; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct LndConnection { - pub id: PublicKey, + #[serde(with = "serializers::serde_node_id")] + pub id: NodeId, pub address: String, #[serde(deserialize_with = "serializers::deserialize_path")] pub macaroon: String, @@ -54,15 +57,16 @@ fn parse_node_features(features: HashSet) -> NodeFeatures { } impl LndNode { - pub async fn new(conn_data: LndConnection) -> Result { - let mut client = tonic_lnd::connect(conn_data.address, conn_data.cert, conn_data.macaroon) - .await - .map_err(|err| LightningError::ConnectionError(err.to_string()))?; + pub async fn new(connection: LndConnection) -> Result { + let mut client = + tonic_lnd::connect(connection.address, connection.cert, connection.macaroon) + .await + .map_err(|err| LightningError::ConnectionError(err.to_string()))?; let GetInfoResponse { identity_pubkey, features, - alias, + mut alias, .. } = client .lightning() @@ -71,11 +75,14 @@ impl LndNode { .map_err(|err| LightningError::GetInfoError(err.to_string()))? .into_inner(); + let pubkey = PublicKey::from_str(&identity_pubkey) + .map_err(|err| LightningError::GetInfoError(err.to_string()))?; + connection.id.validate(&pubkey, &mut alias)?; + Ok(Self { client, info: NodeInfo { - pubkey: PublicKey::from_str(&identity_pubkey) - .map_err(|err| LightningError::GetInfoError(err.to_string()))?, + pubkey, features: parse_node_features(features.keys().cloned().collect()), alias, }, diff --git a/sim-lib/src/serializers.rs b/sim-lib/src/serializers.rs index c942e92c..3fd46fa7 100644 --- a/sim-lib/src/serializers.rs +++ b/sim-lib/src/serializers.rs @@ -2,7 +2,6 @@ use expanduser::expanduser; use serde::Deserialize; pub mod serde_option_payment_hash { - use lightning::ln::PaymentHash; pub fn serialize(hash: &Option, serializer: S) -> Result @@ -16,6 +15,36 @@ pub mod serde_option_payment_hash { } } +pub mod serde_node_id { + use super::*; + use std::str::FromStr; + + use crate::NodeId; + use bitcoin::secp256k1::PublicKey; + + pub fn serialize(id: &NodeId, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&match id { + NodeId::PublicKey(p) => p.to_string(), + NodeId::Alias(s) => s.to_string(), + }) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + if let Ok(pk) = PublicKey::from_str(&s) { + Ok(NodeId::PublicKey(pk)) + } else { + Ok(NodeId::Alias(s)) + } + } +} + pub fn deserialize_path<'de, D>(deserializer: D) -> Result where D: serde::Deserializer<'de>, From b5020822bc6e76729c38cd142ed6d65b87d6f9c3 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Wed, 20 Sep 2023 11:42:36 +0000 Subject: [PATCH 4/6] sim-all: allows aliases to be used as source/destination of activities The approach followed for this is pretty straight-forward: instead of loading `PublicKey`s directly to our `ActivityDefinition` data structure, we create an intermediary class `ActivityParser` that accepts `NodeId`. However, we internally identify activities using `PublicKey`s, so we do an additional mapping step once the data is loaded to go from `ActivityParser` to `ActivityDefinition`, hence, once data is passed to the simulator it is always identified by `PublicKey`. For activity destinations, we also need to make sure that if an alias has been provided we do control that node, otherwise the Alias->PublicKey mapping cannot be done, in which case we fail with a `ValidationError`. --- sim-cli/src/main.rs | 43 +++++++++++++++++++++++-- sim-lib/src/lib.rs | 76 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 107 insertions(+), 12 deletions(-) diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs index a1ab5baa..ace66c28 100644 --- a/sim-cli/src/main.rs +++ b/sim-cli/src/main.rs @@ -7,7 +7,8 @@ use tokio::sync::Mutex; use clap::Parser; use log::LevelFilter; use sim_lib::{ - cln::ClnNode, lnd::LndNode, Config, LightningError, LightningNode, NodeConnection, Simulation, + cln::ClnNode, lnd::LndNode, ActivityDefinition, Config, LightningError, LightningNode, + NodeConnection, NodeId, Simulation, }; use simple_logger::SimpleLogger; @@ -37,7 +38,10 @@ async fn main() -> anyhow::Result<()> { .unwrap(); let config_str = std::fs::read_to_string(cli.config)?; - let Config { nodes, activity } = serde_json::from_str(&config_str)?; + let Config { + nodes, + mut activity, + } = serde_json::from_str(&config_str)?; let mut clients: HashMap>> = HashMap::new(); let mut alias_node_map = HashMap::new(); @@ -77,7 +81,40 @@ async fn main() -> anyhow::Result<()> { clients.insert(node_info.pubkey, node); } - let sim = Simulation::new(clients, activity, cli.total_time, cli.print_batch_size); + let mut validated_activities = vec![]; + // Make all the activities identifiable by PK internally + for act in activity.iter_mut() { + // We can only map aliases to nodes we control, so if either the source or destination alias + // is not in alias_node_map, we fail + if let NodeId::Alias(a) = &act.source { + if let Some(pk) = alias_node_map.get(a) { + act.source = NodeId::PublicKey(*pk); + } else { + anyhow::bail!(LightningError::ValidationError(format!( + "activity source alias {} not found in nodes.", + act.source + ))); + } + } + if let NodeId::Alias(a) = &act.destination { + if let Some(pk) = alias_node_map.get(a) { + act.destination = NodeId::PublicKey(*pk); + } else { + anyhow::bail!(LightningError::ValidationError(format!( + "unknown activity destination: {}.", + act.destination + ))); + } + } + validated_activities.push(ActivityDefinition::try_from(act)?); + } + + let sim = Simulation::new( + clients, + validated_activities, + cli.total_time, + cli.print_batch_size, + ); let sim2 = sim.clone(); ctrlc::set_handler(move || { diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 7155e135..e628602b 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -62,27 +62,85 @@ impl NodeId { } Ok(()) } + + pub fn get_pk(&self) -> Result<&PublicKey, String> { + if let NodeId::PublicKey(pk) = self { + Ok(pk) + } else { + Err("NodeId is not a PublicKey".to_string()) + } + } +} + +impl std::fmt::Display for NodeId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + NodeId::PublicKey(pk) => pk.to_string(), + NodeId::Alias(a) => a.to_owned(), + } + ) + } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Config { pub nodes: Vec, - pub activity: Option>, + #[serde(default)] + pub activity: Vec, } -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +/// Data structure used to parse information from the simulation file. It allows source and destination to be +/// [NodeId], which enables the use of public keys and aliases in the simulation description. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ActivityParser { + // The source of the payment. + #[serde(with = "serializers::serde_node_id")] + pub source: NodeId, + // The destination of the payment. + #[serde(with = "serializers::serde_node_id")] + pub destination: NodeId, + // The interval of the event, as in every how many seconds the payment is performed. + pub interval_secs: u16, + // The amount of m_sat to used in this payment. + pub amount_msat: u64, +} + +/// Data structure used internally by the simulator. Both source and destination are represented as [PublicKey] here. +/// This is constructed during activity validation and passed along to the [Simulation]. +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ActivityDefinition { // The source of the payment. pub source: PublicKey, // The destination of the payment. pub destination: PublicKey, // The interval of the event, as in every how many seconds the payment is performed. - #[serde(alias = "interval_secs")] - pub interval: u16, + pub interval_secs: u16, // The amount of m_sat to used in this payment. pub amount_msat: u64, } +impl TryFrom<&mut ActivityParser> for ActivityDefinition { + type Error = LightningError; + + fn try_from(a: &mut ActivityParser) -> Result { + let source = *a.source.get_pk().map_err(Self::Error::ValidationError)?; + let destination = *a + .destination + .get_pk() + .map_err(Self::Error::ValidationError)?; + + Ok(Self { + source, + destination, + interval_secs: a.interval_secs, + amount_msat: a.amount_msat, + }) + } +} + #[derive(Debug, Error)] pub enum SimulationError { #[error("Lightning Error: {0:?}")] @@ -290,14 +348,14 @@ const DEFAULT_PRINT_BATCH_SIZE: u32 = 500; impl Simulation { pub fn new( nodes: HashMap>>, - activity: Option>, + activity: Vec, total_time: Option, print_batch_size: Option, ) -> Self { let (shutdown_trigger, shutdown_listener) = triggered::trigger(); Self { nodes, - activity: activity.unwrap_or_default(), + activity, shutdown_trigger, shutdown_listener, total_time: total_time.map(|x| Duration::from_secs(x as u64)), @@ -574,7 +632,7 @@ impl Simulation { for description in self.activity.iter() { let sender_chan = producer_channels.get(&description.source).unwrap(); tasks.spawn(produce_events( - *description, + description.clone(), sender_chan.clone(), self.shutdown_trigger.clone(), self.shutdown_listener.clone(), @@ -706,12 +764,12 @@ async fn produce_events( listener: Listener, ) { let e: SimulationEvent = SimulationEvent::SendPayment(act.destination, act.amount_msat); - let interval = time::Duration::from_secs(act.interval as u64); + let interval = time::Duration::from_secs(act.interval_secs as u64); log::debug!( "Started producer for {} every {}s: {} -> {}.", act.amount_msat, - act.interval, + act.interval_secs, act.source, act.destination ); From e884245b8b80d79bd4f2d9954ec11244527d11fd Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Wed, 27 Sep 2023 10:24:37 -0400 Subject: [PATCH 5/6] sim-all: logs aliases and pks if both are known This commit redefines what data about a node is passed along to the simulator on activities. Previously, we used to pass only the src/dst `PublicKey`. However, by doing this we had no way of logging (nor accessing) any other information about them. This is mainly because all this mapping is only available on `sim-cli::main.rs`, and also because the data is mostly logged by the producers/consumers functions, which are not even part or the `Simulator` class. Therefore, the alternative we are left with are passing the information to the simulator. This can be done in several ways, I've gone with the one that has a better balance between codediff and usefulness, which is passing `NodeInfo` along. This, however, means having to obtain the destination node features on `main` instead of on `validation`. --- sim-cli/src/main.rs | 79 +++++++++++++------- sim-lib/src/cln.rs | 29 ++++---- sim-lib/src/lib.rs | 128 +++++++++++++++------------------ sim-lib/src/lnd.rs | 22 +++--- sim-lib/src/random_activity.rs | 26 ++++--- 5 files changed, 154 insertions(+), 130 deletions(-) diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs index ace66c28..cdd13816 100644 --- a/sim-cli/src/main.rs +++ b/sim-cli/src/main.rs @@ -38,12 +38,10 @@ async fn main() -> anyhow::Result<()> { .unwrap(); let config_str = std::fs::read_to_string(cli.config)?; - let Config { - nodes, - mut activity, - } = serde_json::from_str(&config_str)?; + let Config { nodes, activity } = serde_json::from_str(&config_str)?; let mut clients: HashMap>> = HashMap::new(); + let mut pk_node_map = HashMap::new(); let mut alias_node_map = HashMap::new(); for connection in nodes { @@ -77,36 +75,67 @@ async fn main() -> anyhow::Result<()> { ))); } - alias_node_map.insert(node_info.alias.clone(), node_info.pubkey); clients.insert(node_info.pubkey, node); + pk_node_map.insert(node_info.pubkey, node_info.clone()); + alias_node_map.insert(node_info.alias.clone(), node_info); } let mut validated_activities = vec![]; // Make all the activities identifiable by PK internally - for act in activity.iter_mut() { + for act in activity.into_iter() { // We can only map aliases to nodes we control, so if either the source or destination alias // is not in alias_node_map, we fail - if let NodeId::Alias(a) = &act.source { - if let Some(pk) = alias_node_map.get(a) { - act.source = NodeId::PublicKey(*pk); - } else { - anyhow::bail!(LightningError::ValidationError(format!( - "activity source alias {} not found in nodes.", - act.source - ))); + let source = if let Some(source) = match &act.source { + NodeId::PublicKey(pk) => pk_node_map.get(pk), + NodeId::Alias(a) => alias_node_map.get(a), + } { + source.clone() + } else { + anyhow::bail!(LightningError::ValidationError(format!( + "activity source {} not found in nodes.", + act.source + ))); + }; + + let destination = match &act.destination { + NodeId::Alias(a) => { + if let Some(info) = alias_node_map.get(a) { + info.clone() + } else { + anyhow::bail!(LightningError::ValidationError(format!( + "unknown activity destination: {}.", + act.destination + ))); + } } - } - if let NodeId::Alias(a) = &act.destination { - if let Some(pk) = alias_node_map.get(a) { - act.destination = NodeId::PublicKey(*pk); - } else { - anyhow::bail!(LightningError::ValidationError(format!( - "unknown activity destination: {}.", - act.destination - ))); + NodeId::PublicKey(pk) => { + if let Some(info) = pk_node_map.get(pk) { + info.clone() + } else { + clients + .get(&source.pubkey) + .unwrap() + .lock() + .await + .get_node_info(pk) + .await + .map_err(|e| { + log::debug!("{}", e); + LightningError::ValidationError(format!( + "Destination node unknown or invalid: {}.", + pk, + )) + })? + } } - } - validated_activities.push(ActivityDefinition::try_from(act)?); + }; + + validated_activities.push(ActivityDefinition { + source, + destination, + interval_secs: act.interval_secs, + amount_msat: act.amount_msat, + }); } let sim = Simulation::new( diff --git a/sim-lib/src/cln.rs b/sim-lib/src/cln.rs index f0d27a91..311d56e8 100644 --- a/sim-lib/src/cln.rs +++ b/sim-lib/src/cln.rs @@ -230,12 +230,11 @@ impl LightningNode for ClnNode { } } - async fn get_node_features(&mut self, node: PublicKey) -> Result { - let node_id = node.serialize().to_vec(); - let nodes: Vec = self + async fn get_node_info(&mut self, node_id: &PublicKey) -> Result { + let mut nodes: Vec = self .client .list_nodes(ListnodesRequest { - id: Some(node_id.clone()), + id: Some(node_id.serialize().to_vec()), }) .await .map_err(|err| LightningError::GetNodeInfoError(err.to_string()))? @@ -243,15 +242,19 @@ impl LightningNode for ClnNode { .nodes; // We are filtering `list_nodes` to a single node, so we should get either an empty vector or one with a single element - if let Some(node) = nodes.first() { - Ok(node - .features - .clone() - .map_or(NodeFeatures::empty(), |mut f| { - // We need to reverse this given it has the CLN wire encoding which is BE - f.reverse(); - NodeFeatures::from_le_bytes(f) - })) + if let Some(node) = nodes.pop() { + Ok(NodeInfo { + pubkey: *node_id, + alias: node.alias.unwrap_or(String::new()), + features: node + .features + .clone() + .map_or(NodeFeatures::empty(), |mut f| { + // We need to reverse this given it has the CLN wire encoding which is BE + f.reverse(); + NodeFeatures::from_le_bytes(f) + }), + }) } else { Err(LightningError::GetNodeInfoError( "Node not found".to_string(), diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index e628602b..f7b3df10 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -110,37 +110,18 @@ pub struct ActivityParser { /// Data structure used internally by the simulator. Both source and destination are represented as [PublicKey] here. /// This is constructed during activity validation and passed along to the [Simulation]. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub struct ActivityDefinition { // The source of the payment. - pub source: PublicKey, + pub source: NodeInfo, // The destination of the payment. - pub destination: PublicKey, + pub destination: NodeInfo, // The interval of the event, as in every how many seconds the payment is performed. pub interval_secs: u16, // The amount of m_sat to used in this payment. pub amount_msat: u64, } -impl TryFrom<&mut ActivityParser> for ActivityDefinition { - type Error = LightningError; - - fn try_from(a: &mut ActivityParser) -> Result { - let source = *a.source.get_pk().map_err(Self::Error::ValidationError)?; - let destination = *a - .destination - .get_pk() - .map_err(Self::Error::ValidationError)?; - - Ok(Self { - source, - destination, - interval_secs: a.interval_secs, - amount_msat: a.amount_msat, - }) - } -} - #[derive(Debug, Error)] pub enum SimulationError { #[error("Lightning Error: {0:?}")] @@ -185,6 +166,18 @@ pub struct NodeInfo { pub features: NodeFeatures, } +impl Display for NodeInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let pk = self.pubkey.to_string(); + let pk_summary = format!("{}...{}", &pk[..6], &pk[pk.len() - 6..]); + if self.alias.is_empty() { + write!(f, "{}", pk_summary) + } else { + write!(f, "{}({})", self.alias, pk_summary) + } + } +} + /// LightningNode represents the functionality that is required to execute events on a lightning node. #[async_trait] pub trait LightningNode { @@ -204,8 +197,8 @@ pub trait LightningNode { hash: PaymentHash, shutdown: Listener, ) -> Result; - /// Gets the list of features of a given node - async fn get_node_features(&mut self, node: PublicKey) -> Result; + /// Gets information on a specific node + async fn get_node_info(&mut self, node_id: &PublicKey) -> Result; /// Lists all channels, at present only returns a vector of channel capacities in msat because no further /// information is required. async fn list_channels(&mut self) -> Result, LightningError>; @@ -214,7 +207,7 @@ pub trait LightningNode { pub trait NetworkGenerator { // sample_node_by_capacity randomly picks a node within the network weighted by its capacity deployed to the // network in channels. It returns the node's public key and its capacity in millisatoshis. - fn sample_node_by_capacity(&self, source: PublicKey) -> (PublicKey, u64); + fn sample_node_by_capacity(&self, source: PublicKey) -> (NodeInfo, u64); } pub trait PaymentGenerator { @@ -306,11 +299,11 @@ impl Display for Payment { /// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions /// on. -#[derive(Clone, Copy)] +#[derive(Clone)] enum SimulationEvent { /// Dispatch a payment of the specified amount to the public key provided. /// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`. - SendPayment(PublicKey, u64), + SendPayment(NodeInfo, u64), } /// SimulationOutput provides the output of a simulation event. @@ -378,30 +371,16 @@ impl Simulation { for payment_flow in self.activity.iter() { // We need every source node that is configured to execute some activity to be included in our set of // nodes so that we can execute events on it. - let source_node = - self.nodes - .get(&payment_flow.source) - .ok_or(LightningError::ValidationError(format!( - "Source node not found, {}", - payment_flow.source, - )))?; + self.nodes + .get(&payment_flow.source.pubkey) + .ok_or(LightningError::ValidationError(format!( + "Source node not found, {}", + payment_flow.source, + )))?; // Destinations must support keysend to be able to receive payments. // Note: validation should be update with a different check if an event is not a payment. - let features = source_node - .lock() - .await - .get_node_features(payment_flow.destination) - .await - .map_err(|e| { - log::debug!("{}", e); - LightningError::ValidationError(format!( - "Destination node unknown or invalid, {}", - payment_flow.destination, - )) - })?; - - if !features.supports_keysend() { + if !payment_flow.destination.features.supports_keysend() { return Err(LightningError::ValidationError(format!( "Destination node does not support keysend, {}", payment_flow.destination, @@ -478,7 +457,7 @@ impl Simulation { let collecting_nodes = if !self.activity.is_empty() { self.activity .iter() - .map(|activity| activity.source) + .map(|activity| activity.source.pubkey) .collect() } else { random_activity_nodes.extend(self.random_activity_nodes().await?); @@ -568,7 +547,9 @@ impl Simulation { /// Returns the list of nodes that are eligible for generating random activity on. This is the subset of nodes /// that have sufficient capacity to generate payments of our expected payment amount. - async fn random_activity_nodes(&self) -> Result, SimulationError> { + async fn random_activity_nodes( + &self, + ) -> Result, SimulationError> { // Collect capacity of each node from its view of its own channels. Total capacity is divided by two to // avoid double counting capacity (as each node has a counterparty in the channel). let mut node_capacities = HashMap::new(); @@ -583,7 +564,13 @@ impl Simulation { continue; } - node_capacities.insert(*pk, chan_capacity / 2); + node_capacities.insert( + *pk, + ( + node.lock().await.get_node_info(pk).await?, + chan_capacity / 2, + ), + ); } Ok(node_capacities) @@ -630,7 +617,7 @@ impl Simulation { tasks: &mut JoinSet<()>, ) { for description in self.activity.iter() { - let sender_chan = producer_channels.get(&description.source).unwrap(); + let sender_chan = producer_channels.get(&description.source.pubkey).unwrap(); tasks.spawn(produce_events( description.clone(), sender_chan.clone(), @@ -644,12 +631,13 @@ impl Simulation { /// provided for each node represented in producer channels. async fn dispatch_random_producers( &self, - node_capacities: HashMap, + node_capacities: HashMap, producer_channels: HashMap>, tasks: &mut JoinSet<()>, ) -> Result<(), SimulationError> { - let network_generator = - Arc::new(Mutex::new(NetworkGraphView::new(node_capacities.clone())?)); + let network_generator = Arc::new(Mutex::new(NetworkGraphView::new( + node_capacities.values().cloned().collect(), + )?)); log::info!( "Created network generator: {}.", @@ -657,8 +645,8 @@ impl Simulation { ); for (pk, sender) in producer_channels.into_iter() { - let source_capacity = match node_capacities.get(&pk) { - Some(capacity) => *capacity, + let (info, source_capacity) = match node_capacities.get(&pk) { + Some((info, capacity)) => (info.clone(), *capacity), None => { return Err(SimulationError::RandomActivityError(format!( "Random activity generator run for: {} with unknown capacity.", @@ -674,7 +662,7 @@ impl Simulation { )?; tasks.spawn(produce_random_events( - pk, + info, network_generator.clone(), node_generator, sender.clone(), @@ -698,8 +686,7 @@ async fn consume_events( sender: Sender, shutdown: Trigger, ) { - let node_id = node.lock().await.get_info().pubkey; - log::debug!("Started consumer for {}.", node_id); + log::debug!("Started consumer for {}.", node.lock().await.get_info()); while let Some(event) = receiver.recv().await { match event { @@ -710,15 +697,15 @@ async fn consume_events( source: node.get_info().pubkey, hash: None, amount_msat: amt_msat, - destination: dest, + destination: dest.pubkey, dispatch_time: SystemTime::now(), }; - let outcome = match node.send_payment(dest, amt_msat).await { + let outcome = match node.send_payment(dest.pubkey, amt_msat).await { Ok(payment_hash) => { log::debug!( "Send payment: {} -> {}: ({}).", - node_id, + node.get_info(), dest, hex::encode(payment_hash.0) ); @@ -727,7 +714,11 @@ async fn consume_events( SimulationOutput::SendPaymentSuccess(payment) } Err(e) => { - log::error!("Error while sending payment {} -> {}.", node_id, dest); + log::error!( + "Error while sending payment {} -> {}.", + node.get_info(), + dest + ); match e { LightningError::PermanentError(s) => { @@ -763,7 +754,6 @@ async fn produce_events( shutdown: Trigger, listener: Listener, ) { - let e: SimulationEvent = SimulationEvent::SendPayment(act.destination, act.amount_msat); let interval = time::Duration::from_secs(act.interval_secs as u64); log::debug!( @@ -779,7 +769,7 @@ async fn produce_events( biased; _ = time::sleep(interval) => { // Consumer was dropped - if sender.send(e).await.is_err() { + if sender.send(SimulationEvent::SendPayment(act.destination.clone(), act.amount_msat)).await.is_err() { log::debug!( "Stopped producer for {}: {} -> {}. Consumer cannot be reached.", act.amount_msat, @@ -807,7 +797,7 @@ async fn produce_events( } async fn produce_random_events( - source: PublicKey, + source: NodeInfo, network_generator: Arc>, node_generator: A, sender: Sender, @@ -829,7 +819,7 @@ async fn produce_random_events { - let destination = network_generator.lock().await.sample_node_by_capacity(source); + let destination = network_generator.lock().await.sample_node_by_capacity(source.pubkey); // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get // a payment amount something has gone wrong (because we should have validated that we can always @@ -851,7 +841,7 @@ async fn produce_random_events {}: {amount} msat.", destination.0); // Send the payment, exiting if we can no longer send to the consumer. - let event = SimulationEvent::SendPayment(destination.0, amount); + let event = SimulationEvent::SendPayment(destination.0.clone(), amount); if let Err(e) = sender.send(event).await { log::debug!( "Stopped random producer for {amount}: {source} -> {}. Consumer error: {e}.", destination.0, diff --git a/sim-lib/src/lnd.rs b/sim-lib/src/lnd.rs index 8fda8bdf..b6782a3e 100644 --- a/sim-lib/src/lnd.rs +++ b/sim-lib/src/lnd.rs @@ -109,12 +109,14 @@ impl LightningNode for LndNode { .into_inner(); if info.chains.is_empty() { - return Err(LightningError::ValidationError( - "LND node is not connected any chain".to_string(), - )); + return Err(LightningError::ValidationError(format!( + "{} is not connected any chain", + self.get_info() + ))); } else if info.chains.len() > 1 { return Err(LightningError::ValidationError(format!( - "LND node is connected to more than one chain: {:?}", + "{} is connected to more than one chain: {:?}", + self.get_info(), info.chains.iter().map(|c| c.chain.to_string()) ))); } @@ -229,12 +231,12 @@ impl LightningNode for LndNode { } } - async fn get_node_features(&mut self, node: PublicKey) -> Result { + async fn get_node_info(&mut self, node_id: &PublicKey) -> Result { let node_info = self .client .lightning() .get_node_info(NodeInfoRequest { - pub_key: node.to_string(), + pub_key: node_id.to_string(), include_channels: false, }) .await @@ -242,9 +244,11 @@ impl LightningNode for LndNode { .into_inner(); if let Some(node_info) = node_info.node { - Ok(parse_node_features( - node_info.features.keys().cloned().collect(), - )) + Ok(NodeInfo { + pubkey: *node_id, + alias: node_info.alias, + features: parse_node_features(node_info.features.keys().cloned().collect()), + }) } else { Err(LightningError::GetNodeInfoError( "Node not found".to_string(), diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs index a7508032..93b8b6cf 100644 --- a/sim-lib/src/random_activity.rs +++ b/sim-lib/src/random_activity.rs @@ -1,12 +1,11 @@ use core::fmt; -use std::collections::HashMap; use std::fmt::Display; use bitcoin::secp256k1::PublicKey; use rand_distr::{Distribution, Exp, LogNormal, WeightedIndex}; use std::time::Duration; -use crate::{NetworkGenerator, PaymentGenerator, SimulationError}; +use crate::{NetworkGenerator, NodeInfo, PaymentGenerator, SimulationError}; const HOURS_PER_MONTH: u64 = 30 * 24; const SECONDS_PER_MONTH: u64 = HOURS_PER_MONTH * 60 * 60; @@ -17,21 +16,21 @@ const SECONDS_PER_MONTH: u64 = HOURS_PER_MONTH * 60 * 60; /// which has a view of the full network except for itself). pub struct NetworkGraphView { node_picker: WeightedIndex, - nodes: Vec<(PublicKey, u64)>, + nodes: Vec<(NodeInfo, u64)>, } impl NetworkGraphView { // Creates a network view for the map of node public keys to capacity (in millisatoshis) provided. Returns an error // if any node's capacity is zero (the node cannot receive), or there are not at least two nodes (one node can't // send to itself). - pub fn new(node_capacities: HashMap) -> Result { - if node_capacities.len() < 2 { + pub fn new(nodes: Vec<(NodeInfo, u64)>) -> Result { + if nodes.len() < 2 { return Err(SimulationError::RandomActivityError( "at least two nodes required for activity generation".to_string(), )); } - if node_capacities.values().any(|v| *v == 0) { + if nodes.iter().any(|(_, v)| *v == 0) { return Err(SimulationError::RandomActivityError( "network generator created with zero capacity node".to_string(), )); @@ -39,11 +38,9 @@ impl NetworkGraphView { // To create a weighted index we're going to need a vector of nodes that we index and weights that are set // by their deployed capacity. To efficiently store our view of nodes capacity, we're also going to store - // capacity along with the node pubkey because we query the two at the same time. Zero capacity nodes are + // capacity along with the node info because we query the two at the same time. Zero capacity nodes are // filtered out because they have no chance of being selected (and wont' be able to receive payments). - let nodes = node_capacities.iter().map(|(k, v)| (*k, *v)).collect(); - - let node_picker = WeightedIndex::new(node_capacities.into_values().collect::>()) + let node_picker = WeightedIndex::new(nodes.iter().map(|(_, v)| *v).collect::>()) .map_err(|e| SimulationError::RandomActivityError(e.to_string()))?; Ok(NetworkGraphView { node_picker, nodes }) @@ -54,7 +51,7 @@ impl NetworkGenerator for NetworkGraphView { /// Randomly samples the network for a node, weighted by capacity. Using a single graph view means that it's /// possible for a source node to select itself. After sufficient retries, this is highly improbable (even with /// very small graphs, or those with one node significantly more capitalized than others). - fn sample_node_by_capacity(&self, source: PublicKey) -> (PublicKey, u64) { + fn sample_node_by_capacity(&self, source: PublicKey) -> (NodeInfo, u64) { let mut rng = rand::thread_rng(); // While it's very unlikely that we can't pick a destination that is not our source, it's possible that there's @@ -63,10 +60,11 @@ impl NetworkGenerator for NetworkGraphView { let mut i = 1; loop { let index = self.node_picker.sample(&mut rng); - let destination = self.nodes[index]; + // Unwrapping is safe given `NetworkGraphView` has the same amount of elements for `nodes` and `node_picker` + let destination = self.nodes.get(index).unwrap(); - if destination.0 != source { - return destination; + if destination.0.pubkey != source { + return destination.clone(); } if i % 50 == 0 { From 279ad5631e75f2327573577763fd899bdbd6f017 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Thu, 5 Oct 2023 14:33:01 -0400 Subject: [PATCH 6/6] sim-lib: ensures destination supports keysend for random payments --- sim-lib/src/lib.rs | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index f7b3df10..781d396c 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -362,10 +362,20 @@ impl Simulation { /// we're working with. If no activity description is provided, then it ensures that we have configured a network /// that is suitable for random activity generation. async fn validate_activity(&self) -> Result<(), LightningError> { - if self.activity.is_empty() && self.nodes.len() <= 1 { - return Err(LightningError::ValidationError( - "At least two nodes required for random activity generation.".to_string(), - )); + // For now, empty activity signals random activity generation + if self.activity.is_empty() { + if self.nodes.len() <= 1 { + return Err(LightningError::ValidationError( + "At least two nodes required for random activity generation.".to_string(), + )); + } else { + for node in self.nodes.values() { + let node = node.lock().await; + if !node.get_info().features.supports_keysend() { + return Err(LightningError::ValidationError(format!("All nodes eligible for random activity generation must support keysend, {} does not", node.get_info()))); + } + } + } } for payment_flow in self.activity.iter() { @@ -819,32 +829,32 @@ async fn produce_random_events { - let destination = network_generator.lock().await.sample_node_by_capacity(source.pubkey); + let (destination, capacity) = network_generator.lock().await.sample_node_by_capacity(source.pubkey); // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get // a payment amount something has gone wrong (because we should have validated that we can always // generate amounts), so we exit. - let amount = match node_generator.payment_amount(destination.1) { + let amount = match node_generator.payment_amount(capacity) { Ok(amt) => { if amt == 0 { - log::debug!("Skipping zero amount payment for {source} -> {}.", destination.0); + log::debug!("Skipping zero amount payment for {source} -> {destination}."); continue; } amt }, Err(e) => { - log::error!("Could not get amount for {source} -> {}: {e}. Please report a bug!", destination.0); + log::error!("Could not get amount for {source} -> {destination}: {e}. Please report a bug!"); break; }, }; - log::debug!("Generated random payment: {source} -> {}: {amount} msat.", destination.0); + log::debug!("Generated random payment: {source} -> {}: {amount} msat.", destination); // Send the payment, exiting if we can no longer send to the consumer. - let event = SimulationEvent::SendPayment(destination.0.clone(), amount); + let event = SimulationEvent::SendPayment(destination.clone(), amount); if let Err(e) = sender.send(event).await { log::debug!( - "Stopped random producer for {amount}: {source} -> {}. Consumer error: {e}.", destination.0, + "Stopped random producer for {amount}: {source} -> {destination}. Consumer error: {e}.", ); break; }