From 65a9ddda1eef3fe422ea374ffa0a8bb271720539 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 26 Jan 2024 17:29:51 +0000 Subject: [PATCH] fix: fetching messages with many unreliable relays the tool had not been tested with large number of user relays, some of which are misbehaving. It works well when sending events to relays but struggles when fetching messages. it seems to crash when accessing a large number of relays. this change queues up relays so many are not connected to at the same time. it also shows more verbose messages about its connection and success with relays. many of the tests will fail as a result of this change as I havn't updated them to expect details of more relay interaction. further changes are urgently needed to improve the speed of fetching events. - relay interaction UI should reflect the smooth approach used for sending events - we don't need to fetch user events from every relay - we could show the user information that we have already collected and allow them to interact --- src/client.rs | 47 ++++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/src/client.rs b/src/client.rs index 7418519..6d0aa60 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,7 +12,7 @@ // want to inadvertlty use other features of nightly that might be removed. use anyhow::{Context, Result}; use async_trait::async_trait; -use futures::future::join_all; +use futures::stream::{self, StreamExt}; #[cfg(test)] use mockall::*; use nostr::Event; @@ -50,10 +50,10 @@ impl Connect for Client { ] } else { vec![ - "wss://relayable.org".to_string(), - "wss://relay.f7z.io".to_string(), + "wss://purplepages.es".to_string(), "wss://relay.damus.io".to_string(), - "wss://relay.snort.social".to_string(), + "wss://nostr-pub.wellorder.net".to_string(), + "wss://nos.lol".to_string(), // "ws://localhost:8080".to_string() ] }; @@ -124,19 +124,19 @@ impl Connect for Client { let relays_map = self.client.relays().await; - let relay_results = join_all( - relays - .clone() - .iter() - .map(|r| { - ( - relays_map.get(&nostr::Url::parse(r).unwrap()).unwrap(), - filters.clone(), - ) - }) - .map(|(relay, filters)| get_events_of(relay, filters)), - ) - .await; + let futures: Vec<_> = relays + .clone() + .iter() + .map(|r| { + ( + relays_map.get(&nostr::Url::parse(r).unwrap()).unwrap(), + filters.clone(), + ) + }) + .map(|(relay, filters)| get_events_of(relay, filters)) + .collect(); + + let relay_results = stream::iter(futures).buffer_unordered(5).collect().await; Ok(get_dedup_events(relay_results)) } @@ -146,18 +146,19 @@ async fn get_events_of( relay: &nostr_sdk::Relay, filters: Vec, ) -> Result> { - if !relay.is_connected().await { - relay.connect(true).await; - } - relay + println!("fetching from {}", relay.url()); + + let events = relay .get_events_of( filters, // 20 is nostr_sdk default - std::time::Duration::from_secs(20), + std::time::Duration::from_secs(10), nostr_sdk::FilterOptions::ExitOnEOSE, ) .await - .context("failed to get events from relay") + .context("failed to get events from relay")?; + println!("fetched {} events from {}", events.len(), relay.url()); + Ok(events) } #[derive(Default)]