Skip to content

Commit

Permalink
Merge pull request #104 from okjodom/continue-sim
Browse files Browse the repository at this point in the history
sim-lib: continue simulation on send payment errors
  • Loading branch information
carlaKC authored Oct 4, 2023
2 parents fceb05a + fb981a2 commit bc0f8ed
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 86 deletions.
13 changes: 12 additions & 1 deletion sim-lib/src/cln.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,18 @@ impl LightningNode for ClnNode {
..Default::default()
})
.await
.map_err(|err| LightningError::SendPaymentError(err.to_string()))?
.map_err(|s| {
let message = s.message();
// REF: https://docs.corelightning.org/reference/lightning-keysend#return-value

if message.contains("Some(-1") | message.contains("Some(203") {
// Error codes -1 and 203 indicate permanent errors
LightningError::PermanentError(format!("{:?}", message))
} else {
// Error codes, 205, 206 and 210 indicate temporary errors that can be retried
LightningError::SendPaymentError(format!("{:?}", message))
}
})?
.into_inner();
let slice: [u8; 32] = payment_hash
.as_slice()
Expand Down
178 changes: 113 additions & 65 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ pub enum LightningError {
GetNodeInfoError(String),
#[error("Config validation failed: {0}")]
ValidationError(String),
#[error("RPC error: {0:?}")]
RpcError(#[from] tonic_lnd::tonic::Status),
#[error("Permanent error: {0:?}")]
PermanentError(String),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -132,20 +132,28 @@ pub trait LightningNode {
async fn get_node_features(&mut self, node: PublicKey) -> Result<NodeFeatures, LightningError>;
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone, Copy)]
enum SimulationEvent {
// Dispatch a payment of the specified amount to the public key provided.
SendPayment(PublicKey, u64),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaymentResult {
pub htlc_count: usize,
pub payment_outcome: PaymentOutcome,
}

impl PaymentResult {
pub fn not_dispatched() -> Self {
PaymentResult {
htlc_count: 0,
payment_outcome: PaymentOutcome::NotDispatched,
}
}

pub fn track_payment_failed() -> Self {
PaymentResult {
htlc_count: 0,
payment_outcome: PaymentOutcome::TrackPaymentFailed,
}
}
}

impl Display for PaymentResult {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
Expand All @@ -168,25 +176,33 @@ pub enum PaymentOutcome {
IncorrectPaymentDetails,
InsufficientBalance,
Unknown,
NotDispatched,
TrackPaymentFailed,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct DispatchedPayment {
/// Describes a payment from a source node to a destination node.
#[derive(Debug, Clone, Copy, Serialize)]
struct Payment {
/// Pubkey of the source node dispatching the payment.
source: PublicKey,
/// Pubkey of the destination node receiving the payment.
destination: PublicKey,
#[serde(with = "serializers::serde_payment_hash")]
hash: PaymentHash,
/// Amount of the payment in msat.
amount_msat: u64,
/// Hash of the payment if it has been successfully dispatched.
#[serde(with = "serializers::serde_option_payment_hash")]
hash: Option<PaymentHash>,
/// Time at which the payment was dispatched.
#[serde(with = "serde_millis")]
dispatch_time: SystemTime,
}

impl Display for DispatchedPayment {
impl Display for Payment {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Payment {} dispatched at {:?} sending {} msat from {} -> {}",
hex::encode(self.hash.0),
self.hash.map(|h| hex::encode(h.0)).unwrap_or(String::new()),
self.dispatch_time.duration_since(UNIX_EPOCH).unwrap(),
self.amount_msat,
self.source,
Expand All @@ -195,10 +211,23 @@ impl Display for DispatchedPayment {
}
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone, Copy)]
enum SimulationEvent {
/// Dispatch a payment of the specified amount to the public key provided.
/// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`.
SendPayment(PublicKey, u64),
}

/// SimulationOutput provides the output of a simulation event.
enum SimulationOutput {
// The payment hash that results from a SendPayment SimulationEvent being triggered.
PaymentSent(DispatchedPayment),
/// Intermediate output for when simulator has successfully dispatched a payment.
/// We need to track the result of the payment to report on it.
SendPaymentSuccess(Payment),
/// Final output for when simulator has failed to dispatch a payment.
/// Report this as the final result of simulation event.
SendPaymentFailure(Payment, PaymentResult),
}

#[derive(Clone)]
Expand Down Expand Up @@ -434,6 +463,7 @@ impl Simulation {
node.clone(),
receiver,
output_sender.clone(),
shutdown.clone(),
));

// Add the producer channel to our map so that various activity descriptions can use it. We may have multiple
Expand Down Expand Up @@ -464,6 +494,7 @@ async fn consume_events(
node: Arc<Mutex<dyn LightningNode + Send>>,
mut receiver: Receiver<SimulationEvent>,
sender: Sender<SimulationOutput>,
shutdown: Trigger,
) {
let node_id = node.lock().await.get_info().pubkey;
log::debug!("Started consumer for {}.", node_id);
Expand All @@ -472,44 +503,51 @@ async fn consume_events(
match event {
SimulationEvent::SendPayment(dest, amt_msat) => {
let mut node = node.lock().await;
let payment = node.send_payment(dest, amt_msat);

match payment.await {
let mut payment = Payment {
source: node.get_info().pubkey,
hash: None,
amount_msat: amt_msat,
destination: dest,
dispatch_time: SystemTime::now(),
};

let outcome = match node.send_payment(dest, amt_msat).await {
Ok(payment_hash) => {
log::debug!(
"Send payment: {} -> {}: ({}).",
node_id,
dest,
hex::encode(payment_hash.0)
);
// We need to track the payment outcome using the payment hash that we have received.
payment.hash = Some(payment_hash);
SimulationOutput::SendPaymentSuccess(payment)
}
Err(e) => {
log::error!("Error while sending payment {} -> {}.", node_id, dest);

log::debug!("Sending output for {}.", hex::encode(payment_hash.0));
let output = SimulationOutput::PaymentSent(DispatchedPayment {
source: node.get_info().pubkey,
hash: payment_hash,
amount_msat: amt_msat,
destination: dest,
dispatch_time: SystemTime::now(),
});

match sender.send(output).await {
Ok(_) => {}
Err(e) => {
log::error!("Error sending simulation output: {:?}.", e);
match e {
LightningError::PermanentError(s) => {
log::error!("Simulation terminated with error: {s}.");
shutdown.trigger();
break;
}
_ => SimulationOutput::SendPaymentFailure(
payment,
PaymentResult::not_dispatched(),
),
}
}
};

match sender.send(outcome).await {
Ok(_) => {}
Err(e) => {
log::error!(
"Error while sending payment {} -> {}. Terminating consumer. {}.",
node_id,
dest,
e
);
log::error!("Error sending action outcome: {:?}.", e);
break;
}
};
}
}
};
}
Expand Down Expand Up @@ -567,7 +605,7 @@ async fn produce_events(
}

async fn consume_simulation_results(
receiver: Receiver<(DispatchedPayment, PaymentResult)>,
receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
) {
Expand All @@ -581,7 +619,7 @@ async fn consume_simulation_results(
}

async fn write_payment_results(
mut receiver: Receiver<(DispatchedPayment, PaymentResult)>,
mut receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
) -> Result<(), SimulationError> {
Expand Down Expand Up @@ -651,7 +689,7 @@ impl PaymentResultLogger {
}
}

fn report_result(&mut self, details: &DispatchedPayment, result: &PaymentResult) {
fn report_result(&mut self, details: &Payment, result: &PaymentResult) {
match result.payment_outcome {
PaymentOutcome::Success => self.success_payment += 1,
_ => self.failed_payment += 1,
Expand Down Expand Up @@ -684,7 +722,7 @@ impl PaymentResultLogger {
async fn produce_simulation_results(
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>>,
mut output_receiver: Receiver<SimulationOutput>,
results: Sender<(DispatchedPayment, PaymentResult)>,
results: Sender<(Payment, PaymentResult)>,
shutdown: Listener,
) {
log::debug!("Simulation results producer started.");
Expand All @@ -699,14 +737,17 @@ async fn produce_simulation_results(
match output{
Some(simulation_output) => {
match simulation_output{
SimulationOutput::PaymentSent(dispatched_payment) => {
let source_node = nodes.get(&dispatched_payment.source).unwrap().clone();

log::debug!("Tracking payment result for: {}.", hex::encode(dispatched_payment.hash.0));
SimulationOutput::SendPaymentSuccess(payment) => {
let source_node = nodes.get(&payment.source).unwrap().clone();
set.spawn(track_payment_result(
source_node,results.clone(),simulation_output, shutdown.clone(),
source_node, results.clone(), payment, shutdown.clone(),
));
},
SimulationOutput::SendPaymentFailure(payment, result) => {
if results.clone().send((payment, result)).await.is_err() {
log::debug!("Could not send payment result.");
}
}
};

},
Expand All @@ -728,38 +769,45 @@ async fn produce_simulation_results(

async fn track_payment_result(
node: Arc<Mutex<dyn LightningNode + Send>>,
results: Sender<(DispatchedPayment, PaymentResult)>,
output: SimulationOutput,
results: Sender<(Payment, PaymentResult)>,
payment: Payment,
shutdown: Listener,
) {
log::trace!("Payment result tracker starting.");

let mut node = node.lock().await;

match output {
SimulationOutput::PaymentSent(payment) => {
let track_payment = node.track_payment(payment.hash, shutdown.clone());
let res = match payment.hash {
Some(hash) => {
log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0));
let track_payment = node.track_payment(hash, shutdown.clone());

match track_payment.await {
Ok(res) => {
log::debug!(
"Track payment {} result: {:?}.",
hex::encode(payment.hash.0),
hex::encode(hash.0),
res.payment_outcome
);
if results.clone().send((payment, res)).await.is_err() {
log::debug!(
"Could not send payment result for {}.",
hex::encode(payment.hash.0)
);
}
res
}
Err(e) => {
log::error!("Track payment failed for {}: {e}.", hex::encode(hash.0));
PaymentResult::track_payment_failed()
}
Err(e) => log::error!(
"Track payment failed for {}: {e}.",
hex::encode(payment.hash.0)
),
}
}
// None means that the payment was not dispatched, so we cannot track it.
None => {
log::error!(
"We cannot track a payment that has not been dispatched. Missing payment hash."
);
PaymentResult::not_dispatched()
}
};

if results.clone().send((payment, res)).await.is_err() {
log::debug!("Could not send payment result.");
}

log::trace!("Payment result tracker exiting.");
Expand Down
16 changes: 14 additions & 2 deletions sim-lib/src/lnd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use lightning::ln::{PaymentHash, PaymentPreimage};
use tonic_lnd::lnrpc::{payment::PaymentStatus, GetInfoRequest, GetInfoResponse};
use tonic_lnd::lnrpc::{NodeInfoRequest, PaymentFailureReason};
use tonic_lnd::routerrpc::TrackPaymentRequest;
use tonic_lnd::tonic::Code::Unavailable;
use tonic_lnd::tonic::Status;
use tonic_lnd::{routerrpc::SendPaymentRequest, Client};
use triggered::Listener;

Expand Down Expand Up @@ -115,11 +117,12 @@ impl LightningNode for LndNode {
fee_limit_msat: i64::max_value(),
..Default::default()
})
.await?;
.await
.map_err(status_to_lightning_error)?;

let mut stream = response.into_inner();

let payment_hash = match stream.message().await? {
let payment_hash = match stream.message().await.map_err(status_to_lightning_error)? {
Some(payment) => string_to_payment_hash(&payment.payment_hash)?,
None => return Err(LightningError::SendPaymentError("No payment".to_string())),
};
Expand Down Expand Up @@ -217,3 +220,12 @@ fn string_to_payment_hash(hash: &str) -> Result<PaymentHash, LightningError> {
.map_err(|_| LightningError::InvalidPaymentHash)?;
Ok(PaymentHash(slice))
}

fn status_to_lightning_error(s: Status) -> LightningError {
let code = s.code();
let message = s.message();
match code {
Unavailable => LightningError::SendPaymentError(format!("Node unavailable: {message}")),
_ => LightningError::PermanentError(message.to_string()),
}
}
Loading

0 comments on commit bc0f8ed

Please sign in to comment.