diff --git a/crates/nostr-relay-pool/src/pool/inner.rs b/crates/nostr-relay-pool/src/pool/inner.rs index 0874bc81d..5525c0caf 100644 --- a/crates/nostr-relay-pool/src/pool/inner.rs +++ b/crates/nostr-relay-pool/src/pool/inner.rs @@ -912,7 +912,17 @@ impl InnerRelayPool { Ok(ReceiverStream::new(rx)) } - pub async fn connect(&self, connection_timeout: Option) { + pub async fn connect(&self) { + // Lock with read shared access + let relays = self.relays.read().await; + + // Connect + for relay in relays.values() { + relay.connect() + } + } + + pub async fn try_connect(&self, timeout: Duration) { // Lock with read shared access let relays = self.relays.read().await; @@ -920,7 +930,7 @@ impl InnerRelayPool { // Filter only relays that can connect and compose futures for relay in relays.values().filter(|r| r.status().can_connect()) { - futures.push(relay.connect(connection_timeout)); + futures.push(relay.try_connect(timeout)); } // Check number of futures @@ -957,7 +967,6 @@ impl InnerRelayPool { pub async fn connect_relay( &self, url: U, - connection_timeout: Option, ) -> Result<(), Error> where U: TryIntoUrl, @@ -973,7 +982,31 @@ impl InnerRelayPool { let relay: &Relay = self.internal_relay(&relays, &url)?; // Connect - relay.connect(connection_timeout).await; + relay.connect(); + + Ok(()) + } + + pub async fn try_connect_relay( + &self, + url: U, + timeout: Duration, + ) -> Result<(), Error> + where + U: TryIntoUrl, + Error: From<::Err>, + { + // Convert url + let url: RelayUrl = url.try_into_url()?; + + // Lock with read shared access + let relays = self.relays.read().await; + + // Get relay + let relay: &Relay = self.internal_relay(&relays, &url)?; + + // Try onnect + relay.try_connect(timeout).await?; Ok(()) } diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index 0e169663b..46dd9cb0c 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -251,10 +251,16 @@ impl RelayPool { self.inner.remove_all_relays(true).await } - /// Connect to all added relays and keep connection alive + /// Connect to all added relays #[inline] - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + pub async fn connect(&self) { + self.inner.connect().await + } + + /// Connect to all added relays + #[inline] + pub async fn try_connect(&self, timeout: Duration) { + self.inner.try_connect(timeout).await } /// Disconnect from all relays @@ -264,17 +270,36 @@ impl RelayPool { } /// Connect to relay + /// + /// This method doesn't provide any information on if the connection was successful or not. + /// + /// Return [`Error::RelayNotFound`] if the relay doesn't exist in the pool. #[inline] pub async fn connect_relay( &self, url: U, - connection_timeout: Option, ) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, { - self.inner.connect_relay(url, connection_timeout).await + self.inner.connect_relay(url).await + } + + /// Try to connect to relay + /// + /// This method returns an error if the connection fails. + #[inline] + pub async fn try_connect_relay( + &self, + url: U, + timeout: Duration, + ) -> Result<(), Error> + where + U: TryIntoUrl, + Error: From<::Err>, + { + self.inner.try_connect_relay(url, timeout).await } /// Disconnect relay @@ -633,7 +658,7 @@ mod tests { pool.add_relay(&url, RelayOptions::default()).await.unwrap(); - pool.connect(None).await; + pool.connect().await; assert!(!pool.inner.is_shutdown()); diff --git a/crates/nostr-relay-pool/src/relay/error.rs b/crates/nostr-relay-pool/src/relay/error.rs index 6f030c460..377a4ff93 100644 --- a/crates/nostr-relay-pool/src/relay/error.rs +++ b/crates/nostr-relay-pool/src/relay/error.rs @@ -59,6 +59,8 @@ pub enum Error { NotReady, /// Relay not connected NotConnected, + /// Connection failed + ConnectionFailed, /// Received shutdown ReceivedShutdown, /// Relay message @@ -145,6 +147,7 @@ impl fmt::Display for Error { } Self::NotReady => write!(f, "relay is initialized but not ready"), Self::NotConnected => write!(f, "relay not connected"), + Self::ConnectionFailed => write!(f, "connection failed"), Self::ReceivedShutdown => write!(f, "received shutdown"), Self::RelayMessage(message) => write!(f, "{message}"), Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"), diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index 70be8eb04..e393bb8f9 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -423,42 +423,54 @@ impl InnerRelay { } } - pub async fn connect(&self, connection_timeout: Option) { - // Return if relay can't connect - if !self.status().can_connect() { - return; - } - + fn _connect(&self, timeout: Duration) { // Update status // Change it to pending to avoid issues with the health check (initialized check) self.set_status(RelayStatus::Pending, false); - // If connection timeout is `Some`, try to connect waiting for connection - match connection_timeout { - Some(timeout) => { - let mut notifications = self.internal_notification_sender.subscribe(); + // Spawn connection task + self.spawn_and_try_connect(timeout); + } - // Spawn and try connect - self.spawn_and_try_connect(timeout); + pub fn connect(&self) { + if self.status().can_connect() { + self._connect(DEFAULT_CONNECTION_TIMEOUT); + } + } - // Wait for status change (connected or disconnected) - tracing::debug!(url = %self.url, "Waiting for status change before continue"); - while let Ok(notification) = notifications.recv().await { - if let RelayNotification::RelayStatus { - status: RelayStatus::Connected | RelayStatus::Disconnected, - } = notification - { - break; - } + pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> { + // Check if relay can't connect + if self.status().can_connect() { + // TODO: should return `Error::AlreadyConnected`? + return Ok(()); + } + + // Subscribe to notifications + let mut notifications = self.internal_notification_sender.subscribe(); + + // Connect + self._connect(timeout); + + // Wait for status change (connected or disconnected) + while let Ok(notification) = notifications.recv().await { + if let RelayNotification::RelayStatus { status } = notification { + match status { + // These statuses shouldn't happen, break the loop + RelayStatus::Initialized | RelayStatus::Pending => break, + // Waiting for connection, stay in the loop + RelayStatus::Connecting => continue, + // Success + RelayStatus::Connected => return Ok(()), + // Failed + RelayStatus::Disconnected | RelayStatus::Terminated => return Err(Error::ConnectionFailed), } } - None => { - self.spawn_and_try_connect(DEFAULT_CONNECTION_TIMEOUT); - } } + + Err(Error::PrematureExit) } - fn spawn_and_try_connect(&self, connection_timeout: Duration) { + fn spawn_and_try_connect(&self, timeout: Duration) { if self.is_running() { tracing::warn!(url = %self.url, "Connection task is already running."); return; @@ -480,7 +492,7 @@ impl InnerRelay { tokio::select! { // Connect and run message handler - _ = relay.connect_and_run(connection_timeout) => {}, + _ = relay.connect_and_run(timeout) => {}, // Handle terminate _ = relay.handle_terminate(&mut rx_service) => { // Update status @@ -596,7 +608,7 @@ impl InnerRelay { } /// Connect and run message handler - async fn connect_and_run(&self, connection_timeout: Duration) { + async fn connect_and_run(&self, timeout: Duration) { // Update status self.set_status(RelayStatus::Connecting, true); @@ -609,7 +621,7 @@ impl InnerRelay { DEFAULT_CONNECTION_TIMEOUT } else { // First attempt, use external timeout - connection_timeout + timeout }; // Connect diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index 799e442db..77b5c3788 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -255,10 +255,20 @@ impl Relay { self.inner.internal_notification_sender.subscribe() } - /// Connect to relay and keep alive connection + /// Connect to relay + /// + /// This method returns immediately and doesn't provide any information on if the connection was successful or not. #[inline] - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + pub fn connect(&self) { + self.inner.connect() + } + + /// Try to connect to relay + /// + /// This method returns an error if the connection fails. + #[inline] + pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> { + self.inner.try_connect(timeout).await } /// Disconnect from relay and set status to 'Terminated' @@ -444,7 +454,7 @@ mod tests { let relay = Relay::new(url); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); let keys = Keys::generate(); let event = EventBuilder::text_note("Test") @@ -463,7 +473,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -486,7 +496,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -509,7 +519,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -533,7 +543,8 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + let res = relay.try_connect(Duration::from_millis(100)).await; + assert!(matches!(res.unwrap_err(), Error::ConnectionFailed)); assert!(relay.inner.is_running()); @@ -563,7 +574,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(None).await; + relay.connect(); time::sleep(Duration::from_secs(1)).await; @@ -595,7 +606,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(None).await; + relay.connect(); time::sleep(Duration::from_secs(1)).await; @@ -624,7 +635,7 @@ mod tests { relay.inner.state.automatic_authentication(true); - relay.connect(Some(Duration::from_millis(100))).await; + relay.connect(); // Signer let keys = Keys::generate(); @@ -665,7 +676,7 @@ mod tests { let relay = Relay::new(url); - relay.connect(Some(Duration::from_millis(100))).await; + relay.connect(); // Signer let keys = Keys::generate(); diff --git a/crates/nostr-relay-pool/src/shared.rs b/crates/nostr-relay-pool/src/shared.rs index 1ac897db4..9e656350a 100644 --- a/crates/nostr-relay-pool/src/shared.rs +++ b/crates/nostr-relay-pool/src/shared.rs @@ -28,8 +28,6 @@ impl fmt::Display for SharedStateError { } } -// TODO: add SharedStateBuilder? - #[derive(Debug, Clone)] pub struct SharedState { pub(crate) database: Arc, @@ -37,6 +35,7 @@ pub struct SharedState { nip42_auto_authentication: Arc, min_pow_difficulty: Arc, pub(crate) filtering: RelayFiltering, + // TODO: add a semaphore to limit number of concurrent websocket connections attempts? } impl Default for SharedState {