Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: wallet connectivity improvements #6

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,7 @@ impl WalletEventMonitor {
result = transaction_service_events.recv() => {
match result {
Ok(msg) => {
trace!(
target: LOG_TARGET,
"Wallet Event Monitor received wallet transaction service event {:?}",
msg
);
trace!(target: LOG_TARGET, "Wallet transaction service event {:?}", msg);
self.app_state_inner.write().await.add_event(EventListItem{
event_type: "TransactionEvent".to_string(),
desc: (*msg).to_string()
Expand Down Expand Up @@ -184,27 +180,17 @@ impl WalletEventMonitor {
}
},
Ok(_) = connectivity_status.changed() => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet connectivity status changed");
trace!(
target: LOG_TARGET,
"Wallet connectivity status changed to {:?}",
connectivity_status.borrow().clone()
);
self.trigger_peer_state_refresh().await;
},
// Ok(_) = software_update_notif.changed() => {
// trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet auto update status changed");
// let update = software_update_notif.borrow().as_ref().cloned();
// if let Some(update) = update {
// self.add_notification(format!(
// "Version {} of the {} is available: {} (sha: {})",
// update.version(),
// update.app(),
// update.download_url(),
// update.to_hash_hex()
// )).await;
// }
// },
result = network_events.recv() => {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet connectivity event {:?}", msg
);
result = network_events.recv() => {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet connectivity event {:?}", msg);
match msg {
NetworkEvent::PeerConnected{..} |
NetworkEvent::PeerDisconnected{..} => {
Expand Down Expand Up @@ -244,14 +230,16 @@ impl WalletEventMonitor {
_ = base_node_changed.changed() => {
let peer = base_node_changed.borrow().as_ref().cloned();
if let Some(peer) = peer {
self.trigger_base_node_peer_refresh(peer.get_current_peer().clone()).await;
let current_peer = peer.get_current_peer().clone();
trace!(target: LOG_TARGET, "Base node changed to '{}'", current_peer.peer_id());
self.trigger_base_node_peer_refresh(current_peer).await;
self.trigger_balance_refresh();
}
}
result = base_node_events.recv() => {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received base node event {:?}", msg);
trace!(target: LOG_TARGET, "Base node event {:?}", msg);
if let BaseNodeEvent::BaseNodeStateChanged(state) = (*msg).clone() {
self.trigger_base_node_state_refresh(state).await;
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/contacts/src/contacts_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Display for ContactsLivenessData {
f,
"Liveness event '{}' for contact {} ({}) {}",
self.message_type,
self.address,
self.address.to_hex(),
self.peer_id,
if let Some(time) = self.last_seen {
let local_time = DateTime::<Local>::from_naive_utc_and_offset(time, Local::now().offset().to_owned())
Expand Down
2 changes: 2 additions & 0 deletions base_layer/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub struct P2pConfig {
deserialize_with = "deserialize_from_str",
serialize_with = "serialize_string"
)]
/// If set to `Private`, the node assume it has no public address and will try to establish a relay connection as
/// soon as possible. `Auto` will use auto NAT to try determine this automatically.
pub reachability_mode: ReachabilityMode,
/// The global maximum allowed RPC sessions.
/// Default: 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,19 @@
use std::{
fmt::Display,
sync::{atomic, atomic::AtomicUsize, Arc},
time::{Duration, Instant},
};

use tari_network::{identity::PeerId, Peer};

use crate::connectivity_service::WalletConnectivityError;

/// The selected peer is a current base node and an optional list of backup peers.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct BaseNodePeerManager {
// The current base node that the wallet is connected to
current_peer_index: Arc<AtomicUsize>,
// The other base nodes that the wallet can connect to if the selected peer is not available
peer_list: Arc<Vec<Peer>>,
local_last_connection_attempt: Option<Instant>,
}

impl BaseNodePeerManager {
Expand All @@ -57,7 +55,6 @@ impl BaseNodePeerManager {
Ok(Self {
current_peer_index: Arc::new(AtomicUsize::new(preferred_peer_index)),
peer_list: Arc::new(peer_list),
local_last_connection_attempt: None,
})
}

Expand All @@ -66,13 +63,6 @@ impl BaseNodePeerManager {
self.get_current_peer().peer_id()
}

pub fn select_next_peer_if_attempted(&mut self) -> &Peer {
if self.time_since_last_connection_attempt().is_some() {
self.select_next_peer();
}
self.get_current_peer()
}

/// Get the current peer.
pub fn get_current_peer(&self) -> &Peer {
self.peer_list
Expand All @@ -84,10 +74,6 @@ impl BaseNodePeerManager {
/// Changes to the next peer in the list, returning that peer
pub fn select_next_peer(&mut self) -> &Peer {
self.set_current_peer_index((self.current_peer_index() + 1) % self.peer_list.len());
if self.peer_list.len() > 1 {
// Reset the last attempt since we've moved onto another peer
self.local_last_connection_attempt = None;
}
&self.peer_list[self.current_peer_index()]
}

Expand All @@ -100,16 +86,6 @@ impl BaseNodePeerManager {
(self.current_peer_index(), &self.peer_list)
}

/// Set the last connection attempt stats
pub fn set_last_connection_attempt(&mut self) {
self.local_last_connection_attempt = Some(Instant::now());
}

/// Get the last connection attempt for the current peer
pub fn time_since_last_connection_attempt(&self) -> Option<Duration> {
self.local_last_connection_attempt.as_ref().map(|t| t.elapsed())
}

fn set_current_peer_index(&self, index: usize) {
self.current_peer_index.store(index, atomic::Ordering::SeqCst);
}
Expand All @@ -121,15 +97,10 @@ impl BaseNodePeerManager {

impl Display for BaseNodePeerManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let last_connection_attempt = match self.time_since_last_connection_attempt() {
Some(stats) => format!("{:?}", stats.as_secs()),
None => "Never".to_string(),
};
write!(
f,
"BaseNodePeerManager {{ current index: {}, last attempt (s): {}, peer list: {} entries }}",
"BaseNodePeerManager {{ current index: {}, peer list: {} entries }}",
self.current_peer_index(),
last_connection_attempt,
self.peer_list.len()
)
}
Expand Down
62 changes: 24 additions & 38 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub struct WalletConnectivityService {
current_pool: Option<ClientPoolContainer>,
online_status_watch: Watch<OnlineStatus>,
pending_requests: Vec<ReplyOneshot>,
last_attempted_peer: Option<PeerId>,
}

struct ClientPoolContainer {
Expand Down Expand Up @@ -100,6 +101,7 @@ impl WalletConnectivityService {
current_pool: None,
pending_requests: Vec::new(),
online_status_watch,
last_attempted_peer: None,
}
}

Expand Down Expand Up @@ -287,6 +289,7 @@ impl WalletConnectivityService {
}

async fn disconnect_base_node(&mut self, peer_id: PeerId) {
trace!(target: LOG_TARGET, "Disconnecting base node '{}'...", peer_id);
if let Some(pool) = self.current_pool.take() {
pool.close().await;
}
Expand All @@ -296,27 +299,18 @@ impl WalletConnectivityService {
}

async fn setup_base_node_connection(&mut self, mut peer_manager: BaseNodePeerManager) {
let mut peer = peer_manager.select_next_peer_if_attempted().clone();
let peer_id = peer.peer_id();
let mut peer = if self.last_attempted_peer.is_some() {
peer_manager.select_next_peer().clone()
} else {
peer_manager.get_current_peer().clone()
};

loop {
self.set_online_status(OnlineStatus::Connecting);
let maybe_last_attempt = peer_manager.time_since_last_connection_attempt();

debug!(
target: LOG_TARGET,
"Attempting to connect to base node peer '{}'... (last attempt {:?})",
peer,
maybe_last_attempt
);

peer_manager.set_last_connection_attempt();

match self.try_setup_rpc_pool(&peer).await {
Ok(true) => {
if let Err(e) = self.notify_pending_requests().await {
warn!(target: LOG_TARGET, "Error notifying pending RPC requests: {}", e);
}
self.base_node_watch.send(Some(peer_manager.clone()));
self.notify_pending_requests().await;
self.set_online_status(OnlineStatus::Online);
debug!(
target: LOG_TARGET,
Expand All @@ -329,45 +323,31 @@ impl WalletConnectivityService {
target: LOG_TARGET,
"The peer has changed while connecting. Attempting to connect to new base node."
);

// NOTE: we do not strictly need to update our local copy of BaseNodePeerManager since state is
// atomically shared. However, since None is a possibility (although in practice
// it should never be) we handle that here.
peer_manager = match self.get_base_node_peer_manager() {
Some(pm) => pm,
None => {
warn!(target: LOG_TARGET, "⚠️ NEVER HAPPEN: Base node peer manager set to None while connecting");
return;
},
};
self.disconnect_base_node(peer_id).await;
self.disconnect_base_node(peer.peer_id()).await;
self.set_online_status(OnlineStatus::Offline);
continue;
return;
},
Err(WalletConnectivityError::DialError(DialError::Aborted)) => {
debug!(target: LOG_TARGET, "Dial was cancelled.");
self.disconnect_base_node(peer_id).await;
self.disconnect_base_node(peer.peer_id()).await;
self.set_online_status(OnlineStatus::Offline);
},
Err(e) => {
warn!(target: LOG_TARGET, "{}", e);
self.disconnect_base_node(peer_id).await;
self.disconnect_base_node(peer.peer_id()).await;
self.set_online_status(OnlineStatus::Offline);
},
}

// Select the next peer (if available)
let next_peer = peer_manager.select_next_peer().clone();
// If we only have one peer in the list, wait a bit before retrying
if peer_id == next_peer.peer_id() {
if peer.peer_id() == next_peer.peer_id() {
debug!(target: LOG_TARGET,
"Only single peer in base node peer list. Waiting {}s before retrying again ...",
CONNECTIVITY_WAIT.as_secs()
);
time::sleep(CONNECTIVITY_WAIT).await;
} else {
// Ensure that all services are aware of the next peer being attempted
self.base_node_watch.mark_changed();
}
peer = next_peer;
}
Expand All @@ -381,6 +361,7 @@ impl WalletConnectivityService {
}

async fn try_setup_rpc_pool(&mut self, peer: &Peer) -> Result<bool, WalletConnectivityError> {
self.last_attempted_peer = Some(peer.peer_id());
let peer_id = peer.peer_id();
let dial_wait = self
.network_handle
Expand Down Expand Up @@ -434,16 +415,21 @@ impl WalletConnectivityService {
Ok(true)
}

async fn notify_pending_requests(&mut self) -> Result<(), WalletConnectivityError> {
async fn notify_pending_requests(&mut self) {
let current_pending = mem::take(&mut self.pending_requests);
let mut count = 0;
let current_pending_len = current_pending.len();
for reply in current_pending {
if reply.is_canceled() {
continue;
}

count += 1;
trace!(target: LOG_TARGET, "Handle {} of {} pending RPC pool requests", count, current_pending_len);
self.handle_pool_request(reply).await;
}
Ok(())
if !self.pending_requests.is_empty() {
warn!(target: LOG_TARGET, "{} of {} pending RPC pool requests not handled", count, current_pending_len);
}
}
}

Expand Down
16 changes: 0 additions & 16 deletions base_layer/wallet/src/util/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,6 @@ impl<T> Watch<T> {
&self.1
}

fn receiver_mut(&mut self) -> &mut watch::Receiver<T> {
&mut self.1
}

/// Marks the state as changed.
///
/// After invoking this method [`has_changed()`](Self::has_changed)
/// returns `true` and [`changed()`](Self::changed) returns
/// immediately, regardless of whether a new value has been sent.
///
/// This is useful for triggering an initial change notification after
/// subscribing to synchronize new receivers.
pub fn mark_changed(&mut self) {
self.receiver_mut().mark_changed();
}

pub fn get_receiver(&self) -> watch::Receiver<T> {
self.receiver().clone()
}
Expand Down
8 changes: 7 additions & 1 deletion common/config/presets/c_base_node_c.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ blockchain_sync_config.rpc_deadline = 240
# _NOTE_: If using the `tor` transport type, public_addresses will be ignored and an onion address will be
# automatically configured
#public_addresses = ["/ip4/172.2.3.4/tcp/18189",]
# The multiaddrs to listen on.
# Default: ["/ip4/0.0.0.0/tcp/0", "/ip4/0.0.0.0/udp/0/quic"]
#listen_addresses = ["/ip4/0.0.0.0/tcp/0", "/ip4/0.0.0.0/udp/0/quic"]
# If set to `Private`, the node assume it has no public address and will try to establish a relay connection as
# soon as possible. `Auto` will use auto NAT to try determine this automatically.
#reachability_mode = auto

# Optionally bind an additional TCP socket for inbound Tari P2P protocol commms.
# Use cases include:
Expand Down Expand Up @@ -163,4 +169,4 @@ blockchain_sync_config.rpc_deadline = 240
# If true, and the maximum per peer RPC sessions is reached, the RPC server will close an old session and replace it
# with a new session. If false, the RPC server will reject the new session and preserve the older session.
# (default value = true).
#pub cull_oldest_peer_rpc_connection_on_full = true
#cull_oldest_peer_rpc_connection_on_full = true
8 changes: 7 additions & 1 deletion common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ event_channel_size = 3500
# _NOTE_: If using the `tor` transport type, public_address will be ignored and an onion address will be
# automatically configured
#public_addresses = ["/ip4/172.2.3.4/tcp/18188",]
# The multiaddrs to listen on.
# Default: ["/ip4/0.0.0.0/tcp/0", "/ip4/0.0.0.0/udp/0/quic"]
#listen_addresses = ["/ip4/0.0.0.0/tcp/0", "/ip4/0.0.0.0/udp/0/quic"]
# If set to `Private`, the node will not be used as a public relay
# If set to `Private`, the node assume it has no public address and will try to establish a relay connection as
# soon as possible. `Auto` will use auto NAT to try determine this automatically.
#reachability_mode = private

# Optionally bind an additional TCP socket for inbound Tari P2P protocol commms.
# Use cases include:
Expand Down Expand Up @@ -205,7 +212,6 @@ event_channel_size = 3500
#rpc_max_simultaneous_sessions = 100
# The maximum comms RPC sessions allowed per peer (default value = 10).
#rpc_max_sessions_per_peer = 10
#rpc_max_sessions_per_peer = 10
# If true, and the maximum per peer RPC sessions is reached, the RPC server will close an old session and replace it
# with a new session. If false, the RPC server will reject the new session and preserve the older session.
# (default value = true).
Expand Down
Loading
Loading