Merge pull request #265 from tnull/2024-02-fix-peer-reconnection
Reconnect all persisted peers on restart
tnull authored Mar 1, 2024
2 parents afd67d6 + b1bee0f commit 71a3e32
Showing 4 changed files with 81 additions and 37 deletions.
3 changes: 1 addition & 2 deletions src/builder.rs
@@ -943,12 +943,11 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
},
};

let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
let (stop_sender, _) = tokio::sync::watch::channel(());

Ok(Node {
runtime,
stop_sender,
stop_receiver,
config,
wallet,
tx_sync,
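
Aside (not part of the diff): the pattern this commit adopts throughout is to keep only the `tokio::sync::watch::Sender` and have each background task obtain its own receiver via `Sender::subscribe()`, rather than storing a `Receiver` on `Node` and cloning it per task. Below is a minimal standalone sketch of that pattern, assuming the `tokio` crate with its full feature set; the task body and timing are illustrative and not taken from ldk-node.

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Keep only the sender; each task creates its own receiver on demand.
    let (stop_sender, _) = watch::channel(());

    let mut stop_task = stop_sender.subscribe();
    let task = tokio::spawn(async move {
        loop {
            tokio::select! {
                // Resolves once the stop signal is sent.
                _ = stop_task.changed() => return,
                // Placeholder for periodic work (wallet sync, fee updates, ...).
                _ = tokio::time::sleep(Duration::from_secs(1)) => {},
            }
        }
    });

    // On shutdown, a single send() reaches every subscribed task.
    let _ = stop_sender.send(());
    let _ = task.await;
}
```

With each task holding its own subscribed receiver, a dedicated `stop_receiver` field no longer needs to live on `Node`, which is what this commit removes in `builder.rs` and `lib.rs`.
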
62 changes: 27 additions & 35 deletions src/lib.rs
@@ -179,7 +178,6 @@ uniffi::include_scaffolding!("ldk_node");
pub struct Node<K: KVStore + Sync + Send + 'static> {
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
stop_sender: tokio::sync::watch::Sender<()>,
stop_receiver: tokio::sync::watch::Receiver<()>,
config: Arc<Config>,
wallet: Arc<Wallet>,
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -247,7 +246,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
// Setup wallet sync
let wallet = Arc::clone(&self.wallet);
let sync_logger = Arc::clone(&self.logger);
let mut stop_sync = self.stop_receiver.clone();
let mut stop_sync = self.stop_sender.subscribe();
let onchain_wallet_sync_interval_secs = self
.config
.onchain_wallet_sync_interval_secs
@@ -288,7 +287,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
);
});

let mut stop_fee_updates = self.stop_receiver.clone();
let mut stop_fee_updates = self.stop_sender.subscribe();
let fee_update_logger = Arc::clone(&self.logger);
let fee_estimator = Arc::clone(&self.fee_estimator);
let fee_rate_cache_update_interval_secs =
@@ -331,7 +330,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let mut stop_sync = self.stop_receiver.clone();
let mut stop_sync = self.stop_sender.subscribe();
let wallet_sync_interval_secs =
self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
runtime.spawn(async move {
@@ -369,7 +368,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let gossip_source = Arc::clone(&self.gossip_source);
let gossip_sync_store = Arc::clone(&self.kv_store);
let gossip_sync_logger = Arc::clone(&self.logger);
let mut stop_gossip_sync = self.stop_receiver.clone();
let mut stop_gossip_sync = self.stop_sender.subscribe();
runtime.spawn(async move {
let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
loop {
@@ -412,7 +411,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
if let Some(listening_addresses) = &self.config.listening_addresses {
// Setup networking
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
let mut stop_listen = self.stop_receiver.clone();
let mut stop_listen = self.stop_sender.subscribe();
let listening_logger = Arc::clone(&self.logger);

let mut bind_addrs = Vec::with_capacity(listening_addresses.len());
@@ -462,12 +461,11 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
});
}

// Regularly reconnect to channel peers.
let connect_cm = Arc::clone(&self.channel_manager);
// Regularly reconnect to persisted peers.
let connect_pm = Arc::clone(&self.peer_manager);
let connect_logger = Arc::clone(&self.logger);
let connect_peer_store = Arc::clone(&self.peer_store);
let mut stop_connect = self.stop_receiver.clone();
let mut stop_connect = self.stop_sender.subscribe();
runtime.spawn(async move {
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -482,29 +480,23 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
.iter()
.map(|(peer, _addr)| *peer)
.collect::<Vec<_>>();
for node_id in connect_cm
.list_channels()
.iter()
.map(|chan| chan.counterparty.node_id)
.filter(|id| !pm_peers.contains(id))
{
if let Some(peer_info) = connect_peer_store.get_peer(&node_id) {
let res = do_connect_peer(
peer_info.node_id,
peer_info.address,
Arc::clone(&connect_pm),
Arc::clone(&connect_logger),
).await;
match res {
Ok(_) => {
log_info!(connect_logger, "Successfully reconnected to peer {}", node_id);
},
Err(e) => {
log_error!(connect_logger, "Failed to reconnect to peer {}: {}", node_id, e);
}
}
}

for peer_info in connect_peer_store.list_peers().iter().filter(|info| !pm_peers.contains(&info.node_id)) {
let res = do_connect_peer(
peer_info.node_id,
peer_info.address.clone(),
Arc::clone(&connect_pm),
Arc::clone(&connect_logger),
).await;
match res {
Ok(_) => {
log_info!(connect_logger, "Successfully reconnected to peer {}", peer_info.node_id);
},
Err(e) => {
log_error!(connect_logger, "Failed to reconnect to peer {}: {}", peer_info.node_id, e);
}
}
}
}
}
}
@@ -516,7 +508,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let bcast_config = Arc::clone(&self.config);
let bcast_store = Arc::clone(&self.kv_store);
let bcast_logger = Arc::clone(&self.logger);
let mut stop_bcast = self.stop_receiver.clone();
let mut stop_bcast = self.stop_sender.subscribe();
runtime.spawn(async move {
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
let mut interval = tokio::time::interval(Duration::from_secs(30));
@@ -572,7 +564,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
}
});

let mut stop_tx_bcast = self.stop_receiver.clone();
let mut stop_tx_bcast = self.stop_sender.subscribe();
let tx_bcaster = Arc::clone(&self.tx_broadcaster);
runtime.spawn(async move {
// Every second we try to clear our broadcasting queue.
@@ -613,7 +605,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let background_logger = Arc::clone(&self.logger);
let background_error_logger = Arc::clone(&self.logger);
let background_scorer = Arc::clone(&self.scorer);
let stop_bp = self.stop_receiver.clone();
let stop_bp = self.stop_sender.subscribe();
let sleeper = move |d| {
let mut stop = stop_bp.clone();
Box::pin(async move {
@@ -650,7 +642,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
});

if let Some(liquidity_source) = self.liquidity_source.as_ref() {
let mut stop_liquidity_handler = self.stop_receiver.clone();
let mut stop_liquidity_handler = self.stop_sender.subscribe();
let liquidity_handler = Arc::clone(&liquidity_source);
runtime.spawn(async move {
loop {
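
Aside (not part of the diff): the behavioral core of the `lib.rs` change is the reconnection loop, which now iterates every entry in the persisted peer store instead of only the counterparties of currently open channels. The following is a simplified standalone sketch of that selection logic; `PeerInfo`, `PeerStore`, and `try_connect` are stand-ins for illustration, not the crate's actual types or functions.

```rust
use std::collections::HashSet;

#[derive(Clone)]
struct PeerInfo {
    node_id: String,
    address: String,
}

// Stand-in for the peer store populated by `connect(.., persist: true)`.
struct PeerStore {
    peers: Vec<PeerInfo>,
}

impl PeerStore {
    fn list_peers(&self) -> Vec<PeerInfo> {
        self.peers.clone()
    }
}

// Stand-in for the actual connection attempt (`do_connect_peer` in the diff).
fn try_connect(peer: &PeerInfo) -> Result<(), String> {
    println!("connecting to {} at {}", peer.node_id, peer.address);
    Ok(())
}

// Reconnect to every persisted peer that is not currently connected,
// whether or not we have a channel with it.
fn reconnect_persisted_peers(store: &PeerStore, connected: &HashSet<String>) {
    for peer_info in store
        .list_peers()
        .iter()
        .filter(|info| !connected.contains(&info.node_id))
    {
        match try_connect(peer_info) {
            Ok(()) => println!("reconnected to {}", peer_info.node_id),
            Err(e) => eprintln!("failed to reconnect to {}: {}", peer_info.node_id, e),
        }
    }
}

fn main() {
    let store = PeerStore {
        peers: vec![PeerInfo {
            node_id: "node-id-placeholder".to_string(),
            address: "127.0.0.1:9735".to_string(),
        }],
    };
    // No peers currently connected, e.g. right after a restart.
    reconnect_persisted_peers(&store, &HashSet::new());
}
```

Under the old code, a peer stored with `persist: true` but without an open channel was never reconnected after a restart; the new integration test below exercises exactly that case.
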
1 change: 1 addition & 0 deletions tests/common.rs
@@ -355,6 +355,7 @@ pub(crate) fn do_channel_full_cycle<K: KVStore + Sync + Send, E: ElectrumApi>(
.unwrap();

assert_eq!(node_a.list_peers().first().unwrap().node_id, node_b.node_id());
assert!(node_a.list_peers().first().unwrap().is_persisted);
let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id());
let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id());
assert_eq!(funding_txo_a, funding_txo_b);
52 changes: 52 additions & 0 deletions tests/integration_tests_rust.rs
@@ -272,3 +272,55 @@ fn sign_verify_msg() {
let pkey = node.node_id();
assert!(node.verify_signature(msg, sig.as_str(), &pkey));
}

#[test]
fn connection_restart_behavior() {
do_connection_restart_behavior(true);
do_connection_restart_behavior(false);
}

fn do_connection_restart_behavior(persist: bool) {
let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();
let (node_a, node_b) = setup_two_nodes(&electrsd, false);

let node_id_a = node_a.node_id();
let node_id_b = node_b.node_id();

let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();
std::thread::sleep(std::time::Duration::from_secs(1));
node_a.connect(node_id_b, node_addr_b, persist).unwrap();

let peer_details_a = node_a.list_peers().first().unwrap().clone();
assert_eq!(peer_details_a.node_id, node_id_b);
assert_eq!(peer_details_a.is_persisted, persist);
assert!(peer_details_a.is_connected);

let peer_details_b = node_b.list_peers().first().unwrap().clone();
assert_eq!(peer_details_b.node_id, node_id_a);
assert_eq!(peer_details_b.is_persisted, false);
assert!(peer_details_b.is_connected);

// Restart nodes.
node_a.stop().unwrap();
node_b.stop().unwrap();
node_b.start().unwrap();
node_a.start().unwrap();

// Sleep a bit to allow for the reconnect to happen.
std::thread::sleep(std::time::Duration::from_secs(5));

if persist {
let peer_details_a = node_a.list_peers().first().unwrap().clone();
assert_eq!(peer_details_a.node_id, node_id_b);
assert_eq!(peer_details_a.is_persisted, persist);
assert!(peer_details_a.is_connected);

let peer_details_b = node_b.list_peers().first().unwrap().clone();
assert_eq!(peer_details_b.node_id, node_id_a);
assert_eq!(peer_details_b.is_persisted, false);
assert!(peer_details_b.is_connected);
} else {
assert!(node_a.list_peers().is_empty());
assert!(node_b.list_peers().is_empty());
}
}
