Commit 8930f59

f include channel_id in ChannelMonitorUpdate
dunxen committed Jan 30, 2024
1 parent 32d7d55 commit 8930f59
Showing 9 changed files with 45 additions and 29 deletions.

fuzz/src/chanmon_consistency.rs (4 changes: 2 additions & 2 deletions)

@@ -176,7 +176,7 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
 		self.chain_monitor.watch_channel(funding_txo, monitor)
 	}

-	fn update_channel(&self, funding_txo: OutPoint, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
+	fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
 		let mut map_lock = self.latest_monitors.lock().unwrap();
 		let mut map_entry = match map_lock.entry(funding_txo) {
 			hash_map::Entry::Occupied(entry) => entry,
@@ -188,7 +188,7 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
 		let mut ser = VecWriter(Vec::new());
 		deserialized_monitor.write(&mut ser).unwrap();
 		map_entry.insert((update.update_id, ser.0));
-		self.chain_monitor.update_channel(funding_txo, channel_id, update)
+		self.chain_monitor.update_channel(funding_txo, update)
 	}

 	fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {

lightning/src/chain/chainmonitor.rs (5 changes: 4 additions & 1 deletion)

@@ -762,7 +762,10 @@ where C::Target: chain::Filter,
 		Ok(persist_res)
 	}

-	fn update_channel(&self, funding_txo: OutPoint, channel_id: ChannelId, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
+	fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
+		// `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
+		// versions are V1-established. For 0.0.121+ the `channel_id` field is always `Some`.
+		let channel_id = update.channel_id.unwrap_or(ChannelId::v1_from_funding_outpoint(funding_txo));
 		// Update the monitor that watches the channel referred to by the given outpoint.
 		let monitors = self.monitors.read().unwrap();
 		match monitors.get(&funding_txo) {

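Aside: the `unwrap_or` fallback above works because, per BOLT 2, a V1-established channel's ID is fully determined by its funding outpoint: the funding txid with the big-endian 16-bit output index XORed into its last two bytes. A minimal sketch of that derivation (a hypothetical free function, assuming the byte layout `ChannelId::v1_from_funding_outpoint` uses; not LDK's own code):

// Sketch of the v1 channel-id derivation backing the fallback above.
// `txid` is the funding transaction id as LDK stores it; `index` is the
// funding output index, which must fit in 16 bits.
fn v1_channel_id(txid: [u8; 32], index: u16) -> [u8; 32] {
    let mut id = txid;
    // BOLT 2: XOR the big-endian output index into the last two bytes.
    id[30] ^= (index >> 8) as u8;
    id[31] ^= (index & 0xff) as u8;
    id
}
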
lightning/src/chain/channelmonitor.rs (10 changes: 9 additions & 1 deletion)

@@ -96,6 +96,11 @@ pub struct ChannelMonitorUpdate {
 	///
 	/// [`ChannelMonitorUpdateStatus::InProgress`]: super::ChannelMonitorUpdateStatus::InProgress
 	pub update_id: u64,
+	/// The channel ID associated with these updates.
+	///
+	/// Will be `None` for `ChannelMonitorUpdate`s constructed on LDK versions prior to 0.0.121 and
+	/// always `Some` otherwise.
+	pub channel_id: Option<ChannelId>,
 }

 /// The update ID used for a [`ChannelMonitorUpdate`] that is either:
@@ -118,6 +123,7 @@ impl Writeable for ChannelMonitorUpdate {
 		}
 		write_tlv_fields!(w, {
 			(1, self.counterparty_node_id, option),
+			(3, self.channel_id, option),
 		});
 		Ok(())
 	}
@@ -134,10 +140,12 @@ impl Readable for ChannelMonitorUpdate {
 			}
 		}
 		let mut counterparty_node_id = None;
+		let mut channel_id = None;
 		read_tlv_fields!(r, {
 			(1, counterparty_node_id, option),
+			(3, channel_id, option),
 		});
-		Ok(Self { update_id, counterparty_node_id, updates })
+		Ok(Self { update_id, counterparty_node_id, updates, channel_id })
 	}
 }

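Note the TLV types: `counterparty_node_id` is type 1 and the new `channel_id` is type 3, both odd, so a reader that predates a field skips it instead of failing, and an older writer simply omits it; that is why `channel_id` must be `Option` on read. A hedged roundtrip sketch of the `None` case (test-style; assumes the `Writeable::encode` helper and std's `io::Cursor`):

use lightning::chain::channelmonitor::ChannelMonitorUpdate;
use lightning::util::ser::{Readable, Writeable};
use std::io;

#[test]
fn monitor_update_without_channel_id_roundtrips() {
    // An update shaped like a pre-0.0.121 writer would produce it:
    // no channel_id TLV on the wire.
    let update = ChannelMonitorUpdate {
        update_id: 1,
        counterparty_node_id: None,
        updates: Vec::new(),
        channel_id: None,
    };
    let buf = update.encode();
    let read_back: ChannelMonitorUpdate =
        Readable::read(&mut io::Cursor::new(&buf)).unwrap();
    // A 0.0.121+ reader tolerates the missing odd TLV and leaves the field
    // None; call sites then derive the id from the funding outpoint.
    assert!(read_back.channel_id.is_none());
    assert_eq!(read_back.update_id, 1);
}
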
lightning/src/chain/mod.rs (2 changes: 1 addition & 1 deletion)

@@ -287,7 +287,7 @@ pub trait Watch<ChannelSigner: WriteableEcdsaChannelSigner> {
 	/// [`ChannelMonitorUpdateStatus::UnrecoverableError`], see its documentation for more info.
 	///
 	/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
-	fn update_channel(&self, funding_txo: OutPoint, channel_id: ChannelId, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus;
+	fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus;

 	/// Returns any monitor events since the last call. Subsequent calls must only return new
 	/// events.

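For downstream `Watch` implementors the extra parameter simply disappears; anything that still needs a `ChannelId` can recover it from the update the same way `ChainMonitor` does above. A hypothetical forwarding helper as a sketch (the function itself and its logging are illustrative, not an LDK API):

use lightning::chain::channelmonitor::ChannelMonitorUpdate;
use lightning::chain::transaction::OutPoint;
use lightning::chain::{ChannelMonitorUpdateStatus, Watch};
use lightning::ln::ChannelId;
use lightning::sign::WriteableEcdsaChannelSigner;

// Forward an update to any Watch, logging the channel id recovered from
// the update itself (illustrative helper, not part of LDK).
fn forward_update<S: WriteableEcdsaChannelSigner, W: Watch<S>>(
    watcher: &W, funding_txo: OutPoint, update: &ChannelMonitorUpdate,
) -> ChannelMonitorUpdateStatus {
    // The id now travels inside the update; fall back to the v1 derivation
    // for updates serialized by LDK versions prior to 0.0.121.
    let channel_id = update.channel_id
        .unwrap_or(ChannelId::v1_from_funding_outpoint(funding_txo));
    println!("applying monitor update {} for channel {}", update.update_id, channel_id);
    watcher.update_channel(funding_txo, update)
}
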
lightning/src/ln/chanmon_update_fail_tests.rs (5 changes: 2 additions & 3 deletions)

@@ -50,7 +50,6 @@ fn test_monitor_and_persister_update_fail() {
 	// Create some initial channel
 	let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
 	let outpoint = OutPoint { txid: chan.3.txid(), index: 0 };
-	let channel_id = chan.2;

 	// Rebalance the network to generate htlc in the two directions
 	send_payment(&nodes[0], &vec!(&nodes[1])[..], 10_000_000);
@@ -102,12 +101,12 @@ fn test_monitor_and_persister_update_fail() {
 	if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
 		// Check that the persister returns InProgress (and will never actually complete)
 		// as the monitor update errors.
-		if let ChannelMonitorUpdateStatus::InProgress = chain_mon.chain_monitor.update_channel(outpoint, channel_id, &update) {} else { panic!("Expected monitor paused"); }
+		if let ChannelMonitorUpdateStatus::InProgress = chain_mon.chain_monitor.update_channel(outpoint, &update) {} else { panic!("Expected monitor paused"); }
 		logger.assert_log_regex("lightning::chain::chainmonitor", regex::Regex::new("Failed to update ChannelMonitor for channel [0-9a-f]*.").unwrap(), 1);

 		// Apply the monitor update to the original ChainMonitor, ensuring the
 		// ChannelManager and ChannelMonitor aren't out of sync.
-		assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, channel_id, &update),
+		assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update),
 			ChannelMonitorUpdateStatus::Completed);
 	} else { assert!(false); }
 } else {

lightning/src/ln/channel.rs (12 changes: 10 additions & 2 deletions)

@@ -2398,6 +2398,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
 				update_id: self.latest_monitor_update_id,
 				counterparty_node_id: Some(self.counterparty_node_id),
 				updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast }],
+				channel_id: Some(self.channel_id()),
 			}))
 		} else { None }
 	} else { None };
@@ -2777,6 +2778,7 @@ impl<SP: Deref> Channel<SP> where
 			updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
 				payment_preimage: payment_preimage_arg.clone(),
 			}],
+			channel_id: Some(self.context.channel_id()),
 		};

 		if self.context.channel_state.should_force_holding_cell() {
@@ -3515,7 +3517,8 @@ impl<SP: Deref> Channel<SP> where
 				htlc_outputs: htlcs_and_sigs,
 				claimed_htlcs,
 				nondust_htlc_sources,
-			}]
+			}],
+			channel_id: Some(self.context.channel_id()),
 		};

 		self.context.cur_holder_commitment_transaction_number -= 1;
@@ -3591,6 +3594,7 @@ impl<SP: Deref> Channel<SP> where
 			update_id: self.context.latest_monitor_update_id + 1, // We don't increment this yet!
 			counterparty_node_id: Some(self.context.counterparty_node_id),
 			updates: Vec::new(),
+			channel_id: Some(self.context.channel_id()),
 		};

 		let mut htlc_updates = Vec::new();
@@ -3769,6 +3773,7 @@ impl<SP: Deref> Channel<SP> where
 				idx: self.context.cur_counterparty_commitment_transaction_number + 1,
 				secret: msg.per_commitment_secret,
 			}],
+			channel_id: Some(self.context.channel_id()),
 		};

 		// Update state now that we've passed all the can-fail calls...
@@ -4826,6 +4831,7 @@ impl<SP: Deref> Channel<SP> where
 			updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
 				scriptpubkey: self.get_closing_scriptpubkey(),
 			}],
+			channel_id: Some(self.context.channel_id()),
 		};
 		self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
 		self.push_ret_blockable_mon_update(monitor_update)
@@ -5965,7 +5971,8 @@ impl<SP: Deref> Channel<SP> where
 				feerate_per_kw: Some(counterparty_commitment_tx.feerate_per_kw()),
 				to_broadcaster_value_sat: Some(counterparty_commitment_tx.to_broadcaster_value_sat()),
 				to_countersignatory_value_sat: Some(counterparty_commitment_tx.to_countersignatory_value_sat()),
-			}]
+			}],
+			channel_id: Some(self.context.channel_id()),
 		};
 		self.context.channel_state.set_awaiting_remote_revoke();
 		monitor_update
@@ -6159,6 +6166,7 @@ impl<SP: Deref> Channel<SP> where
 			updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
 				scriptpubkey: self.get_closing_scriptpubkey(),
 			}],
+			channel_id: Some(self.context.channel_id()),
 		};
 		self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
 		self.push_ret_blockable_mon_update(monitor_update)

lightning/src/ln/channelmanager.rs (19 changes: 9 additions & 10 deletions)

@@ -2307,7 +2307,7 @@ macro_rules! handle_new_monitor_update {
 			in_flight_updates.push($update);
 			in_flight_updates.len() - 1
 		});
-		let update_res = $self.chain_monitor.update_channel($funding_txo, $channel_id, &in_flight_updates[idx]);
+		let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]);
 		handle_new_monitor_update!($self, update_res, $chan, _internal,
 			{
 				let _ = in_flight_updates.remove(idx);
@@ -2864,12 +2864,12 @@ where
 			let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
 			self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
 		}
-		if let Some((_, funding_txo, channel_id, monitor_update)) = shutdown_res.monitor_update {
+		if let Some((_, funding_txo, _channel_id, monitor_update)) = shutdown_res.monitor_update {
 			// There isn't anything we can do if we get an update failure - we're already
 			// force-closing. The monitor update on the required in-memory copy should broadcast
 			// the latest local state, which is the best we can do anyway. Thus, it is safe to
 			// ignore the result here.
-			let _ = self.chain_monitor.update_channel(funding_txo, channel_id, &monitor_update);
+			let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
 		}
 		let mut shutdown_results = Vec::new();
 		if let Some(txid) = shutdown_res.unbroadcasted_batch_funding_txid {
@@ -3404,7 +3404,6 @@ where
 			return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()});
 		}
 		let funding_txo = chan.context.get_funding_txo().unwrap();
-		let channel_id = chan.context.channel_id();
 		let logger = WithChannelContext::from(&self.logger, &chan.context);
 		let send_res = chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(),
 			htlc_cltv, HTLCSource::OutboundRoute {
@@ -4755,10 +4754,10 @@ where

 		for event in background_events.drain(..) {
 			match event {
-				BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, channel_id, update)) => {
+				BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, _channel_id, update)) => {
 					// The channel has already been closed, so no use bothering to care about the
 					// monitor updating completing.
-					let _ = self.chain_monitor.update_channel(funding_txo, channel_id, &update);
+					let _ = self.chain_monitor.update_channel(funding_txo, &update);
 				},
 				BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, channel_id, update } => {
 					let mut updated_chan = false;
@@ -4783,7 +4782,7 @@
 					}
 					if !updated_chan {
 						// TODO: Track this as in-flight even though the channel is closed.
-						let _ = self.chain_monitor.update_channel(funding_txo, channel_id, &update);
+						let _ = self.chain_monitor.update_channel(funding_txo, &update);
 					}
 				},
 				BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => {
@@ -5645,14 +5644,13 @@ where
 			updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
 				payment_preimage,
 			}],
+			channel_id: Some(prev_hop.channel_id),
 		};

-		let prev_hop_channel_id = prev_hop.channel_id;
-
 		if !during_init {
 			// We update the ChannelMonitor on the backward link, after
 			// receiving an `update_fulfill_htlc` from the forward link.
-			let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, prev_hop_channel_id, &preimage_update);
+			let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
 			if update_res != ChannelMonitorUpdateStatus::Completed {
 				// TODO: This needs to be handled somehow - if we receive a monitor update
 				// with a preimage we *must* somehow manage to propagate it to the upstream
@@ -10459,6 +10457,7 @@ where
 				update_id: CLOSED_CHANNEL_UPDATE_ID,
 				counterparty_node_id: None,
 				updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
+				channel_id: Some(monitor.channel_id()),
 			};
 			close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, channel_id, monitor_update)));
 		}

lightning/src/ln/functional_tests.rs (12 changes: 5 additions & 7 deletions)

@@ -8470,7 +8470,6 @@ fn test_update_err_monitor_lockdown() {
 	// Create some initial channel
 	let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1);
 	let outpoint = OutPoint { txid: chan_1.3.txid(), index: 0 };
-	let channel_id = chan_1.2;

 	// Rebalance the network to generate htlc in the two directions
 	send_payment(&nodes[0], &vec!(&nodes[1])[..], 10_000_000);
@@ -8513,8 +8512,8 @@ let mut node_0_peer_state_lock;
 	let mut node_0_peer_state_lock;
 	if let ChannelPhase::Funded(ref mut channel) = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2) {
 		if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
-			assert_eq!(watchtower.chain_monitor.update_channel(outpoint, channel_id, &update), ChannelMonitorUpdateStatus::InProgress);
-			assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, channel_id, &update), ChannelMonitorUpdateStatus::Completed);
+			assert_eq!(watchtower.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::InProgress);
+			assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
 		} else { assert!(false); }
 	} else {
 		assert!(false);
@@ -8541,7 +8540,6 @@ fn test_concurrent_monitor_claim() {
 	// Create some initial channel
 	let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1);
 	let outpoint = OutPoint { txid: chan_1.3.txid(), index: 0 };
-	let channel_id = chan_1.2;

 	// Rebalance the network to generate htlc in the two directions
 	send_payment(&nodes[0], &vec!(&nodes[1])[..], 10_000_000);
@@ -8617,9 +8615,9 @@ if let ChannelPhase::Funded(ref mut channel) = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2) {
 	if let ChannelPhase::Funded(ref mut channel) = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2) {
 		if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
 			// Watchtower Alice should already have seen the block and reject the update
-			assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, channel_id, &update), ChannelMonitorUpdateStatus::InProgress);
-			assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, channel_id, &update), ChannelMonitorUpdateStatus::Completed);
-			assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, channel_id, &update), ChannelMonitorUpdateStatus::Completed);
+			assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::InProgress);
+			assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
+			assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
 		} else { assert!(false); }
 	} else {
 		assert!(false);

lightning/src/util/test_utils.rs (5 changes: 3 additions & 2 deletions)

@@ -347,12 +347,13 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
 		self.chain_monitor.watch_channel(funding_txo, new_monitor)
 	}

-	fn update_channel(&self, funding_txo: OutPoint, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
+	fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
 		// Every monitor update should survive roundtrip
 		let mut w = TestVecWriter(Vec::new());
 		update.write(&mut w).unwrap();
 		assert!(channelmonitor::ChannelMonitorUpdate::read(
 			&mut io::Cursor::new(&w.0)).unwrap() == *update);
+		let channel_id = update.channel_id.unwrap_or(ChannelId::v1_from_funding_outpoint(funding_txo));

 		self.monitor_updates.lock().unwrap().entry(channel_id).or_insert(Vec::new()).push(update.clone());

@@ -366,7 +367,7 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {

 		self.latest_monitor_update_id.lock().unwrap().insert(channel_id,
 			(funding_txo, update.update_id, MonitorUpdateId::from_monitor_update(update)));
-		let update_res = self.chain_monitor.update_channel(funding_txo, channel_id, update);
+		let update_res = self.chain_monitor.update_channel(funding_txo, update);
 		// At every point where we get a monitor update, we should be able to send a useful monitor
 		// to a watchtower and disk...
 		let monitor = self.chain_monitor.get_monitor(funding_txo).unwrap();

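The trailing comment above captures an invariant worth keeping in mind when building a watchtower or persister on LDK: after every accepted update, the freshly updated monitor should re-serialize into a complete, shippable snapshot. A hedged sketch of that check (assumes a `ChainMonitor` in scope as `chain_monitor`, with `funding_txo` and `update` from surrounding code, and the `Writeable::encode` helper):

// After applying an update, the monitor should still serialize to a full
// snapshot that a watchtower (or the persister) can consume.
let status = chain_monitor.update_channel(funding_txo, &update);
if status != ChannelMonitorUpdateStatus::UnrecoverableError {
    let monitor = chain_monitor.get_monitor(funding_txo).unwrap();
    let snapshot = monitor.encode();
    assert!(!snapshot.is_empty());
}
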
