Skip to content

Commit

Permalink
sim-lib: continue simulation on send payment errors
Browse files Browse the repository at this point in the history
 instead of stopping the simulation on the first send payment rpc error,
 we gracefully handle the error and report a default send payment
result. since send payment rpc might fail before dispatching a payment
(i.e in CLN case), we allow activity result reporting to skip tracking
of payment if there's no payment hash
  • Loading branch information
okjodom committed Oct 3, 2023
1 parent 70250a6 commit fb981a2
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 86 deletions.
13 changes: 12 additions & 1 deletion sim-lib/src/cln.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,18 @@ impl LightningNode for ClnNode {
..Default::default()
})
.await
.map_err(|err| LightningError::SendPaymentError(err.to_string()))?
.map_err(|s| {
let message = s.message();
// REF: https://docs.corelightning.org/reference/lightning-keysend#return-value

if message.contains("Some(-1") | message.contains("Some(203") {
// Error codes -1 and 203 indicate permanent errors
LightningError::PermanentError(format!("{:?}", message))
} else {
// Error codes, 205, 206 and 210 indicate temporary errors that can be retried
LightningError::SendPaymentError(format!("{:?}", message))
}
})?
.into_inner();
let slice: [u8; 32] = payment_hash
.as_slice()
Expand Down
178 changes: 113 additions & 65 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ pub enum LightningError {
GetNodeInfoError(String),
#[error("Config validation failed {0}")]
ValidationError(String),
#[error("RPC error: {0:?}")]
RpcError(#[from] tonic_lnd::tonic::Status),
#[error("Permanent error: {0:?}")]
PermanentError(String),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -131,20 +131,28 @@ pub trait LightningNode {
async fn get_node_features(&mut self, node: PublicKey) -> Result<NodeFeatures, LightningError>;
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone, Copy)]
enum SimulationEvent {
// Dispatch a payment of the specified amount to the public key provided.
SendPayment(PublicKey, u64),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaymentResult {
pub htlc_count: usize,
pub payment_outcome: PaymentOutcome,
}

impl PaymentResult {
pub fn not_dispatched() -> Self {
PaymentResult {
htlc_count: 0,
payment_outcome: PaymentOutcome::NotDispatched,
}
}

pub fn track_payment_failed() -> Self {
PaymentResult {
htlc_count: 0,
payment_outcome: PaymentOutcome::TrackPaymentFailed,
}
}
}

impl Display for PaymentResult {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
Expand All @@ -167,25 +175,33 @@ pub enum PaymentOutcome {
IncorrectPaymentDetails,
InsufficientBalance,
Unknown,
NotDispatched,
TrackPaymentFailed,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct DispatchedPayment {
/// Describes a payment from a source node to a destination node.
#[derive(Debug, Clone, Copy, Serialize)]
struct Payment {
/// Pubkey of the source node dispatching the payment.
source: PublicKey,
/// Pubkey of the destination node receiving the payment.
destination: PublicKey,
#[serde(with = "serializers::serde_payment_hash")]
hash: PaymentHash,
/// Amount of the payment in msat.
amount_msat: u64,
/// Hash of the payment if it has been successfully dispatched.
#[serde(with = "serializers::serde_option_payment_hash")]
hash: Option<PaymentHash>,
/// Time at which the payment was dispatched.
#[serde(with = "serde_millis")]
dispatch_time: SystemTime,
}

impl Display for DispatchedPayment {
impl Display for Payment {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Payment {} dispatched at {:?} sending {} msat from {} -> {}",
hex::encode(self.hash.0),
self.hash.map(|h| hex::encode(h.0)).unwrap_or(String::new()),
self.dispatch_time.duration_since(UNIX_EPOCH).unwrap(),
self.amount_msat,
self.source,
Expand All @@ -194,10 +210,23 @@ impl Display for DispatchedPayment {
}
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone, Copy)]
enum SimulationEvent {
/// Dispatch a payment of the specified amount to the public key provided.
/// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`.
SendPayment(PublicKey, u64),
}

/// SimulationOutput provides the output of a simulation event.
enum SimulationOutput {
// The payment hash that results from a SendPayment SimulationEvent being triggered.
PaymentSent(DispatchedPayment),
/// Intermediate output for when simulator has successfully dispatched a payment.
/// We need to track the result of the payment to report on it.
SendPaymentSuccess(Payment),
/// Final output for when simulator has failed to dispatch a payment.
/// Report this as the final result of simulation event.
SendPaymentFailure(Payment, PaymentResult),
}

#[derive(Clone)]
Expand Down Expand Up @@ -391,6 +420,7 @@ impl Simulation {
node.clone(),
receiver,
output_sender.clone(),
shutdown.clone(),
));

// Add the producer channel to our map so that various activity descriptions can use it. We may have multiple
Expand Down Expand Up @@ -421,6 +451,7 @@ async fn consume_events(
node: Arc<Mutex<dyn LightningNode + Send>>,
mut receiver: Receiver<SimulationEvent>,
sender: Sender<SimulationOutput>,
shutdown: Trigger,
) {
let node_id = node.lock().await.get_info().pubkey;
log::debug!("Started consumer for {}.", node_id);
Expand All @@ -429,44 +460,51 @@ async fn consume_events(
match event {
SimulationEvent::SendPayment(dest, amt_msat) => {
let mut node = node.lock().await;
let payment = node.send_payment(dest, amt_msat);

match payment.await {
let mut payment = Payment {
source: node.get_info().pubkey,
hash: None,
amount_msat: amt_msat,
destination: dest,
dispatch_time: SystemTime::now(),
};

let outcome = match node.send_payment(dest, amt_msat).await {
Ok(payment_hash) => {
log::debug!(
"Send payment: {} -> {}: ({}).",
node_id,
dest,
hex::encode(payment_hash.0)
);
// We need to track the payment outcome using the payment hash that we have received.
payment.hash = Some(payment_hash);
SimulationOutput::SendPaymentSuccess(payment)
}
Err(e) => {
log::error!("Error while sending payment {} -> {}.", node_id, dest);

log::debug!("Sending output for {}.", hex::encode(payment_hash.0));
let output = SimulationOutput::PaymentSent(DispatchedPayment {
source: node.get_info().pubkey,
hash: payment_hash,
amount_msat: amt_msat,
destination: dest,
dispatch_time: SystemTime::now(),
});

match sender.send(output).await {
Ok(_) => {}
Err(e) => {
log::error!("Error sending simulation output: {:?}.", e);
match e {
LightningError::PermanentError(s) => {
log::error!("Simulation terminated with error: {s}.");
shutdown.trigger();
break;
}
_ => SimulationOutput::SendPaymentFailure(
payment,
PaymentResult::not_dispatched(),
),
}
}
};

match sender.send(outcome).await {
Ok(_) => {}
Err(e) => {
log::error!(
"Error while sending payment {} -> {}. Terminating consumer. {}.",
node_id,
dest,
e
);
log::error!("Error sending action outcome: {:?}.", e);
break;
}
};
}
}
};
}
Expand Down Expand Up @@ -524,7 +562,7 @@ async fn produce_events(
}

async fn consume_simulation_results(
receiver: Receiver<(DispatchedPayment, PaymentResult)>,
receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
) {
Expand All @@ -538,7 +576,7 @@ async fn consume_simulation_results(
}

async fn write_payment_results(
mut receiver: Receiver<(DispatchedPayment, PaymentResult)>,
mut receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
) -> Result<(), SimulationError> {
Expand Down Expand Up @@ -608,7 +646,7 @@ impl PaymentResultLogger {
}
}

fn report_result(&mut self, details: &DispatchedPayment, result: &PaymentResult) {
fn report_result(&mut self, details: &Payment, result: &PaymentResult) {
match result.payment_outcome {
PaymentOutcome::Success => self.success_payment += 1,
_ => self.failed_payment += 1,
Expand Down Expand Up @@ -641,7 +679,7 @@ impl PaymentResultLogger {
async fn produce_simulation_results(
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>>,
mut output_receiver: Receiver<SimulationOutput>,
results: Sender<(DispatchedPayment, PaymentResult)>,
results: Sender<(Payment, PaymentResult)>,
shutdown: Listener,
) {
log::debug!("Simulation results producer started.");
Expand All @@ -656,14 +694,17 @@ async fn produce_simulation_results(
match output{
Some(simulation_output) => {
match simulation_output{
SimulationOutput::PaymentSent(dispatched_payment) => {
let source_node = nodes.get(&dispatched_payment.source).unwrap().clone();

log::debug!("Tracking payment result for: {}.", hex::encode(dispatched_payment.hash.0));
SimulationOutput::SendPaymentSuccess(payment) => {
let source_node = nodes.get(&payment.source).unwrap().clone();
set.spawn(track_payment_result(
source_node,results.clone(),simulation_output, shutdown.clone(),
source_node, results.clone(), payment, shutdown.clone(),
));
},
SimulationOutput::SendPaymentFailure(payment, result) => {
if results.clone().send((payment, result)).await.is_err() {
log::debug!("Could not send payment result.");
}
}
};

},
Expand All @@ -685,38 +726,45 @@ async fn produce_simulation_results(

async fn track_payment_result(
node: Arc<Mutex<dyn LightningNode + Send>>,
results: Sender<(DispatchedPayment, PaymentResult)>,
output: SimulationOutput,
results: Sender<(Payment, PaymentResult)>,
payment: Payment,
shutdown: Listener,
) {
log::trace!("Payment result tracker starting.");

let mut node = node.lock().await;

match output {
SimulationOutput::PaymentSent(payment) => {
let track_payment = node.track_payment(payment.hash, shutdown.clone());
let res = match payment.hash {
Some(hash) => {
log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0));
let track_payment = node.track_payment(hash, shutdown.clone());

match track_payment.await {
Ok(res) => {
log::debug!(
"Track payment {} result: {:?}.",
hex::encode(payment.hash.0),
hex::encode(hash.0),
res.payment_outcome
);
if results.clone().send((payment, res)).await.is_err() {
log::debug!(
"Could not send payment result for {}.",
hex::encode(payment.hash.0)
);
}
res
}
Err(e) => {
log::error!("Track payment failed for {}: {e}.", hex::encode(hash.0));
PaymentResult::track_payment_failed()
}
Err(e) => log::error!(
"Track payment failed for {}: {e}.",
hex::encode(payment.hash.0)
),
}
}
// None means that the payment was not dispatched, so we cannot track it.
None => {
log::error!(
"We cannot track a payment that has not been dispatched. Missing payment hash."
);
PaymentResult::not_dispatched()
}
};

if results.clone().send((payment, res)).await.is_err() {
log::debug!("Could not send payment result.");
}

log::trace!("Payment result tracker exiting.");
Expand Down
16 changes: 14 additions & 2 deletions sim-lib/src/lnd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use lightning::ln::{PaymentHash, PaymentPreimage};
use tonic_lnd::lnrpc::{payment::PaymentStatus, GetInfoRequest, GetInfoResponse};
use tonic_lnd::lnrpc::{NodeInfoRequest, PaymentFailureReason};
use tonic_lnd::routerrpc::TrackPaymentRequest;
use tonic_lnd::tonic::Code::Unavailable;
use tonic_lnd::tonic::Status;
use tonic_lnd::{routerrpc::SendPaymentRequest, Client};
use triggered::Listener;

Expand Down Expand Up @@ -107,11 +109,12 @@ impl LightningNode for LndNode {
fee_limit_msat: i64::max_value(),
..Default::default()
})
.await?;
.await
.map_err(status_to_lightning_error)?;

let mut stream = response.into_inner();

let payment_hash = match stream.message().await? {
let payment_hash = match stream.message().await.map_err(status_to_lightning_error)? {
Some(payment) => string_to_payment_hash(&payment.payment_hash)?,
None => return Err(LightningError::SendPaymentError("No payment".to_string())),
};
Expand Down Expand Up @@ -209,3 +212,12 @@ fn string_to_payment_hash(hash: &str) -> Result<PaymentHash, LightningError> {
.map_err(|_| LightningError::InvalidPaymentHash)?;
Ok(PaymentHash(slice))
}

fn status_to_lightning_error(s: Status) -> LightningError {
let code = s.code();
let message = s.message();
match code {
Unavailable => LightningError::SendPaymentError(format!("Node unavailable: {message}")),
_ => LightningError::PermanentError(message.to_string()),
}
}
Loading

0 comments on commit fb981a2

Please sign in to comment.