Skip to content

Commit

Permalink
fix: fetching messages with many unreliable relays
Browse files Browse the repository at this point in the history
the tool had not been tested with large number of user relays, some of
which are misbehaving. It works well when sending events to relays but
struggles when fetching messages.

it seems to crash when accessing a large number of relays. this change
queues up relays so many are not connected to at the same time.

it also shows more verbose messages about its connection and success
with relays.

many of the tests will fail as a result of this change as I havn't
updated them to expect details of more relay interaction.

further changes are urgently needed to improve the speed of fetching
events.

 - relay interaction UI should reflect the smooth approach used for
   sending events
 - we don't need to fetch user events from every relay
 - we could show the user information that we have already collected
   and allow them to interact
  • Loading branch information
DanConwayDev committed Jan 26, 2024
1 parent 1587924 commit 65a9ddd
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// want to inadvertlty use other features of nightly that might be removed.
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::future::join_all;
use futures::stream::{self, StreamExt};
#[cfg(test)]
use mockall::*;
use nostr::Event;
Expand Down Expand Up @@ -50,10 +50,10 @@ impl Connect for Client {
]
} else {
vec![
"wss://relayable.org".to_string(),
"wss://relay.f7z.io".to_string(),
"wss://purplepages.es".to_string(),
"wss://relay.damus.io".to_string(),
"wss://relay.snort.social".to_string(),
"wss://nostr-pub.wellorder.net".to_string(),
"wss://nos.lol".to_string(),
// "ws://localhost:8080".to_string()
]
};
Expand Down Expand Up @@ -124,19 +124,19 @@ impl Connect for Client {

let relays_map = self.client.relays().await;

let relay_results = join_all(
relays
.clone()
.iter()
.map(|r| {
(
relays_map.get(&nostr::Url::parse(r).unwrap()).unwrap(),
filters.clone(),
)
})
.map(|(relay, filters)| get_events_of(relay, filters)),
)
.await;
let futures: Vec<_> = relays
.clone()
.iter()
.map(|r| {
(
relays_map.get(&nostr::Url::parse(r).unwrap()).unwrap(),
filters.clone(),
)
})
.map(|(relay, filters)| get_events_of(relay, filters))
.collect();

let relay_results = stream::iter(futures).buffer_unordered(5).collect().await;

Ok(get_dedup_events(relay_results))
}
Expand All @@ -146,18 +146,19 @@ async fn get_events_of(
relay: &nostr_sdk::Relay,
filters: Vec<nostr::Filter>,
) -> Result<Vec<Event>> {
if !relay.is_connected().await {
relay.connect(true).await;
}
relay
println!("fetching from {}", relay.url());

let events = relay
.get_events_of(
filters,
// 20 is nostr_sdk default
std::time::Duration::from_secs(20),
std::time::Duration::from_secs(10),
nostr_sdk::FilterOptions::ExitOnEOSE,
)
.await
.context("failed to get events from relay")
.context("failed to get events from relay")?;
println!("fetched {} events from {}", events.len(), relay.url());
Ok(events)
}

#[derive(Default)]
Expand Down

0 comments on commit 65a9ddd

Please sign in to comment.