diff --git a/CHANGELOG.md b/CHANGELOG.md index c819a70c4..6c630914a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,35 @@ --> +## [Unreleased] + +### Summary + +### Breaking changes + +* pool: change `Relay::connect` method signature ([Yuki Kishimoto]) + +### Changed + +* pool: update `Error::WebSocket` variant inner type ([Yuki Kishimoto]) + +### Added + +* pool: add `Relay::try_connect` ([Yuki Kishimoto]) +* pool: add `Relay::wait_for_connection` ([Yuki Kishimoto]) +* pool: add `RelayPool::try_connect` ([Yuki Kishimoto]) +* pool: add `RelayPool::try_connect_relay` ([Yuki Kishimoto]) +* pool: add `RelayPool::wait_for_connection` ([Yuki Kishimoto]) +* sdk: add `Client::try_connect` ([Yuki Kishimoto]) +* sdk: add `Client::try_connect_relay` ([Yuki Kishimoto]) +* sdk: add `Client::wait_for_connection` ([Yuki Kishimoto]) + +### Fixed + +### Removed + +### Deprecated + ## [v0.38.0] - 2024/12/31 ### Summary diff --git a/bindings/nostr-sdk-ffi/src/client/mod.rs b/bindings/nostr-sdk-ffi/src/client/mod.rs index 808f35ed5..6096068d3 100644 --- a/bindings/nostr-sdk-ffi/src/client/mod.rs +++ b/bindings/nostr-sdk-ffi/src/client/mod.rs @@ -191,12 +191,22 @@ impl Client { self.inner.connect().await } - /// Connect to all added relays + /// Waits for relays connections + /// + /// Wait for relays connections at most for the specified `timeout`. + /// The code continues when the relays are connected or the `timeout` is reached. + pub async fn wait_for_connection(&self, timeout: Duration) { + self.inner.wait_for_connection(timeout).await + } + + /// Try to establish a connection with the relays. /// - /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`. - /// The code continues if the `timeout` is reached or if all relays connect. - pub async fn connect_with_timeout(&self, timeout: Duration) { - self.inner.connect_with_timeout(timeout).await + /// Attempts to establish a connection without spawning the connection task if it fails. + /// This means that if the connection fails, no automatic retries are scheduled. + /// Use [`Client::connect`] if you want to immediately spawn a connection task, + /// regardless of whether the initial connection succeeds. + pub async fn try_connect(&self, timeout: Duration) -> Output { + self.inner.try_connect(timeout).await.into() } pub async fn disconnect(&self) -> Result<()> { diff --git a/bindings/nostr-sdk-ffi/src/relay/mod.rs b/bindings/nostr-sdk-ffi/src/relay/mod.rs index 1c89cfd04..d959deb2d 100644 --- a/bindings/nostr-sdk-ffi/src/relay/mod.rs +++ b/bindings/nostr-sdk-ffi/src/relay/mod.rs @@ -200,9 +200,20 @@ impl Relay { // TODO: add notifications - /// Connect to relay and keep alive connection - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + /// Connect to relay + /// + /// This method returns immediately and doesn't provide any information on if the connection was successful or not. + pub fn connect(&self) { + self.inner.connect() + } + + /// Try to connect to relay + /// + /// This method returns an error if the connection fails. + /// If the connection fails, + /// a task will continue to retry in the background (unless configured differently in `RelayOptions`. + pub async fn try_connect(&self, timeout: Duration) -> Result<()> { + Ok(self.inner.try_connect(timeout).await?) } /// Disconnect from relay and set status to 'Terminated' diff --git a/bindings/nostr-sdk-js/src/client/mod.rs b/bindings/nostr-sdk-js/src/client/mod.rs index 78a292545..3f59edfbe 100644 --- a/bindings/nostr-sdk-js/src/client/mod.rs +++ b/bindings/nostr-sdk-js/src/client/mod.rs @@ -224,14 +224,24 @@ impl JsClient { self.inner.connect().await } - /// Connect to all added relays + /// Waits for relays connections /// - /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`. - /// The code continues if the `timeout` is reached or if all relays connect. - #[inline] - #[wasm_bindgen(js_name = connectWithTimeout)] - pub async fn connect_with_timeout(&self, timeout: &JsDuration) { - self.inner.connect_with_timeout(**timeout).await + /// Wait for relays connections at most for the specified `timeout`. + /// The code continues when the relays are connected or the `timeout` is reached. + #[wasm_bindgen(js_name = waitForConnection)] + pub async fn wait_for_connection(&self, timeout: &JsDuration) { + self.inner.wait_for_connection(**timeout).await + } + + /// Try to establish a connection with the relays. + /// + /// Attempts to establish a connection without spawning the connection task if it fails. + /// This means that if the connection fails, no automatic retries are scheduled. + /// Use [`Client::connect`] if you want to immediately spawn a connection task, + /// regardless of whether the initial connection succeeds. + #[wasm_bindgen(js_name = tryConnect)] + pub async fn try_connect(&self, timeout: &JsDuration) -> JsOutput { + self.inner.try_connect(**timeout).await.into() } /// Disconnect from all relays diff --git a/bindings/nostr-sdk-js/src/relay/mod.rs b/bindings/nostr-sdk-js/src/relay/mod.rs index 567c38172..3fd9571de 100644 --- a/bindings/nostr-sdk-js/src/relay/mod.rs +++ b/bindings/nostr-sdk-js/src/relay/mod.rs @@ -158,9 +158,21 @@ impl JsRelay { self.inner.queue() as u64 } - /// Connect to relay and keep alive connection - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout.map(|d| *d)).await + /// Connect to relay + /// + /// This method returns immediately and doesn't provide any information on if the connection was successful or not. + pub fn connect(&self) { + self.inner.connect() + } + + /// Try to connect to relay + /// + /// This method returns an error if the connection fails. + /// If the connection fails, + /// a task will continue to retry in the background (unless configured differently in `RelayOptions`. + #[wasm_bindgen(js_name = tryConnect)] + pub async fn try_connect(&self, timeout: &JsDuration) -> Result<()> { + self.inner.try_connect(**timeout).await.map_err(into_err) } /// Disconnect from relay and set status to 'Terminated' diff --git a/crates/nostr-cli/src/main.rs b/crates/nostr-cli/src/main.rs index df3c41faa..a701ef689 100644 --- a/crates/nostr-cli/src/main.rs +++ b/crates/nostr-cli/src/main.rs @@ -177,7 +177,7 @@ async fn handle_command(command: ShellCommand, client: &Client) -> Result<()> { println!("Connecting to relays..."); // Connect and wait for connection - client.connect_with_timeout(Duration::from_secs(60)).await; + client.try_connect(Duration::from_secs(60)).await; relays.clone() } else { diff --git a/crates/nostr-connect/src/client.rs b/crates/nostr-connect/src/client.rs index 7e6deac0d..de9890449 100644 --- a/crates/nostr-connect/src/client.rs +++ b/crates/nostr-connect/src/client.rs @@ -120,7 +120,7 @@ impl NostrConnect { } // Connect to relays - self.pool.connect(None).await; + self.pool.connect().await; // Subscribe let notifications = self.subscribe().await?; diff --git a/crates/nostr-connect/src/signer.rs b/crates/nostr-connect/src/signer.rs index b1c139552..492a3c927 100644 --- a/crates/nostr-connect/src/signer.rs +++ b/crates/nostr-connect/src/signer.rs @@ -6,7 +6,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::Duration; use nostr::nips::nip46::{Message, Request, ResponseResult}; use nostr_relay_pool::prelude::*; @@ -130,7 +129,7 @@ impl NostrConnectRemoteSigner { } // Connect - self.pool.connect(Some(Duration::from_secs(10))).await; + self.pool.connect().await; let filter = Filter::new() .pubkey(self.keys.signer.public_key()) diff --git a/crates/nostr-relay-pool/examples/pool.rs b/crates/nostr-relay-pool/examples/pool.rs index c8dfdf4f6..0abc9108d 100644 --- a/crates/nostr-relay-pool/examples/pool.rs +++ b/crates/nostr-relay-pool/examples/pool.rs @@ -19,7 +19,7 @@ async fn main() -> Result<()> { pool.add_relay("wss://relay.damus.io", RelayOptions::default()) .await?; - pool.connect(None).await; + pool.connect().await; let event = Event::from_json(r#"{"content":"","created_at":1698412975,"id":"f55c30722f056e330d8a7a6a9ba1522f7522c0f1ced1c93d78ea833c78a3d6ec","kind":3,"pubkey":"f831caf722214748c72db4829986bd0cbb2bb8b3aeade1c959624a52a9629046","sig":"5092a9ffaecdae7d7794706f085ff5852befdf79df424cc3419bb797bf515ae05d4f19404cb8324b8b4380a4bd497763ac7b0f3b1b63ef4d3baa17e5f5901808","tags":[["p","4ddeb9109a8cd29ba279a637f5ec344f2479ee07df1f4043f3fe26d8948cfef9","",""],["p","bb6fd06e156929649a73e6b278af5e648214a69d88943702f1fb627c02179b95","",""],["p","b8b8210f33888fdbf5cedee9edf13c3e9638612698fe6408aff8609059053420","",""],["p","9dcee4fabcd690dc1da9abdba94afebf82e1e7614f4ea92d61d52ef9cd74e083","",""],["p","3eea9e831fefdaa8df35187a204d82edb589a36b170955ac5ca6b88340befaa0","",""],["p","885238ab4568f271b572bf48b9d6f99fa07644731f288259bd395998ee24754e","",""],["p","568a25c71fba591e39bebe309794d5c15d27dbfa7114cacb9f3586ea1314d126","",""]]}"#).unwrap(); pool.send_event(event).await?; diff --git a/crates/nostr-relay-pool/src/pool/constants.rs b/crates/nostr-relay-pool/src/pool/constants.rs index 798fba7aa..f31f7291b 100644 --- a/crates/nostr-relay-pool/src/pool/constants.rs +++ b/crates/nostr-relay-pool/src/pool/constants.rs @@ -4,7 +4,5 @@ //! Constants -pub(super) const MAX_CONNECTING_CHUNK: usize = 100; - /// Relay Pool default notification channel size pub const DEFAULT_NOTIFICATION_CHANNEL_SIZE: usize = 4096; diff --git a/crates/nostr-relay-pool/src/pool/inner.rs b/crates/nostr-relay-pool/src/pool/inner.rs index b75cf80cb..44755a2aa 100644 --- a/crates/nostr-relay-pool/src/pool/inner.rs +++ b/crates/nostr-relay-pool/src/pool/inner.rs @@ -4,7 +4,6 @@ //! Relay Pool -use std::cmp; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -16,7 +15,6 @@ use atomic_destructor::AtomicDestroyer; use nostr_database::prelude::*; use tokio::sync::{broadcast, mpsc, Mutex, RwLock, RwLockReadGuard}; -use super::constants::MAX_CONNECTING_CHUNK; use super::options::RelayPoolOptions; use super::{Error, Output, RelayPoolNotification}; use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions}; @@ -922,34 +920,62 @@ impl InnerRelayPool { Ok(ReceiverStream::new(rx)) } - pub async fn connect(&self, connection_timeout: Option) { + pub async fn connect(&self) { // Lock with read shared access let relays = self.atomic.relays.read().await; + // Connect + for relay in relays.values() { + relay.connect() + } + } + + pub async fn wait_for_connection(&self, timeout: Duration) { + // Lock with read shared access + let relays = self.atomic.relays.read().await; + + // Compose futures + let mut futures = Vec::with_capacity(relays.len()); + for relay in relays.values() { + futures.push(relay.wait_for_connection(timeout)); + } + + // Join futures + future::join_all(futures).await; + } + + pub async fn try_connect(&self, timeout: Duration) -> Output<()> { + // Lock with read shared access + let relays = self.atomic.relays.read().await; + + let mut urls: Vec = Vec::with_capacity(relays.len()); let mut futures = Vec::with_capacity(relays.len()); + let mut output: Output<()> = Output::default(); // Filter only relays that can connect and compose futures for relay in relays.values().filter(|r| r.status().can_connect()) { - futures.push(relay.connect(connection_timeout)); + urls.push(relay.url().clone()); + futures.push(relay.try_connect(timeout)); } - // Check number of futures - if futures.len() <= MAX_CONNECTING_CHUNK { - future::join_all(futures).await; - return; - } + // TODO: use semaphore to limit number concurrent connections? - tracing::warn!( - "Too many relays ({}). Connecting in chunks of {MAX_CONNECTING_CHUNK} relays...", - futures.len() - ); + // Join futures + let list = future::join_all(futures).await; - // Join in chunks - while !futures.is_empty() { - let upper: usize = cmp::min(MAX_CONNECTING_CHUNK, futures.len()); - let chunk = futures.drain(..upper); - future::join_all(chunk).await; + // Iterate results and compose output + for (url, result) in urls.into_iter().zip(list.into_iter()) { + match result { + Ok(..) => { + output.success.insert(url); + } + Err(e) => { + output.failed.insert(url, e.to_string()); + } + } } + + output } pub async fn disconnect(&self) -> Result<(), Error> { @@ -964,11 +990,7 @@ impl InnerRelayPool { Ok(()) } - pub async fn connect_relay( - &self, - url: U, - connection_timeout: Option, - ) -> Result<(), Error> + pub async fn connect_relay(&self, url: U) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, @@ -983,7 +1005,27 @@ impl InnerRelayPool { let relay: &Relay = self.internal_relay(&relays, &url)?; // Connect - relay.connect(connection_timeout).await; + relay.connect(); + + Ok(()) + } + + pub async fn try_connect_relay(&self, url: U, timeout: Duration) -> Result<(), Error> + where + U: TryIntoUrl, + Error: From<::Err>, + { + // Convert url + let url: RelayUrl = url.try_into_url()?; + + // Lock with read shared access + let relays = self.atomic.relays.read().await; + + // Get relay + let relay: &Relay = self.internal_relay(&relays, &url)?; + + // Try to connect + relay.try_connect(timeout).await?; Ok(()) } diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index f3df7a2df..1c85629a5 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -251,10 +251,32 @@ impl RelayPool { self.inner.remove_all_relays(true).await } - /// Connect to all added relays and keep connection alive + /// Connect to all added relays #[inline] - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + pub async fn connect(&self) { + self.inner.connect().await + } + + /// Waits for relays connections + /// + /// Wait for relays connections at most for the specified `timeout`. + /// The code continues when the relays are connected or the `timeout` is reached. + #[inline] + pub async fn wait_for_connection(&self, timeout: Duration) { + self.inner.wait_for_connection(timeout).await + } + + /// Try to establish a connection with the relays. + /// + /// Attempts to establish a connection without spawning the connection task if it fails. + /// This means that if the connection fails, no automatic retries are scheduled. + /// Use [`RelayPool::connect`] if you want to immediately spawn a connection task, + /// regardless of whether the initial connection succeeds. + /// + /// For further details, see the documentation of [`Relay::try_connect`]. + #[inline] + pub async fn try_connect(&self, timeout: Duration) -> Output<()> { + self.inner.try_connect(timeout).await } /// Disconnect from all relays @@ -263,18 +285,30 @@ impl RelayPool { self.inner.disconnect().await } - /// Connect to relay + /// Connect to a previously added relay + /// + /// This method doesn't provide any information on if the connection was successful or not. + /// + /// Return [`Error::RelayNotFound`] if the relay doesn't exist in the pool. + #[inline] + pub async fn connect_relay(&self, url: U) -> Result<(), Error> + where + U: TryIntoUrl, + Error: From<::Err>, + { + self.inner.connect_relay(url).await + } + + /// Try to connect to a previously added relay + /// + /// For further details, see the documentation of [`Relay::try_connect`]. #[inline] - pub async fn connect_relay( - &self, - url: U, - connection_timeout: Option, - ) -> Result<(), Error> + pub async fn try_connect_relay(&self, url: U, timeout: Duration) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, { - self.inner.connect_relay(url, connection_timeout).await + self.inner.try_connect_relay(url, timeout).await } /// Disconnect relay @@ -633,7 +667,7 @@ mod tests { pool.add_relay(&url, RelayOptions::default()).await.unwrap(); - pool.connect(None).await; + pool.connect().await; assert!(!pool.inner.is_shutdown()); diff --git a/crates/nostr-relay-pool/src/relay/error.rs b/crates/nostr-relay-pool/src/relay/error.rs index 6f030c460..a9ba57b2b 100644 --- a/crates/nostr-relay-pool/src/relay/error.rs +++ b/crates/nostr-relay-pool/src/relay/error.rs @@ -5,6 +5,7 @@ use std::fmt; use std::time::Duration; +use async_wsocket::Error as WsError; use nostr::event; use nostr::event::builder; use nostr::message::MessageHandleError; @@ -18,7 +19,7 @@ use crate::RelayPoolNotification; #[derive(Debug)] pub enum Error { /// WebSocket error - WebSocket(Box), + WebSocket(WsError), /// Shared state error SharedState(SharedStateError), /// MessageHandle error @@ -180,13 +181,9 @@ impl fmt::Display for Error { } } -impl Error { - #[inline] - pub(super) fn websocket(error: E) -> Self - where - E: std::error::Error + Send + Sync + 'static, - { - Self::WebSocket(Box::new(error)) +impl From for Error { + fn from(e: WsError) -> Self { + Self::WebSocket(e) } } diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index f4fea8091..6d44add13 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -12,7 +12,9 @@ use std::time::Duration; use async_utility::{task, time}; use async_wsocket::futures_util::{self, Future, SinkExt, StreamExt}; -use async_wsocket::{connect as wsocket_connect, ConnectionMode, Sink, Stream, WsMessage}; +use async_wsocket::{ + connect as wsocket_connect, ConnectionMode, Error as WsError, Sink, Stream, WsMessage, +}; use atomic_destructor::AtomicDestroyer; use negentropy::{Bytes, Id, Negentropy, NegentropyStorageVector}; use negentropy_deprecated::{Bytes as BytesDeprecated, Negentropy as NegentropyDeprecated}; @@ -432,8 +434,7 @@ impl InnerRelay { } } - pub async fn connect(&self, connection_timeout: Option) { - // Return if relay can't connect + pub fn connect(&self) { if !self.status().can_connect() { return; } @@ -442,32 +443,53 @@ impl InnerRelay { // Change it to pending to avoid issues with the health check (initialized check) self.set_status(RelayStatus::Pending, false); - // If connection timeout is `Some`, try to connect waiting for connection - match connection_timeout { - Some(timeout) => { - let mut notifications = self.internal_notification_sender.subscribe(); + // Spawn connection task + self.spawn_connection_task(None); + } - // Spawn and try connect - self.spawn_and_try_connect(timeout); + pub async fn wait_for_connection(&self, timeout: Duration) { + let status: RelayStatus = self.status(); - // Wait for status change (connected or disconnected) - tracing::debug!(url = %self.url, "Waiting for status change before continue"); - while let Ok(notification) = notifications.recv().await { - if let RelayNotification::RelayStatus { - status: RelayStatus::Connected | RelayStatus::Disconnected, - } = notification - { - break; - } + // Already connected + if status.is_connected() { + return; + } + + // Subscribe to notifications + let mut notifications = self.internal_notification_sender.subscribe(); + + // Set timeout + time::timeout(Some(timeout), async { + while let Ok(notification) = notifications.recv().await { + // Wait for status change. Break loop when connect. + if let RelayNotification::RelayStatus { + status: RelayStatus::Connected, + } = notification + { + break; } } - None => { - self.spawn_and_try_connect(DEFAULT_CONNECTION_TIMEOUT); - } + }) + .await; + } + + pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> { + // Check if relay can't connect + if !self.status().can_connect() { + return Ok(()); } + + // Try to connect + // This will set the status to "terminated" if the connection fails + let stream: (Sink, Stream) = self._try_connect(timeout, RelayStatus::Terminated).await?; + + // Spawn connection task + self.spawn_connection_task(Some(stream)); + + Ok(()) } - fn spawn_and_try_connect(&self, connection_timeout: Duration) { + fn spawn_connection_task(&self, mut stream: Option<(Sink, Stream)>) { if self.is_running() { tracing::warn!(url = %self.url, "Connection task is already running."); return; @@ -482,7 +504,7 @@ impl InnerRelay { let mut rx_service = relay.atomic.channels.rx_service().await; // Last websocket error - // Store it to avoid to print every time the same connection error + // Store it to avoid printing every time the same connection error let mut last_ws_error = None; // Auto-connect loop @@ -493,7 +515,7 @@ impl InnerRelay { tokio::select! { // Connect and run message handler - _ = relay.connect_and_run(connection_timeout, &mut last_ws_error) => {}, + _ = relay.connect_and_run(stream, &mut last_ws_error) => {}, // Handle "terminate" message _ = relay.handle_terminate(&mut rx_service) => { // Update status @@ -504,6 +526,9 @@ impl InnerRelay { } } + // Update stream to `None`, meaning that it was already used (if was some). + stream = None; + // Get status let status: RelayStatus = relay.status(); @@ -557,7 +582,7 @@ impl InnerRelay { /// Depending on attempts and success, use default or incremental retry interval fn calculate_retry_interval(&self) -> Duration { - // Check if incremental interval is enabled + // Check if the incremental interval is enabled if self.opts.adjust_retry_interval { // Calculate the difference between attempts and success let diff: u32 = self.stats.attempts().saturating_sub(self.stats.success()) as u32; @@ -565,7 +590,7 @@ impl InnerRelay { // Calculate multiplier let multiplier: u32 = 1 + (diff / 2); - // Compute adaptive retry interval + // Compute the adaptive retry interval let adaptive_interval: Duration = self.opts.retry_interval * multiplier; // If the interval is too big, use the min one. @@ -608,27 +633,17 @@ impl InnerRelay { } } - /// Connect and run message handler - async fn connect_and_run( + async fn _try_connect( &self, - connection_timeout: Duration, - last_ws_error: &mut Option, - ) { + timeout: Duration, + status_on_failure: RelayStatus, + ) -> Result<(Sink, Stream), WsError> { // Update status self.set_status(RelayStatus::Connecting, true); // Add attempt self.stats.new_attempt(); - // Compose timeout - let timeout: Duration = if self.stats.attempts() > 1 { - // Many attempts, use the default timeout - DEFAULT_CONNECTION_TIMEOUT - } else { - // First attempt, use external timeout - connection_timeout - }; - // Connect match wsocket_connect((&self.url).into(), &self.opts.connection_mode, timeout).await { Ok((ws_tx, ws_rx)) => { @@ -638,37 +653,70 @@ impl InnerRelay { // Increment success stats self.stats.new_success(); - // Request information document - #[cfg(feature = "nip11")] - self.request_nip11_document(); - - // Run message handler - self.run_message_handler(ws_tx, ws_rx).await; + Ok((ws_tx, ws_rx)) } Err(e) => { // Update status - self.set_status(RelayStatus::Disconnected, false); - - // TODO: avoid string allocation. The error is converted to string only to perform the `!=` binary operation. - // Check if error should be logged - let e: String = e.to_string(); - let to_log: bool = match &last_ws_error { - Some(prev_err) => { - // Log only if different from the last one - prev_err != &e - } - None => true, - }; + self.set_status(status_on_failure, false); - // Log error and update the last error - if to_log { - tracing::error!(url = %self.url, error= %e, "Connection failed."); - *last_ws_error = Some(e); - } + // Return error + Err(e) } } } + /// Connect and run message handler + /// + /// If `stream` arg is passed, no connection attempt will be done. + async fn connect_and_run( + &self, + stream: Option<(Sink, Stream)>, + last_ws_error: &mut Option, + ) { + match stream { + // Already have a stream, go to post-connection stage + Some((ws_tx, ws_rx)) => self.post_connection(ws_tx, ws_rx).await, + // No stream is passed, try to connect + // Set the status to "disconnected" to allow to automatic retries + None => match self + ._try_connect(DEFAULT_CONNECTION_TIMEOUT, RelayStatus::Disconnected) + .await + { + // Connection success, go to post-connection stage + Ok((ws_tx, ws_rx)) => self.post_connection(ws_tx, ws_rx).await, + // Error during connection + Err(e) => { + // TODO: avoid string allocation. The error is converted to string only to perform the `!=` binary operation. + // Check if error should be logged + let e: String = e.to_string(); + let to_log: bool = match &last_ws_error { + Some(prev_err) => { + // Log only if different from the last one + prev_err != &e + } + None => true, + }; + + // Log error and update the last error + if to_log { + tracing::error!(url = %self.url, error= %e, "Connection failed."); + *last_ws_error = Some(e); + } + } + }, + } + } + + // To run after websocket connection + async fn post_connection(&self, ws_tx: Sink, ws_rx: Stream) { + // Request information document + #[cfg(feature = "nip11")] + self.request_nip11_document(); + + // Run message handler + self.run_message_handler(ws_tx, ws_rx).await; + } + async fn run_message_handler(&self, ws_tx: Sink, ws_rx: Stream) { // (Re)subscribe to relay if self.flags.can_read() { @@ -772,7 +820,7 @@ impl InnerRelay { let _ping = ping; while let Some(msg) = ws_rx.next().await { - match msg.map_err(Error::websocket)? { + match msg? { #[cfg(not(target_arch = "wasm32"))] WsMessage::Pong(bytes) => { if self.flags.has_ping() { @@ -2370,7 +2418,7 @@ impl InnerRelay { async fn send_ws_msgs(tx: &mut Sink, msgs: Vec) -> Result<(), Error> { let mut stream = futures_util::stream::iter(msgs.into_iter().map(Ok)); match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.send_all(&mut stream)).await { - Some(res) => res.map_err(Error::websocket), + Some(res) => Ok(res?), None => Err(Error::Timeout), } } @@ -2378,7 +2426,7 @@ async fn send_ws_msgs(tx: &mut Sink, msgs: Vec) -> Result<(), Error> /// Send WebSocket messages with timeout set to [WEBSOCKET_TX_TIMEOUT]. async fn close_ws(tx: &mut Sink) -> Result<(), Error> { match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.close()).await { - Some(res) => res.map_err(Error::websocket), + Some(res) => Ok(res?), None => Err(Error::Timeout), } } diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index 28cd72dd5..188f4fe87 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -254,10 +254,34 @@ impl Relay { self.inner.internal_notification_sender.subscribe() } - /// Connect to relay and keep alive connection + /// Connect to relay + /// + /// This method returns immediately and doesn't provide any information on if the connection was successful or not. + #[inline] + pub fn connect(&self) { + self.inner.connect() + } + + /// Waits for relay connection + /// + /// Wait for relay connection at most for the specified `timeout`. + /// The code continues when the relay is connected or the `timeout` is reached. + #[inline] + pub async fn wait_for_connection(&self, timeout: Duration) { + self.inner.wait_for_connection(timeout).await + } + + /// Try to establish a connection with the relay. + /// + /// Attempts to establish a connection without spawning the connection task if it fails. + /// This means that if the connection fails, no automatic retries are scheduled. + /// Use [`Relay::connect`] if you want to immediately spawn a connection task, + /// regardless of whether the initial connection succeeds. + /// + /// Returns an error if the connection fails. #[inline] - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> { + self.inner.try_connect(timeout).await } /// Disconnect from relay and set status to 'Terminated' @@ -443,7 +467,7 @@ mod tests { let relay = Relay::new(url); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); let keys = Keys::generate(); let event = EventBuilder::text_note("Test") @@ -462,7 +486,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -485,7 +509,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -508,7 +532,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -532,7 +556,9 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.connect(); + + time::sleep(Duration::from_secs(1)).await; assert!(relay.inner.is_running()); @@ -549,6 +575,80 @@ mod tests { assert!(!relay.inner.is_running()); } + #[tokio::test] + async fn test_connect() { + // Mock relay + let mock = MockRelay::run().await.unwrap(); + let url = RelayUrl::parse(&mock.url()).unwrap(); + + let opts = RelayOptions::default(); + let relay = Relay::with_opts(url, opts); + + assert_eq!(relay.status(), RelayStatus::Initialized); + + relay.connect(); + + relay.wait_for_connection(Duration::from_secs(1)).await; + + assert_eq!(relay.status(), RelayStatus::Connected); + assert!(relay.inner.is_running()); + } + + #[tokio::test] + async fn test_connect_to_unreachable_relay() { + let url = RelayUrl::parse("wss://127.0.0.1:666").unwrap(); + + let opts = RelayOptions::default(); + let relay = Relay::with_opts(url, opts); + + assert_eq!(relay.status(), RelayStatus::Initialized); + + relay.connect(); + + time::sleep(Duration::from_secs(1)).await; + + assert_eq!(relay.status(), RelayStatus::Disconnected); + assert!(relay.inner.is_running()); + } + + #[tokio::test] + async fn test_try_connect() { + // Mock relay + let mock = MockRelay::run().await.unwrap(); + let url = RelayUrl::parse(&mock.url()).unwrap(); + + let opts = RelayOptions::default(); + let relay = Relay::with_opts(url, opts); + + assert_eq!(relay.status(), RelayStatus::Initialized); + + relay.try_connect(Duration::from_millis(500)).await.unwrap(); + + assert_eq!(relay.status(), RelayStatus::Connected); + + time::sleep(Duration::from_millis(500)).await; + + assert!(relay.inner.is_running()); + } + + #[tokio::test] + async fn test_try_connect_to_unreachable_relay() { + let url = RelayUrl::parse("wss://127.0.0.1:666").unwrap(); + + let opts = RelayOptions::default(); + let relay = Relay::with_opts(url, opts); + + assert_eq!(relay.status(), RelayStatus::Initialized); + + let res = relay.try_connect(Duration::from_secs(2)).await; + assert!(matches!(res.unwrap_err(), Error::WebSocket(..))); + + assert_eq!(relay.status(), RelayStatus::Terminated); + + // Connection failed, the connection task is not running + assert!(!relay.inner.is_running()); + } + #[tokio::test] async fn test_disconnect_unresponsive_relay_that_connect() { // Mock relay @@ -562,7 +662,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(None).await; + relay.connect(); time::sleep(Duration::from_secs(1)).await; @@ -594,7 +694,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(None).await; + relay.connect(); time::sleep(Duration::from_secs(1)).await; @@ -609,6 +709,30 @@ mod tests { assert!(!relay.inner.is_running()); } + #[tokio::test] + async fn test_wait_for_connection() { + // Mock relay + let opts = RelayTestOptions { + unresponsive_connection: Some(Duration::from_secs(2)), + }; + let mock = MockRelay::run_with_opts(opts).await.unwrap(); + let url = RelayUrl::parse(&mock.url()).unwrap(); + + let relay = Relay::new(url); + + assert_eq!(relay.status(), RelayStatus::Initialized); + + relay.connect(); + + relay.wait_for_connection(Duration::from_millis(500)).await; // This timeout + + assert_eq!(relay.status(), RelayStatus::Connecting); + + relay.wait_for_connection(Duration::from_secs(3)).await; + + assert_eq!(relay.status(), RelayStatus::Connected); + } + #[tokio::test] async fn test_nip42_send_event() { // Mock relay @@ -623,7 +747,7 @@ mod tests { relay.inner.state.automatic_authentication(true); - relay.connect(Some(Duration::from_millis(100))).await; + relay.connect(); // Signer let keys = Keys::generate(); @@ -664,7 +788,7 @@ mod tests { let relay = Relay::new(url); - relay.connect(Some(Duration::from_millis(100))).await; + relay.connect(); // Signer let keys = Keys::generate(); diff --git a/crates/nostr-relay-pool/src/relay/status.rs b/crates/nostr-relay-pool/src/relay/status.rs index 8eb16a414..1ec37191e 100644 --- a/crates/nostr-relay-pool/src/relay/status.rs +++ b/crates/nostr-relay-pool/src/relay/status.rs @@ -97,7 +97,7 @@ impl RelayStatus { matches!(self, Self::Terminated) } - /// Check if relay can start connection (status is `initialized` or `terminated`) + /// Check if relay can start a connection (status is `initialized` or `terminated`) #[inline] pub(crate) fn can_connect(&self) -> bool { matches!(self, Self::Initialized | Self::Terminated) diff --git a/crates/nostr-relay-pool/src/shared.rs b/crates/nostr-relay-pool/src/shared.rs index 587a8b587..85f73683f 100644 --- a/crates/nostr-relay-pool/src/shared.rs +++ b/crates/nostr-relay-pool/src/shared.rs @@ -36,6 +36,7 @@ pub struct SharedState { nip42_auto_authentication: Arc, min_pow_difficulty: Arc, pub(crate) filtering: RelayFiltering, + // TODO: add a semaphore to limit number of concurrent websocket connections attempts? } impl Default for SharedState { diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index 2b42f7221..9850e69c0 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -493,13 +493,27 @@ impl Client { } /// Connect to a previously added relay + /// + /// Check [`RelayPool::connect_relay`] docs to learn more. #[inline] pub async fn connect_relay(&self, url: U) -> Result<(), Error> where U: TryIntoUrl, pool::Error: From<::Err>, { - Ok(self.pool.connect_relay(url, None).await?) + Ok(self.pool.connect_relay(url).await?) + } + + /// Try to connect to a previously added relay + /// + /// For further details, see the documentation of [`RelayPool::try_connect_relay`]. + #[inline] + pub async fn try_connect_relay(&self, url: U, timeout: Duration) -> Result<(), Error> + where + U: TryIntoUrl, + pool::Error: From<::Err>, + { + Ok(self.pool.try_connect_relay(url, timeout).await?) } /// Disconnect relay @@ -515,16 +529,41 @@ impl Client { /// Connect to all added relays #[inline] pub async fn connect(&self) { - self.pool.connect(None).await; + self.pool.connect().await; + } + + /// Waits for relays connections + /// + /// Wait for relays connections at most for the specified `timeout`. + /// The code continues when the relays are connected or the `timeout` is reached. + #[inline] + pub async fn wait_for_connection(&self, timeout: Duration) { + self.pool.wait_for_connection(timeout).await + } + + /// Try to establish a connection with the relays. + /// + /// Attempts to establish a connection without spawning the connection task if it fails. + /// This means that if the connection fails, no automatic retries are scheduled. + /// Use [`Client::connect`] if you want to immediately spawn a connection task, + /// regardless of whether the initial connection succeeds. + /// + /// For further details, see the documentation of [`RelayPool::try_connect`]. + #[inline] + pub async fn try_connect(&self, timeout: Duration) -> Output<()> { + self.pool.try_connect(timeout).await } /// Connect to all added relays /// /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`. /// The code continues if the `timeout` is reached or if all relays connect. - #[inline] + #[deprecated( + since = "0.39.0", + note = "Use `connect` + `wait_for_connection` instead." + )] pub async fn connect_with_timeout(&self, timeout: Duration) { - self.pool.connect(Some(timeout)).await + self.pool.try_connect(timeout).await; } /// Disconnect from all relays diff --git a/crates/nwc/src/lib.rs b/crates/nwc/src/lib.rs index 6b6830059..fecdb6d23 100644 --- a/crates/nwc/src/lib.rs +++ b/crates/nwc/src/lib.rs @@ -73,7 +73,7 @@ impl NWC { } // Connect - self.relay.connect(None).await; + self.relay.connect(); let filter = Filter::new() .author(self.uri.public_key)