diff --git a/Cargo.lock b/Cargo.lock index 066bb8ac5..4dc32ea95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2472,6 +2472,7 @@ dependencies = [ "ceramic-kubo-rpc-server", "ceramic-metrics", "ceramic-p2p", + "ceramic-peer-svc", "ceramic-pipeline", "ceramic-sql", "cid 0.11.1", @@ -2521,6 +2522,7 @@ dependencies = [ "ceramic-core", "ceramic-event-svc", "ceramic-metrics", + "chrono", "cid 0.11.1", "criterion2", "futures", @@ -2548,6 +2550,39 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ceramic-peer-svc" +version = "0.44.0" +dependencies = [ + "anyhow", + "async-trait", + "ceramic-api", + "ceramic-core", + "ceramic-event", + "ceramic-metrics", + "ceramic-p2p", + "ceramic-sql", + "criterion2", + "expect-test", + "futures", + "ipld-core", + "multibase 0.9.1", + "paste", + "prometheus-client", + "rand 0.8.5", + "recon", + "serde", + "serde_ipld_dagcbor", + "sqlx", + "test-log", + "thiserror", + "tmpdir", + "tokio", + "tracing", + "tracing-subscriber", + "uuid 1.10.0", +] + [[package]] name = "ceramic-pipeline" version = "0.44.0" diff --git a/Cargo.toml b/Cargo.toml index ce1d1a126..1e48e78d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "metrics", "one", "p2p", + "peer-svc", "pipeline", "recon", "sql", @@ -76,6 +77,7 @@ ceramic-metadata = { path = "./metadata" } ceramic-metrics = { path = "./metrics" } ceramic-one = { path = "./one" } ceramic-p2p = { path = "./p2p" } +ceramic-peer-svc = { path = "./peer-svc" } ceramic-pipeline = { path = "./pipeline" } ceramic-sql = { path = "./sql" } ceramic-validation = { path = "./validation" } diff --git a/core/src/lib.rs b/core/src/lib.rs index adfe5372b..61eee7a6c 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -8,7 +8,7 @@ pub mod interest; mod jwk; mod network; mod node_id; -mod peer; +pub mod peer; mod range; mod serialize_ext; pub mod signer; diff --git a/core/src/node_id.rs b/core/src/node_id.rs index f205858dc..f09794810 100644 --- a/core/src/node_id.rs +++ b/core/src/node_id.rs @@ -1,7 +1,6 @@ -use std::fmt::Display; -use std::{fs, path::PathBuf, str::FromStr}; +use std::{fmt::Display, path::Path, str::FromStr}; -use anyhow::{anyhow, Context, Ok, Result}; +use anyhow::{anyhow, Context, Result}; use cid::{multihash::Multihash, Cid}; use libp2p_identity::PeerId; use rand::Rng; @@ -159,14 +158,10 @@ impl Display for NodeId { } /// A [`NodeId`] with its private key. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NodeKey { id: NodeId, - // It would be preferable to not store the private_key_bytes directly and instead use only the - // key_pair. However to use JWK we need to keep the private_key_bytes around. - // Maybe in future versions of ssi_jwk we can change this. private_key_bytes: [u8; 32], - key_pair: Ed25519KeyPair, did: DidDocument, } @@ -179,11 +174,10 @@ impl Eq for NodeKey {} impl NodeKey { /// Construct a new key with both private and public keys. - fn new(id: NodeId, private_key_bytes: [u8; 32], key_pair: Ed25519KeyPair) -> Self { + fn new(id: NodeId, private_key_bytes: [u8; 32]) -> Self { Self { id, private_key_bytes, - key_pair, did: id.did(), } } @@ -204,12 +198,24 @@ impl NodeKey { self.id } - /// Read an Ed25519 key from a directory - pub fn try_from_dir(key_dir: PathBuf) -> Result { - let key_path = key_dir.join("id_ed25519_0"); - let content = fs::read_to_string(key_path)?; - let seed = ssh_key::private::PrivateKey::from_str(&content) - .map_err(|e| anyhow::anyhow!("failed to parse private key: {}", e))? 
+ /// Read an Ed25519 key from a directory or create a new key if not found in the directory. + pub async fn try_from_dir(key_dir: impl AsRef) -> Result { + let key_path = key_dir.as_ref().join("id_ed25519_0"); + let private_key = match tokio::fs::read_to_string(&key_path).await { + Ok(content) => ssh_key::private::PrivateKey::from_str(&content) + .map_err(|e| anyhow::anyhow!("failed to parse private key: {}", e))?, + Err(_) => { + let key = ssh_key::private::PrivateKey::random( + &mut rand::rngs::OsRng, + ssh_key::Algorithm::Ed25519, + )?; + // Write out contents to file for next time + let content = key.to_openssh(ssh_key::LineEnding::default())?; + tokio::fs::write(&key_path, content).await?; + key + } + }; + let seed = private_key .key_data() .ed25519() .map_or(Err(anyhow::anyhow!("failed to parse ed25519 key")), |key| { @@ -223,7 +229,6 @@ impl NodeKey { public_ed25519_key_bytes, }, seed, - key_pair, )) } /// Create an Ed25519 key pair from a secret. The secret can be formatted in two ways: @@ -279,7 +284,7 @@ impl NodeKey { let id = NodeId { public_ed25519_key_bytes, }; - Ok(NodeKey::new(id, secret, key_pair)) + Ok(NodeKey::new(id, secret)) } /// Create a NodeId using a random Ed25519 key pair /// @@ -299,12 +304,19 @@ impl NodeKey { public_ed25519_key_bytes, }, random_secret, - key_pair, ) } /// Sign data with this key pub fn sign(&self, data: &[u8]) -> Signature { - self.key_pair.sign(data) + let key_pair = Ed25519KeyPair::from_seed_unchecked(&self.private_key_bytes) + .expect("private key bytes should already be validated"); + key_pair.sign(data) + } + + /// Construct a [`libp2p_identity::Keypair`] from this node key. + pub fn p2p_keypair(&self) -> libp2p_identity::Keypair { + libp2p_identity::Keypair::ed25519_from_bytes(self.private_key_bytes) + .expect("private key bytes should already be validated") } } diff --git a/core/src/peer.rs b/core/src/peer.rs index 6cf63fd95..19dbf2b7c 100644 --- a/core/src/peer.rs +++ b/core/src/peer.rs @@ -1,3 +1,7 @@ +//! Peer structures for managing known peers the network. +//! [`PeerEntry`] is be signed by the peer such that [`PeerEntry`] structs can be gossipped around +//! the network safely. + use anyhow::{anyhow, bail}; use multiaddr::{Multiaddr, PeerId}; use serde::{Deserialize, Serialize}; @@ -5,19 +9,23 @@ use ssi::jws::DecodedJWS; use crate::{node_id::NodeKey, signer::Signer, DeserializeExt as _, NodeId, SerializeExt as _}; +const MIN_EXPIRATION: u64 = 0; +// 11 9s is the maximum value we can encode into the string representation of a PeerKey. +const MAX_EXPIRATION: u64 = 99_999_999_999; + /// Peer entry that is signed and can be shared. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct PeerEntry { id: NodeId, // Number of seconds after UNIX epoch when this entry is no longer valid. - expiration: u32, + expiration: u64, addresses: Vec, } impl PeerEntry { /// Construct an entry about a peer with address that is no longer valid after expiration seconds after the /// UNIX epoch. - pub fn new(local_id: NodeId, expiration: u32, addresses: Vec) -> Self { + pub fn new(local_id: NodeId, expiration: u64, addresses: Vec) -> Self { let peer_id = local_id.peer_id(); Self { id: local_id, @@ -59,7 +67,7 @@ impl PeerEntry { } /// Report the number of seconds after the UNIX epoch when this entry is no longer valid. 
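The `NodeKey` changes above drop the cached `Ed25519KeyPair`, keep only the 32-byte seed, and make `try_from_dir` async with create-on-missing behavior; `p2p_keypair` and `sign` rebuild key material from the seed on demand. A minimal usage sketch, assuming the API exactly as added in this diff (the directory path is illustrative):

```rust
use ceramic_core::NodeKey;

async fn load_identity() -> anyhow::Result<()> {
    // Reads <dir>/id_ed25519_0, or generates and persists a new Ed25519 key
    // there if the file cannot be read.
    let node_key = NodeKey::try_from_dir("/var/lib/ceramic-one").await?;

    // One seed now backs both the DID-oriented NodeId and the libp2p identity,
    // so the old "load the key twice and assert the peer ids match" step in the
    // daemon goes away.
    let node_id = node_key.id();
    let _libp2p_keypair = node_key.p2p_keypair();

    // Signing rebuilds the ring key pair from the stored seed on each call.
    let _signature = node_key.sign(b"example payload");
    println!("loaded node {}", node_id.peer_id());
    Ok(())
}
```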
- pub fn expiration(&self) -> u32 { + pub fn expiration(&self) -> u64 { self.expiration } @@ -84,10 +92,38 @@ fn ensure_multiaddr_has_p2p(addr: Multiaddr, peer_id: PeerId) -> Multiaddr { /// Encoded [`PeerEntry`] prefixed with its expiration. /// The sort order matters as its used in a Recon ring. /// The key is valid utf-8 of the form `.`; -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize)] pub struct PeerKey(String); +impl std::fmt::Display for PeerKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl TryFrom> for PeerKey { + type Error = anyhow::Error; + + fn try_from(value: Vec) -> Result { + let key = Self(String::from_utf8(value)?); + let _ = key.to_entry()?; + Ok(key) + } +} + impl PeerKey { + /// Return a builder for constructing a PeerKey from its parts. + pub fn builder() -> Builder { + Builder { state: Init } + } + /// Return the raw bytes of the peer key. + pub fn as_slice(&self) -> &[u8] { + self.0.as_bytes() + } + /// Report if this key contains an jws section. + pub fn has_jws(&self) -> bool { + self.0.find('.').is_some() + } /// Construct a signed key from a [`PeerEntry`]. pub fn from_entry(entry: &PeerEntry, node_key: &NodeKey) -> anyhow::Result { if entry.id() != node_key.id() { @@ -102,11 +138,7 @@ impl PeerKey { } /// Decode and verify key as a [`PeerEntry`]. pub fn to_entry(&self) -> anyhow::Result { - let (expiration, jws) = self - .0 - .split_once('.') - .ok_or_else(|| anyhow!("peer key must contain a '.'"))?; - let expiration: u32 = expiration.parse()?; + let (expiration, jws) = self.split_expiration()?; let peer = PeerEntry::from_jws(jws)?; if expiration != peer.expiration { Err(anyhow!( @@ -117,6 +149,112 @@ impl PeerKey { Ok(peer) } } + fn split_expiration(&self) -> anyhow::Result<(u64, &str)> { + let (expiration, jws) = self + .0 + .split_once('.') + .ok_or_else(|| anyhow!("peer key must contain a '.'"))?; + let expiration = expiration.parse()?; + Ok((expiration, jws)) + } +} + +/// Builder provides an ordered API for constructing a PeerKey +#[derive(Debug)] +pub struct Builder { + state: S, +} +/// The state of the builder +pub trait BuilderState {} + +/// Initial state of the builder. +#[derive(Debug)] +pub struct Init; +impl BuilderState for Init {} + +/// Build state where the expiration is known. +pub struct WithExpiration { + expiration: u64, +} +impl BuilderState for WithExpiration {} + +/// Build state where the peer id is known. +pub struct WithId<'a> { + node_key: &'a NodeKey, + expiration: u64, +} +impl<'a> BuilderState for WithId<'a> {} + +/// Build state where the addresses are known. +pub struct WithAddresses<'a> { + node_key: &'a NodeKey, + expiration: u64, + addresses: Vec, +} +impl<'a> BuilderState for WithAddresses<'a> {} + +impl Builder { + /// Set the expiration to earliest possible value. + pub fn with_min_expiration(self) -> Builder { + Builder { + state: WithExpiration { + expiration: MIN_EXPIRATION, + }, + } + } + /// Set the expiration to the latest possible value. + pub fn with_max_expiration(self) -> Builder { + Builder { + state: WithExpiration { + expiration: MAX_EXPIRATION, + }, + } + } + /// Set the expiration as the number of seconds since the UNIX epoch. + pub fn with_expiration(self, expiration: u64) -> Builder { + Builder { + state: WithExpiration { expiration }, + } + } +} +impl Builder { + /// Finish the build producing a partial [`PeerKey`]. 
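The typestate `Builder` introduced here enforces the expiration → id → addresses order at compile time, and `build_fencepost` (defined just below) produces a key carrying only the zero-padded expiration, which is what makes the string form sort by expiration inside a Recon ring. A sketch of both paths, assuming the API as added in this diff; the expiration and address values are illustrative:

```rust
use ceramic_core::{NodeKey, PeerKey};

fn build_keys() -> anyhow::Result<()> {
    let node_key = NodeKey::random();

    // A fencepost is just the 11-digit zero-padded expiration ("00000000042"),
    // usable as a range bound but carrying no signed entry.
    let fencepost = PeerKey::builder().with_expiration(42).build_fencepost();
    assert!(!fencepost.has_jws());

    // A full key embeds a PeerEntry signed by the node key, as `<expiration>.<jws>`.
    let key = PeerKey::builder()
        .with_expiration(42)
        .with_id(&node_key)
        .with_addresses(vec!["/ip4/127.0.0.1/tcp/4101".parse()?])
        .build();

    // to_entry verifies the JWS and checks the prefix matches the signed expiration.
    let entry = key.to_entry()?;
    assert_eq!(entry.id(), node_key.id());
    assert_eq!(entry.expiration(), 42);
    Ok(())
}
```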
+ pub fn build_fencepost(self) -> PeerKey { + PeerKey(format!("{:0>11}", self.state.expiration)) + } + /// Set the peer id. Note, a NodeKey is required so the [`PeerEntry`] can be signed. + pub fn with_id(self, id: &NodeKey) -> Builder { + Builder { + state: WithId { + node_key: id, + expiration: self.state.expiration, + }, + } + } +} +impl<'a> Builder> { + /// Set the addresses where the peer can be reached. + pub fn with_addresses(self, addresses: Vec) -> Builder> { + Builder { + state: WithAddresses { + node_key: self.state.node_key, + expiration: self.state.expiration, + addresses, + }, + } + } +} +impl<'a> Builder> { + /// Finish the build producing a [`PeerKey`]. + pub fn build(self) -> PeerKey { + let entry = PeerEntry::new( + self.state.node_key.id(), + self.state.expiration, + self.state.addresses, + ); + PeerKey::from_entry(&entry, self.state.node_key) + .expect("builder should not build invalid peer key") + } } #[cfg(test)] diff --git a/migrations/sqlite/20241122113134_peer.down.sql b/migrations/sqlite/20241122113134_peer.down.sql new file mode 100644 index 000000000..d9fac05bb --- /dev/null +++ b/migrations/sqlite/20241122113134_peer.down.sql @@ -0,0 +1,3 @@ +-- Add down migration script here + +DROP TABLE IF EXISTS ceramic_one_peer; diff --git a/migrations/sqlite/20241122113134_peer.up.sql b/migrations/sqlite/20241122113134_peer.up.sql new file mode 100644 index 000000000..10984402b --- /dev/null +++ b/migrations/sqlite/20241122113134_peer.up.sql @@ -0,0 +1,16 @@ +-- Add up migration script here + +CREATE TABLE IF NOT EXISTS ceramic_one_peer ( + order_key BLOB NOT NULL, -- PeerKey . + ahash_0 INTEGER NOT NULL, -- the ahash is decomposed as [u32; 8] + ahash_1 INTEGER NOT NULL, + ahash_2 INTEGER NOT NULL, + ahash_3 INTEGER NOT NULL, + ahash_4 INTEGER NOT NULL, + ahash_5 INTEGER NOT NULL, + ahash_6 INTEGER NOT NULL, + ahash_7 INTEGER NOT NULL, + PRIMARY KEY(order_key) +); + +SELECT order_key, ahash_0, ahash_1, ahash_2, ahash_3, ahash_4, ahash_5, ahash_6, ahash_7 FROM ceramic_one_peer WHERE false; diff --git a/one/Cargo.toml b/one/Cargo.toml index 9ed1e794c..86285d56e 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -27,6 +27,7 @@ ceramic-kubo-rpc = { path = "../kubo-rpc", features = ["http"] } ceramic-kubo-rpc-server.workspace = true ceramic-metrics.workspace = true ceramic-p2p.workspace = true +ceramic-peer-svc.workspace = true ceramic-pipeline.workspace = true ceramic-sql.workspace = true cid.workspace = true diff --git a/one/src/daemon.rs b/one/src/daemon.rs index b4de4c8cc..22dd5941b 100644 --- a/one/src/daemon.rs +++ b/one/src/daemon.rs @@ -4,7 +4,7 @@ use crate::{ default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, DBOpts, Info, LogOpts, Network, }; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, bail, Result}; use ceramic_anchor_remote::RemoteCas; use ceramic_anchor_service::AnchorService; use ceramic_core::NodeKey; @@ -13,7 +13,8 @@ use ceramic_event_svc::{ChainInclusionProvider, EventService}; use ceramic_interest_svc::InterestService; use ceramic_kubo_rpc::Multiaddr; use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle}; -use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig}; +use ceramic_p2p::{Libp2pConfig, PeerKeyInterests}; +use ceramic_peer_svc::PeerService; use clap::Args; use object_store::aws::AmazonS3Builder; use recon::{FullInterests, Recon, ReconInterestProvider}; @@ -361,6 +362,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { let rpc_providers = 
get_eth_rpc_providers(opts.ethereum_rpc_urls, &opts.network).await?; // Construct services from pool + let peer_svc = Arc::new(PeerService::new(sqlite_pool.clone())); let interest_svc = Arc::new(InterestService::new(sqlite_pool.clone())); let event_validation = opts.event_validation.unwrap_or(true); let event_svc = Arc::new( @@ -418,25 +420,13 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { }; debug!(?p2p_config, "using p2p config"); - // Load p2p identity - let mut kc = Keychain::::new(opts.p2p_key_dir.clone()) - .await - .context(format!( - "initializing p2p key: using p2p_key_dir={}", - opts.p2p_key_dir.display() - ))?; - let libp2p_keypair = load_identity(&mut kc).await?; - let peer_id = libp2p_keypair.public().to_peer_id(); - - // Load node ID from key directory. Libp2p has their own wrapper around ed25519 keys (╯°□°)╯︵ ┻━┻ - // So, we need to load the key from the key directory for libp2p to use, and then again for evaluating the Node ID - // using a generic ed25519 processing library (ring). We'll assert that the keys are the same. - let node_key = NodeKey::try_from_dir(opts.p2p_key_dir.clone())?; - assert_eq!(node_key.peer_id(), peer_id); + let node_key = NodeKey::try_from_dir(opts.p2p_key_dir).await?; let node_id = node_key.id(); // Register metrics for all components let recon_metrics = MetricsHandle::register(recon::Metrics::register); + let peer_svc_store_metrics = + MetricsHandle::register(ceramic_peer_svc::store::Metrics::register); let interest_svc_store_metrics = MetricsHandle::register(ceramic_interest_svc::store::Metrics::register); let event_svc_store_metrics = @@ -445,51 +435,56 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { http_metrics::Metrics::register, )); + // Create recon store for peers. + let peer_svc = ceramic_peer_svc::store::StoreMetricsMiddleware::new( + peer_svc, + peer_svc_store_metrics.clone(), + ); + // Create recon store for interests. - let interest_store = ceramic_interest_svc::store::StoreMetricsMiddleware::new( + let interest_svc = ceramic_interest_svc::store::StoreMetricsMiddleware::new( interest_svc.clone(), interest_svc_store_metrics.clone(), ); - let interest_api_store = ceramic_interest_svc::store::StoreMetricsMiddleware::new( + let interest_api_svc = ceramic_interest_svc::store::StoreMetricsMiddleware::new( interest_svc.clone(), interest_svc_store_metrics.clone(), ); - // Create second recon store for models. - let model_store = ceramic_event_svc::store::StoreMetricsMiddleware::new( - event_svc.clone(), - event_svc_store_metrics.clone(), - ); - - let model_api_store = ceramic_event_svc::store::StoreMetricsMiddleware::new( + // Create recon store for models. + let model_svc = ceramic_event_svc::store::StoreMetricsMiddleware::new( event_svc.clone(), event_svc_store_metrics, ); + // Construct a recon implementation for peers. + let recon_peer = Recon::new(peer_svc.clone(), PeerKeyInterests, recon_metrics.clone()); + // Construct a recon implementation for interests. - let recon_interest_svr = Recon::new( - interest_store.clone(), + let recon_interest = Recon::new( + interest_svc.clone(), FullInterests::default(), recon_metrics.clone(), ); // Construct a recon implementation for models. 
- let recon_model_svr = Recon::new( - model_store.clone(), + let recon_model = Recon::new( + model_svc.clone(), // Use recon interests as the InterestProvider for recon_model - ReconInterestProvider::new(node_id, interest_store.clone()), + ReconInterestProvider::new(node_id, interest_svc.clone()), recon_metrics, ); - let recons = Some((recon_interest_svr, recon_model_svr)); + let recons = Some((recon_peer, recon_interest, recon_model)); let ipfs_metrics = ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::IpfsMetrics::register); let p2p_metrics = MetricsHandle::register(ceramic_p2p::Metrics::register); let ipfs = Ipfs::::builder() .with_p2p( p2p_config, - libp2p_keypair, + node_key.clone(), + peer_svc, recons, event_svc.clone(), p2p_metrics, @@ -613,8 +608,8 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { let mut ceramic_server = ceramic_api::Server::new( node_id, network, - interest_api_store, - Arc::new(model_api_store), + interest_api_svc, + Arc::new(model_svc), pipeline_ctx, shutdown_signal.resubscribe(), ); diff --git a/one/src/network.rs b/one/src/network.rs index 1437d8884..0c6ab9101 100644 --- a/one/src/network.rs +++ b/one/src/network.rs @@ -3,12 +3,11 @@ use std::sync::Arc; use anyhow::Result; -use ceramic_core::{EventId, Interest}; +use ceramic_core::{EventId, Interest, NodeKey, PeerKey}; use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService}; -use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node}; +use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, PeerService}; use iroh_rpc_client::P2pClient; use iroh_rpc_types::{p2p::P2pAddr, Addr}; -use libp2p::identity::Keypair; use recon::{libp2p::Recon, Sha256a}; use tokio::task::{self, JoinHandle}; use tracing::{debug, error}; @@ -33,15 +32,17 @@ impl BuilderState for WithP2p {} /// Configure the p2p service impl Builder { - pub async fn with_p2p( + pub async fn with_p2p( self, libp2p_config: Libp2pConfig, - keypair: Keypair, - recons: Option<(I, M)>, + node_key: NodeKey, + peer_svc: impl PeerService + 'static, + recons: Option<(P, I, M)>, block_store: Arc, metrics: ceramic_p2p::Metrics, ) -> anyhow::Result> where + P: Recon, I: Recon, M: Recon, S: iroh_bitswap::Store, @@ -52,8 +53,16 @@ impl Builder { config.libp2p = libp2p_config; - let mut p2p = - Node::new(config, addr.clone(), keypair, recons, block_store, metrics).await?; + let mut p2p = Node::new( + config, + addr.clone(), + node_key, + peer_svc, + recons, + block_store, + metrics, + ) + .await?; let task = task::spawn(async move { if let Err(err) = p2p.run().await { diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 2248f0917..b9f5a80a0 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -16,6 +16,7 @@ async-trait.workspace = true backoff.workspace = true ceramic-core.workspace = true ceramic-metrics.workspace = true +chrono.workspace = true cid.workspace = true futures-util.workspace = true futures.workspace = true diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index b14eedefc..baa85d9e0 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -1,7 +1,7 @@ use std::time::Duration; use anyhow::Result; -use ceramic_core::{EventId, Interest}; +use ceramic_core::{EventId, Interest, PeerKey}; use iroh_bitswap::{Bitswap, Block, Config as BitswapConfig}; use libp2p::{ autonat, @@ -36,7 +36,7 @@ pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION" /// Libp2p behaviour for the node. 
#[derive(NetworkBehaviour)] #[behaviour(to_swarm = "Event")] -pub(crate) struct NodeBehaviour +pub(crate) struct NodeBehaviour where S: iroh_bitswap::Store + Send + Sync, { @@ -56,11 +56,12 @@ where relay: Toggle, relay_client: Toggle, dcutr: Toggle, - recon: Toggle>, + recon: Toggle>, } -impl NodeBehaviour +impl NodeBehaviour where + P: Recon + Send + Sync, I: Recon + Send + Sync, M: Recon + Send + Sync, S: iroh_bitswap::Store + Send + Sync, @@ -69,7 +70,7 @@ where local_key: &Keypair, config: &Libp2pConfig, relay_client: Option, - recons: Option<(I, M)>, + recons: Option<(P, I, M)>, block_store: Arc, metrics: Metrics, ) -> Result { @@ -184,8 +185,8 @@ where .with_max_pending_incoming(Some(config.max_conns_pending_in)) .with_max_established_per_peer(Some(config.max_conns_per_peer)), ); - let recon = recons.map(|(interest, model)| { - recon::libp2p::Behaviour::new(interest, model, recon::libp2p::Config::default()) + let recon = recons.map(|(peer, interest, model)| { + recon::libp2p::Behaviour::new(peer, interest, model, recon::libp2p::Config::default()) }); Ok(NodeBehaviour { ping: Ping::default(), diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index e849a640f..22666c497 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -3,6 +3,7 @@ pub mod config; mod keys; mod metrics; mod node; +mod peers; mod providers; pub mod rpc; mod swarm; @@ -13,5 +14,6 @@ pub use self::keys::{DiskStorage, Keychain, MemoryStorage}; pub use self::metrics::Metrics; pub use self::node::*; pub use libp2p::PeerId; +pub use peers::{PeerKeyInterests, PeerService}; pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 32d71fb62..288157db1 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -7,7 +7,7 @@ use std::{sync::atomic::Ordering, time::Duration}; use ahash::AHashMap; use anyhow::{anyhow, bail, Context, Result}; -use ceramic_core::{EventId, Interest}; +use ceramic_core::{EventId, Interest, NodeKey, PeerKey}; use ceramic_metrics::{libp2p_metrics, Recorder}; use cid::Cid; use futures_util::stream::StreamExt; @@ -19,7 +19,6 @@ use libp2p::{ autonat::{self, OutboundProbeEvent}, core::Multiaddr, identify, - identity::Keypair, kad::{ self, BootstrapOk, GetClosestPeersError, GetClosestPeersOk, GetProvidersOk, QueryId, QueryResult, @@ -40,11 +39,10 @@ use tracing::{debug, error, info, instrument, trace, warn}; use crate::{ behaviour::{Event, NodeBehaviour}, - keys::{Keychain, Storage}, metrics::{LoopEvent, Metrics}, + peers::{self, PeerService}, providers::Providers, - rpc::{self, RpcMessage}, - rpc::{P2p, ProviderRequestKey}, + rpc::{self, P2p, ProviderRequestKey, RpcMessage}, swarm::build_swarm, Config, }; @@ -61,14 +59,15 @@ pub enum NetworkEvent { /// Node implements a peer to peer node that participates on the Ceramic network. /// /// Node provides an external API via RpcMessages. 
-pub struct Node +pub struct Node where + P: Recon, I: Recon, M: Recon, S: iroh_bitswap::Store, { metrics: Metrics, - swarm: Swarm>, + swarm: Swarm>, supported_protocols: HashSet, net_receiver_in: Receiver, dial_queries: AHashMap>>>, @@ -79,6 +78,8 @@ where #[allow(dead_code)] rpc_client: RpcClient, rpc_task: JoinHandle<()>, + peers_tx: Sender, + peers_task: JoinHandle<()>, use_dht: bool, bitswap_sessions: BitswapSessions, providers: Providers, @@ -88,8 +89,9 @@ where active_address_probe: Option, } -impl fmt::Debug for Node +impl fmt::Debug for Node where + P: Recon, I: Recon, M: Recon, S: iroh_bitswap::Store, @@ -104,6 +106,8 @@ where .field("network_events", &self.network_events) .field("rpc_client", &self.rpc_client) .field("rpc_task", &self.rpc_task) + .field("peers_tx", &self.peers_tx) + .field("peers_task", &self.peers_task) .field("use_dht", &self.use_dht) .field("bitswap_sessions", &self.bitswap_sessions) .field("providers", &self.providers) @@ -121,22 +125,26 @@ const NICE_INTERVAL: Duration = Duration::from_secs(6); const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60); const EXPIRY_INTERVAL: Duration = Duration::from_secs(1); -impl Drop for Node +impl Drop for Node where + P: Recon, I: Recon, M: Recon, S: iroh_bitswap::Store, { fn drop(&mut self) { self.rpc_task.abort(); + self.peers_task.abort(); } } // Allow IntoConnectionHandler deprecated associated type. // We are not using IntoConnectionHandler directly only referencing the type as part of this event signature. -type NodeSwarmEvent = SwarmEvent< as NetworkBehaviour>::ToSwarm>; -impl Node +type NodeSwarmEvent = + SwarmEvent< as NetworkBehaviour>::ToSwarm>; +impl Node where + P: Recon + Send + Sync, I: Recon + Send + Sync, M: Recon + Send + Sync, S: iroh_bitswap::Store + Send + Sync, @@ -144,8 +152,9 @@ where pub async fn new( config: Config, rpc_addr: P2pAddr, - keypair: Keypair, - recons: Option<(I, M)>, + node_key: NodeKey, + peer_svc: impl PeerService + 'static, + recons: Option<(P, I, M)>, block_store: Arc, metrics: Metrics, ) -> Result { @@ -159,7 +168,7 @@ where let mut swarm = build_swarm( &libp2p_config, - keypair, + node_key.p2p_keypair(), recons, block_store, metrics.clone(), @@ -192,6 +201,17 @@ where .unwrap() }); + let (peers_tx, peers_rx) = channel(1_000); + let peers_task = tokio::task::spawn(async move { + peers::run( + Duration::from_secs(24 * 60 * 60), + node_key, + peer_svc, + peers_rx, + ) + .await + }); + let rpc_client = RpcClient::new(rpc_client) .await .context("failed to create rpc client")?; @@ -229,6 +249,8 @@ where network_events: Vec::new(), rpc_client, rpc_task, + peers_tx, + peers_task, use_dht: libp2p_config.kademlia, bitswap_sessions: Default::default(), providers: Providers::new(4), @@ -456,7 +478,7 @@ where #[tracing::instrument(skip_all)] async fn handle_swarm_event( &mut self, - event: NodeSwarmEvent, + event: NodeSwarmEvent, ) -> Result> { libp2p_metrics().record(&event); match event { @@ -505,6 +527,32 @@ where } Ok(None) } + SwarmEvent::ExternalAddrConfirmed { address } => { + if let Err(err) = self + .peers_tx + .send(peers::Message::NewLocalAddress(address)) + .await + { + warn!( + address = ?err.0, + "failed to notifiy peers task about a new local address" + ); + } + Ok(None) + } + SwarmEvent::ExternalAddrExpired { address } => { + if let Err(err) = self + .peers_tx + .send(peers::Message::RemoveLocalAddress(address)) + .await + { + warn!( + address = ?err.0, + "failed to notifiy peers task about an expired local address" + ); + } + Ok(None) + } _ => Ok(None), } } @@ 
-1150,23 +1198,6 @@ enum SwarmEventResult { KademliaBoostrapSuccess, } -pub async fn load_identity(kc: &mut Keychain) -> Result { - if kc.is_empty().await? { - info!("no identity found, creating",); - kc.create_ed25519_key().await?; - } - - // for now we just use the first key - let first_key = kc.keys().next().await; - if let Some(keypair) = first_key { - let keypair: Keypair = keypair?.into(); - info!("identity loaded: {}", PeerId::from(keypair.public())); - return Ok(keypair); - } - - Err(anyhow!("inconsistent key state")) -} - #[cfg(test)] mod tests { use std::marker::PhantomData; @@ -1315,6 +1346,80 @@ mod tests { unreachable!() } } + #[derive(Clone)] + struct DummyPeers; + + #[async_trait] + impl Recon for DummyPeers { + type Key = PeerKey; + type Hash = Sha256a; + + async fn insert( + &self, + _items: Vec>, + _informant: NodeId, + ) -> ReconResult> { + unreachable!() + } + + async fn range( + &self, + _left_fencepost: Self::Key, + _right_fencepost: Self::Key, + _offset: usize, + _limit: usize, + ) -> ReconResult> { + unreachable!() + } + + async fn len(&self) -> ReconResult { + unreachable!() + } + + async fn value_for_key(&self, _key: Self::Key) -> ReconResult>> { + Ok(None) + } + async fn interests(&self) -> ReconResult>> { + unreachable!() + } + + async fn process_interests( + &self, + _interests: Vec>, + ) -> ReconResult>> { + unreachable!() + } + + async fn initial_range( + &self, + _interest: RangeOpen, + ) -> ReconResult> { + unreachable!() + } + + async fn process_range( + &self, + _range: RangeHash, + ) -> ReconResult> { + unreachable!() + } + + fn metrics(&self) -> recon::Metrics { + unreachable!() + } + } + #[async_trait] + impl PeerService for DummyPeers { + async fn insert(&self, peer: &PeerKey) -> anyhow::Result<()> { + unreachable!() + } + async fn delete_range(&self, range: RangeOpen<&PeerKey>) -> anyhow::Result<()> { + unreachable!() + } + async fn all_peers(&self) -> anyhow::Result> { + unreachable!() + } + } impl TestRunnerBuilder { fn new() -> Self { @@ -1398,7 +1503,7 @@ mod tests { network_config, rpc_server_addr, keypair.into(), - None::<(DummyRecon, DummyRecon)>, + None::<(DummyPeers, DummyRecon, DummyRecon)>, store, metrics, ) diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs new file mode 100644 index 000000000..43ac38fe9 --- /dev/null +++ b/p2p/src/peers.rs @@ -0,0 +1,147 @@ +use std::{collections::BTreeSet, ops::Range, sync::Arc, time::Duration}; + +use async_trait::async_trait; +use ceramic_core::{NodeId, NodeKey, PeerEntry, PeerKey, RangeOpen}; +use libp2p::Multiaddr; +use recon::InterestProvider; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tracing::warn; + +/// [`InterestProvider`] that is interested in [`PeerKey`]s that have not expired. 
+#[derive(Debug, Clone)] +pub struct PeerKeyInterests; + +#[async_trait] +impl InterestProvider for PeerKeyInterests { + type Key = PeerKey; + + async fn interests(&self) -> recon::Result>> { + let now = chrono::Utc::now().timestamp() as u64; + Ok(vec![( + PeerKey::builder().with_expiration(now).build_fencepost(), + PeerKey::builder().with_max_expiration().build_fencepost(), + ) + .into()]) + } +} + +#[async_trait] +pub trait PeerService: Send + Sync { + async fn insert(&self, peer: &PeerKey) -> anyhow::Result<()>; + async fn delete_range(&self, range: Range<&PeerKey>) -> anyhow::Result<()>; + async fn all_peers(&self) -> anyhow::Result>; +} + +#[async_trait] +impl PeerService for Arc { + async fn insert(&self, peer: &PeerKey) -> anyhow::Result<()> { + self.as_ref().insert(peer).await + } + async fn delete_range(&self, range: Range<&PeerKey>) -> anyhow::Result<()> { + self.as_ref().delete_range(range).await + } + async fn all_peers(&self) -> anyhow::Result> { + self.as_ref().all_peers().await + } +} + +#[derive(Debug)] +pub enum Message { + /// Inform the peers loop about a new local address. + NewLocalAddress(Multiaddr), + /// Inform the peers loop about a local address that is no longer valid. + RemoveLocalAddress(Multiaddr), + /// Report a list of all remote peers. + #[allow(dead_code)] // This will be used as part of issue #606 + AllRemotePeers(oneshot::Sender>>), +} + +/// Run a loop handling messages and publishing the local node into the Peer recon ring. +pub async fn run( + expiration: Duration, + node_key: NodeKey, + svc: impl PeerService, + mut messages: mpsc::Receiver, +) { + let mut addresses = BTreeSet::new(); + let mut interval = tokio::time::interval(expiration / 2); + loop { + select! { + _ = interval.tick() => { + do_tick(expiration, &node_key, addresses.iter().cloned().collect(), &svc).await + } + Some(m) = messages.recv() => { + handle_message(node_key.id(), m, &mut addresses,&svc).await + } + } + } +} +async fn do_tick( + expiration: Duration, + node_key: &NodeKey, + addressess: Vec, + svc: &impl PeerService, +) { + // Publish a new peer key with a new expiration. + // Otherwise other peers will forget about the local node. 
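The publish/expire cycle here is easiest to see as arithmetic: `run` ticks every `expiration / 2`, each tick publishes a key valid for `expiration` seconds, and anything whose expiration prefix has fallen below "now" is deleted, which is exactly the part of the keyspace `PeerKeyInterests` stops syncing. A minimal sketch of those bounds, assuming the `PeerKey` builder from `ceramic-core` above; the helper itself is hypothetical:

```rust
use std::time::Duration;

use ceramic_core::PeerKey;

// Mirrors do_tick: the expiration for the key about to be published, and the
// range of already-expired keys to delete.
fn tick_bounds(now_secs: u64, expiration: Duration) -> (u64, std::ops::Range<PeerKey>) {
    let publish_expiration = now_secs + expiration.as_secs();

    // Keys in [min, now) have expired and are removed so the DB stays bounded;
    // PeerKeyInterests only syncs the complementary [now, max) window.
    let expired = PeerKey::builder().with_min_expiration().build_fencepost()
        ..PeerKey::builder().with_expiration(now_secs).build_fencepost();

    // run() ticks every expiration / 2, so a published entry always has at
    // least half its lifetime remaining when it gets refreshed.
    (publish_expiration, expired)
}
```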
+ let now = chrono::Utc::now().timestamp() as u64; + let expiration = now + expiration.as_secs(); + let key = PeerKey::builder() + .with_expiration(expiration) + .with_id(node_key) + .with_addresses(addressess) + .build(); + if let Err(err) = svc.insert(&key).await { + warn!(%err, "error encountered publishing local node"); + } + // Delete all expired peers, otherwise the db would grow indefinitely + if let Err(err) = svc + .delete_range( + &PeerKey::builder().with_min_expiration().build_fencepost() + ..&PeerKey::builder().with_expiration(now).build_fencepost(), + ) + .await + { + warn!(%err, "error encountered deleting expired peer keys"); + } +} +async fn handle_message( + node_id: NodeId, + message: Message, + addressess: &mut BTreeSet, + svc: &impl PeerService, +) { + match message { + Message::NewLocalAddress(address) => { + addressess.insert(address); + } + Message::RemoveLocalAddress(address) => { + addressess.remove(&address); + } + Message::AllRemotePeers(tx) => { + let r = match svc.all_peers().await { + Ok(all_peers) => Ok(all_peers + .into_iter() + .filter_map(|peer_key| { + //Skip any peer keys that do not have a valid signature + peer_key.to_entry().ok().and_then(|peer_entry| { + // Skip the local node, we only want remote peers + if peer_entry.id() != node_id { + Some(peer_entry) + } else { + None + } + }) + }) + .collect()), + Err(err) => Err(err), + }; + if tx.send(r).is_err() { + warn!("failed to send all peers response"); + } + } + } +} diff --git a/p2p/src/swarm.rs b/p2p/src/swarm.rs index 348771294..8beb131e8 100644 --- a/p2p/src/swarm.rs +++ b/p2p/src/swarm.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use ceramic_core::{EventId, Interest}; +use ceramic_core::{EventId, Interest, PeerKey}; use libp2p::{dns, noise, relay, tcp, tls, yamux, Swarm, SwarmBuilder}; use libp2p_identity::Keypair; use recon::{libp2p::Recon, Sha256a}; @@ -28,14 +28,15 @@ fn get_dns_config() -> (dns::ResolverConfig, dns::ResolverOpts) { } } -pub(crate) async fn build_swarm( +pub(crate) async fn build_swarm( config: &Libp2pConfig, keypair: Keypair, - recons: Option<(I, M)>, + recons: Option<(P, I, M)>, block_store: Arc, metrics: Metrics, -) -> Result>> +) -> Result>> where + P: Recon, I: Recon, M: Recon, S: iroh_bitswap::Store, @@ -94,15 +95,16 @@ where } } -fn new_behavior( +fn new_behavior( config: &Libp2pConfig, keypair: &Keypair, relay_client: Option, - recons: Option<(I, M)>, + recons: Option<(P, I, M)>, block_store: Arc, metrics: Metrics, -) -> Result> +) -> Result> where + P: Recon + Send, I: Recon + Send, M: Recon + Send, S: iroh_bitswap::Store, diff --git a/peer-svc/Cargo.toml b/peer-svc/Cargo.toml new file mode 100644 index 000000000..48fe77099 --- /dev/null +++ b/peer-svc/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "ceramic-peer-svc" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +ceramic-api.workspace = true +ceramic-core.workspace = true +ceramic-metrics.workspace = true +ceramic-p2p.workspace = true +ceramic-sql.workspace = true +futures.workspace = true +prometheus-client.workspace = true +recon.workspace = true +sqlx.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true + +[dev-dependencies] +ceramic-event.workspace = true +criterion2 = { workspace = true, features = ["async", "async_tokio"] } +expect-test.workspace = true +ipld-core.workspace = true +multibase.workspace = true +paste = 
"1.0" +rand.workspace = true +serde.workspace = true +serde_ipld_dagcbor.workspace = true +test-log.workspace = true +tmpdir.workspace = true +tokio.workspace = true +tracing-subscriber.workspace = true +uuid.workspace = true diff --git a/peer-svc/src/error.rs b/peer-svc/src/error.rs new file mode 100644 index 000000000..93c4acb1a --- /dev/null +++ b/peer-svc/src/error.rs @@ -0,0 +1,101 @@ +#[derive(Debug, thiserror::Error)] +/// The Errors that can be raised by store operations +pub enum Error { + #[error("Application error encountered: {error}")] + /// An internal application error that is not fatal to the process e.g. a 500/unhandled error + Application { + /// The error details that may include context and other information + error: anyhow::Error, + }, + #[error("InvalidArgument: {error}")] + /// Invalid client input + InvalidArgument { + /// The error details that may include context and other information + error: anyhow::Error, + }, + #[error("Fatal error encountered: {error}")] + /// A fatal error that is unlikely to be recoverable, and may require terminating the process completely + Fatal { + /// The error details that may include context and other information + error: anyhow::Error, + }, + #[error("Transient error encountered: {error}")] + /// An error that can be retried, and may resolve itself. If an error is transient repeatedly, it should be + /// considered an "application" level error and propagated upward. + Transient { + /// The error details that may include context and other information + error: anyhow::Error, + }, +} + +impl Error { + /// Create a transient error + pub fn new_transient(error: impl Into) -> Self { + Self::Transient { + error: error.into(), + } + } + /// Create a fatal error + pub fn new_fatal(error: impl Into) -> Self { + Self::Fatal { + error: error.into(), + } + } + + /// Create an application error + pub fn new_app(error: impl Into) -> Self { + Self::Application { + error: error.into(), + } + } + + /// Crate an InvalidArgument error + pub fn new_invalid_arg(error: impl Into) -> Self { + Self::InvalidArgument { + error: error.into(), + } + } + + /// Add context to the internal error. 
Works identically to `anyhow::context` + pub fn context(self, context: C) -> Self + where + C: std::fmt::Display + Send + Sync + 'static, + { + match self { + Error::Application { error } => Self::Application { + error: error.context(context), + }, + Error::Fatal { error } => Self::Fatal { + error: error.context(context), + }, + Error::Transient { error } => Self::Transient { + error: error.context(context), + }, + Error::InvalidArgument { error } => Self::InvalidArgument { + error: error.context(context), + }, + } + } +} + +impl From for recon::Error { + fn from(value: Error) -> Self { + match value { + Error::Application { error } => recon::Error::Application { error }, + Error::Fatal { error } => recon::Error::Fatal { error }, + Error::Transient { error } => recon::Error::Transient { error }, + Error::InvalidArgument { error } => recon::Error::Application { error }, + } + } +} + +impl From for Error { + fn from(value: crate::store::Error) -> Self { + match value { + crate::store::Error::Application { error } => Error::Application { error }, + crate::store::Error::Fatal { error } => Error::Fatal { error }, + crate::store::Error::Transient { error } => Error::Transient { error }, + crate::store::Error::InvalidArgument { error } => Error::InvalidArgument { error }, + } + } +} diff --git a/peer-svc/src/lib.rs b/peer-svc/src/lib.rs new file mode 100644 index 000000000..f5864877e --- /dev/null +++ b/peer-svc/src/lib.rs @@ -0,0 +1,11 @@ +//! The Event Service provides an API for ingesting and querying Ceramic Events. +#![warn(missing_docs)] + +mod error; +mod service; +pub mod store; +#[cfg(test)] +mod tests; + +pub use error::Error; +pub use service::PeerService; diff --git a/peer-svc/src/service.rs b/peer-svc/src/service.rs new file mode 100644 index 000000000..9500a5cb0 --- /dev/null +++ b/peer-svc/src/service.rs @@ -0,0 +1,133 @@ +use std::ops::Range; + +use ceramic_core::{NodeId, PeerKey}; +use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a}; +use tracing::instrument; + +use crate::store::PeerDB; +use crate::store::SqlitePool; +use crate::Error; + +/// A Service that understands how to process and store Ceramic [`ceramic_core::PeerKey`]s. +/// Implements the [`recon::Store`], [`ceramic_p2p::PeerService`]. +#[derive(Debug)] +pub struct PeerService { + pub(crate) pool: SqlitePool, +} +impl PeerService { + /// Construct a new interest service from a [`SqlitePool`]. + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait::async_trait] +impl recon::Store for PeerService { + type Key = PeerKey; + type Hash = Sha256a; + + /// Insert new keys into the key space. + /// Returns true for each key if it did not previously exist, in the + /// same order as the input iterator. + #[instrument(skip(self))] + async fn insert_many( + &self, + items: &[ReconItem], + _informant: NodeId, + ) -> ReconResult> { + let keys = items.iter().map(|item| &item.key).collect::>(); + Ok(PeerDB::insert_many(&self.pool, &keys) + .await + .map_err(Error::from)?) + } + + /// Return the hash of all keys in the range between left_fencepost and right_fencepost. + /// Both range bounds are exclusive. + /// Returns ReconResult<(Hash, count), Err> + #[instrument(skip(self))] + async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult> { + Ok(PeerDB::hash_range(&self.pool, range) + .await + .map_err(Error::from)?) + } + + /// Return all keys in the range between left_fencepost and right_fencepost. + /// Both range bounds are exclusive. 
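For orientation, a hedged end-to-end sketch of the new service: persist a signed `PeerKey` through the `ceramic_p2p::PeerService` trait and read it back. The in-memory pool mirrors this crate's tests, the key values are illustrative, and the `ceramic_p2p::PeerService` impl it relies on follows just below in this file.

```rust
use ceramic_core::{NodeKey, PeerKey};
use ceramic_p2p::PeerService as _;
use ceramic_peer_svc::PeerService;

async fn demo() -> anyhow::Result<()> {
    let pool = ceramic_peer_svc::store::SqlitePool::connect_in_memory().await?;
    let svc = PeerService::new(pool);

    let node_key = NodeKey::random();
    let key = PeerKey::builder()
        .with_expiration(1_000_000)
        .with_id(&node_key)
        .with_addresses(vec!["/ip4/127.0.0.1/tcp/4101".parse()?])
        .build();

    svc.insert(&key).await?;
    // all_peers scans from the min to the max expiration fencepost.
    assert_eq!(svc.all_peers().await?, vec![key]);
    Ok(())
}
```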
+ /// + /// Offset and limit values are applied within the range of keys. + #[instrument(skip(self))] + async fn range( + &self, + range: Range<&Self::Key>, + + offset: usize, + limit: usize, + ) -> ReconResult + Send + 'static>> { + Ok(Box::new( + PeerDB::range(&self.pool, range, offset, limit) + .await + .map_err(Error::from)? + .into_iter(), + )) + } + + /// Return all keys and values in the range between left_fencepost and right_fencepost. + /// Both range bounds are exclusive. + /// + /// Offset and limit values are applied within the range of keys. + #[instrument(skip(self))] + async fn range_with_values( + &self, + range: Range<&Self::Key>, + offset: usize, + limit: usize, + ) -> ReconResult)> + Send + 'static>> { + let res = PeerDB::range(&self.pool, range, offset, limit) + .await + .map_err(Error::from)?; + Ok(Box::new(res.into_iter().map(|key| (key, vec![])))) + } + /// Return the number of keys within the range. + #[instrument(skip(self))] + async fn count(&self, range: Range<&Self::Key>) -> ReconResult { + Ok(PeerDB::count(&self.pool, range) + .await + .map_err(Error::from)?) + } + + /// value_for_key returns + /// Ok(Some(value)) if stored, + /// Ok(None) if not stored, and + /// Err(e) if retrieving failed. + #[instrument(skip(self))] + async fn value_for_key(&self, _key: &Self::Key) -> ReconResult>> { + Ok(Some(vec![])) + } +} + +#[async_trait::async_trait] +impl ceramic_p2p::PeerService for PeerService { + async fn insert(&self, peer: &PeerKey) -> anyhow::Result<()> { + PeerDB::insert_many(&self.pool, &[peer]) + .await + .map_err(Error::from)?; + Ok(()) + } + async fn delete_range(&self, range: Range<&PeerKey>) -> anyhow::Result<()> { + PeerDB::delete_range(&self.pool, range) + .await + .map_err(Error::from)?; + Ok(()) + } + async fn all_peers(&self) -> anyhow::Result> { + Ok(PeerDB::range( + &self.pool, + &PeerKey::builder().with_min_expiration().build_fencepost() + ..&PeerKey::builder().with_max_expiration().build_fencepost(), + 0, + usize::MAX, + ) + .await + .map_err(Error::from)?) 
+ } +} diff --git a/peer-svc/src/store/metrics.rs b/peer-svc/src/store/metrics.rs new file mode 100644 index 000000000..d93109c50 --- /dev/null +++ b/peer-svc/src/store/metrics.rs @@ -0,0 +1,235 @@ +use std::{ops::Range, time::Duration}; + +use async_trait::async_trait; +use ceramic_core::{NodeId, PeerKey}; +use ceramic_metrics::{register, Recorder}; +use ceramic_p2p::PeerService; +use futures::Future; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + histogram::{exponential_buckets, Histogram}, + }, + registry::Registry, +}; +use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult}; +use tokio::time::Instant; + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct StorageQuery { + pub name: &'static str, + pub duration: Duration, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct InsertEvent { + pub cnt: u64, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct QueryLabels { + name: &'static str, +} + +impl From<&StorageQuery> for QueryLabels { + fn from(value: &StorageQuery) -> Self { + Self { name: value.name } + } +} + +#[derive(Clone, Debug)] +/// Storage system metrics +pub struct Metrics { + key_value_insert_count: Counter, + + query_durations: Family, +} + +impl Metrics { + /// Register and construct Metrics + pub fn register(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("store"); + + register!( + key_value_insert_count, + "Number times a new key/value pair is inserted into the datastore", + Counter::default(), + sub_registry + ); + + register!( + query_durations, + "Durations of store queries in seconds", + Family::::new_with_constructor(|| { + Histogram::new(exponential_buckets(0.005, 2.0, 20)) + }), + sub_registry + ); + + Self { + key_value_insert_count, + query_durations, + } + } +} + +impl Recorder for Metrics { + fn record(&self, event: &InsertEvent) { + self.key_value_insert_count.inc_by(event.cnt); + } +} + +impl Recorder for Metrics { + fn record(&self, event: &StorageQuery) { + let labels: QueryLabels = event.into(); + self.query_durations + .get_or_create(&labels) + .observe(event.duration.as_secs_f64()); + } +} + +/// Implement the Store and record metrics +#[derive(Debug, Clone)] +pub struct StoreMetricsMiddleware +where + S: Send + Sync, +{ + store: S, + metrics: Metrics, +} + +impl StoreMetricsMiddleware { + /// Construct a new StoreMetricsMiddleware. + /// The metrics should have already be registered. 
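The middleware below wraps any store/peer service and records per-call durations plus an insert counter, mirroring how the daemon wires it earlier in this diff (the daemon goes through `MetricsHandle::register`; the direct `prometheus_client` registry here is a simplified, hedged sketch):

```rust
use std::sync::Arc;

use ceramic_peer_svc::store::{Metrics, StoreMetricsMiddleware};
use ceramic_peer_svc::PeerService;
use prometheus_client::registry::Registry;

fn with_metrics(
    svc: Arc<PeerService>,
    registry: &mut Registry,
) -> StoreMetricsMiddleware<Arc<PeerService>> {
    // Registers the "store" sub-registry (insert counter + query duration
    // histogram) and returns a wrapper that forwards every recon::Store /
    // PeerService call while timing it.
    let metrics = Metrics::register(registry);
    StoreMetricsMiddleware::new(svc, metrics)
}
```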
+ pub fn new(store: S, metrics: Metrics) -> Self { + Self { store, metrics } + } + // Record metrics for a given API endpoint + async fn record(metrics: &Metrics, name: &'static str, fut: impl Future) -> T { + let start = Instant::now(); + let ret = fut.await; + let duration = start.elapsed(); + let event = StorageQuery { name, duration }; + metrics.record(&event); + ret + } +} + +#[async_trait] +impl recon::Store for StoreMetricsMiddleware +where + S: recon::Store + Send + Sync, + K: recon::Key, + H: AssociativeHash, +{ + type Key = K; + type Hash = H; + + async fn insert_many( + &self, + items: &[ReconItem], + informant: NodeId, + ) -> ReconResult> { + let res = StoreMetricsMiddleware::::record( + &self.metrics, + "insert_many", + self.store.insert_many(items, informant), + ) + .await?; + + self.metrics.record(&InsertEvent { + cnt: res.count_inserted() as u64, + }); + + Ok(res) + } + + async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record( + &self.metrics, + "hash_range", + self.store.hash_range(range), + ) + .await + } + + async fn range( + &self, + range: Range<&Self::Key>, + offset: usize, + limit: usize, + ) -> ReconResult + Send + 'static>> { + StoreMetricsMiddleware::::record( + &self.metrics, + "range", + self.store.range(range, offset, limit), + ) + .await + } + async fn range_with_values( + &self, + range: Range<&Self::Key>, + offset: usize, + limit: usize, + ) -> ReconResult)> + Send + 'static>> { + StoreMetricsMiddleware::::record( + &self.metrics, + "range_with_values", + self.store.range_with_values(range, offset, limit), + ) + .await + } + + async fn full_range( + &self, + ) -> ReconResult + Send + 'static>> { + StoreMetricsMiddleware::::record(&self.metrics, "full_range", self.store.full_range()) + .await + } + + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await + } + async fn count(&self, range: Range<&Self::Key>) -> ReconResult { + StoreMetricsMiddleware::::record(&self.metrics, "count", self.store.count(range)).await + } + async fn len(&self) -> ReconResult { + StoreMetricsMiddleware::::record(&self.metrics, "len", self.store.len()).await + } + + async fn is_empty(&self) -> ReconResult { + StoreMetricsMiddleware::::record(&self.metrics, "is_empty", self.store.is_empty()).await + } + + async fn value_for_key(&self, key: &Self::Key) -> ReconResult>> { + StoreMetricsMiddleware::::record( + &self.metrics, + "value_for_key", + self.store.value_for_key(key), + ) + .await + } +} +#[async_trait] +impl ceramic_p2p::PeerService for StoreMetricsMiddleware +where + S: PeerService + Send + Sync, +{ + async fn insert(&self, peer: &PeerKey) -> anyhow::Result<()> { + StoreMetricsMiddleware::::record(&self.metrics, "insert", self.store.insert(peer)).await + } + async fn delete_range(&self, range: Range<&PeerKey>) -> anyhow::Result<()> { + StoreMetricsMiddleware::::record( + &self.metrics, + "delete_range", + self.store.delete_range(range), + ) + .await + } + async fn all_peers(&self) -> anyhow::Result> { + StoreMetricsMiddleware::::record(&self.metrics, "all_peers", self.store.all_peers()) + .await + } +} diff --git a/peer-svc/src/store/mod.rs b/peer-svc/src/store/mod.rs new file mode 100644 index 000000000..cca34c655 --- /dev/null +++ b/peer-svc/src/store/mod.rs @@ -0,0 +1,7 @@ +//! An implementation of store for event. 
+ +mod metrics; +mod sql; + +pub use metrics::{Metrics, StoreMetricsMiddleware}; +pub use sql::{CeramicOneVersion, Error, PeerDB, Result, SqlitePool, SqliteTransaction}; diff --git a/peer-svc/src/store/sql/access/mod.rs b/peer-svc/src/store/sql/access/mod.rs new file mode 100644 index 000000000..67b1d7fc3 --- /dev/null +++ b/peer-svc/src/store/sql/access/mod.rs @@ -0,0 +1,5 @@ +mod peer; +mod version; + +pub use peer::PeerDB; +pub use version::CeramicOneVersion; diff --git a/peer-svc/src/store/sql/access/peer.rs b/peer-svc/src/store/sql/access/peer.rs new file mode 100644 index 000000000..f2892742d --- /dev/null +++ b/peer-svc/src/store/sql/access/peer.rs @@ -0,0 +1,152 @@ +#![warn(missing_docs, missing_debug_implementations, clippy::all)] + +use std::ops::Range; + +use anyhow::anyhow; + +use ceramic_core::PeerKey; +use recon::{AssociativeHash, HashCount, InsertResult, Key, Sha256a}; +use sqlx::Row; + +use crate::store::{ + sql::{ + entities::ReconHash, + query::{ReconQuery, SqlBackend}, + SqliteTransaction, + }, + Error, Result, SqlitePool, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// Entity for storing PeerKey in the database. +pub struct PeerDB {} + +type PeerKeyError = >>::Error; + +impl PeerDB { + async fn insert_tx<'a>(conn: &mut SqliteTransaction<'a>, key: &PeerKey) -> Result { + let key_insert = sqlx::query(ReconQuery::insert_interest()); + + let hash = Sha256a::digest(key); + let resp = key_insert + .bind(key.as_bytes()) + .bind(hash.as_u32s()[0]) + .bind(hash.as_u32s()[1]) + .bind(hash.as_u32s()[2]) + .bind(hash.as_u32s()[3]) + .bind(hash.as_u32s()[4]) + .bind(hash.as_u32s()[5]) + .bind(hash.as_u32s()[6]) + .bind(hash.as_u32s()[7]) + .execute(&mut **conn.inner()) + .await; + match resp { + std::result::Result::Ok(_rows) => Ok(true), + Err(sqlx::Error::Database(err)) => { + if err.is_unique_violation() { + Ok(false) + } else { + Err(Error::from(sqlx::Error::Database(err))) + } + } + Err(err) => Err(err.into()), + } + } + + /// Insert a single interest into the database. + pub async fn insert(pool: &SqlitePool, key: &PeerKey) -> Result { + let mut tx = pool.begin_tx().await.map_err(Error::from)?; + let new_key = PeerDB::insert_tx(&mut tx, key).await?; + tx.commit().await.map_err(Error::from)?; + Ok(new_key) + } + + /// Insert a multiple interests into the database. + pub async fn insert_many( + pool: &SqlitePool, + items: &[&PeerKey], + ) -> Result> { + match items.len() { + 0 => Ok(InsertResult::new(0)), + _ => { + let mut results = 0; + let mut tx = pool.begin_tx().await.map_err(Error::from)?; + + for item in items.iter() { + PeerDB::insert_tx(&mut tx, item) + .await? + .then(|| results += 1); + } + tx.commit().await.map_err(Error::from)?; + Ok(InsertResult::new(results)) + } + } + } + + /// Calculate the Sha256a hash of all keys in the range between left_fencepost and right_fencepost. 
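As `insert_tx` above shows (together with the `ceramic_one_peer` migration earlier in this diff), each key row stores the Sha256a digest decomposed into eight u32 columns, which lets the range hash be computed in SQL by totaling each column and masking with 0xFFFFFFFF (see the `hash_range` query later in this diff). A small sketch of that decomposition, assuming the `recon` APIs already used above:

```rust
use ceramic_core::PeerKey;
use recon::{AssociativeHash, Sha256a};

// The eight u32 words bound as ahash_0..ahash_7 when inserting a key.
fn ahash_columns(key: &PeerKey) -> Vec<u32> {
    let hash = Sha256a::digest(key);
    hash.as_u32s().to_vec()
}
```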
+ pub async fn hash_range( + pool: &SqlitePool, + range: Range<&PeerKey>, + ) -> Result> { + if range.start >= range.end { + return Ok(HashCount::new(Sha256a::identity(), 0)); + } + + let res: ReconHash = sqlx::query_as(ReconQuery::hash_range(SqlBackend::Sqlite)) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .fetch_one(pool.reader()) + .await + .map_err(Error::from)?; + let bytes = res.hash(); + Ok(HashCount::new(Sha256a::from(bytes), res.count())) + } + + /// Find the keys in the range + pub async fn range( + pool: &SqlitePool, + range: Range<&PeerKey>, + offset: usize, + limit: usize, + ) -> Result> { + let query = sqlx::query(ReconQuery::range()); + let rows = query + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(pool.reader()) + .await?; + let rows = rows + .into_iter() + .map(|row| { + let bytes: Vec = row.get(0); + PeerKey::try_from(bytes) + }) + .collect::, PeerKeyError>>() + .map_err(|e| Error::new_app(anyhow!(e)))?; + Ok(rows) + } + + pub(crate) async fn delete_range(pool: &SqlitePool, range: Range<&PeerKey>) -> Result<()> { + let query = sqlx::query(ReconQuery::delete_range()); + query + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .execute(pool.writer()) + .await?; + Ok(()) + } + + /// Count the number of keys in a given range + pub async fn count(pool: &SqlitePool, range: Range<&PeerKey>) -> Result { + let row = sqlx::query(ReconQuery::count(SqlBackend::Sqlite)) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .fetch_one(pool.reader()) + .await + .map_err(Error::from)?; + + Ok(row.get::<'_, i64, _>(0) as usize) + } +} diff --git a/peer-svc/src/store/sql/access/version.rs b/peer-svc/src/store/sql/access/version.rs new file mode 100644 index 000000000..fd4202a66 --- /dev/null +++ b/peer-svc/src/store/sql/access/version.rs @@ -0,0 +1,98 @@ +use std::str::FromStr; + +use anyhow::anyhow; + +use crate::store::{ + sql::{entities::VersionRow, SqlitePool}, + Error, Result, +}; + +#[derive(Debug, Clone, PartialEq, Eq)] +/// It's kind of pointless to roundtrip CARGO_PKG_VERSION through this struct, +/// but it makes it clear how we expect to format our versions in the database. +struct SemVer { + major: u64, + minor: u64, + patch: u64, +} + +impl std::fmt::Display for SemVer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}.{}", self.major, self.minor, self.patch) + } +} + +impl std::str::FromStr for SemVer { + type Err = Error; + + fn from_str(s: &str) -> std::result::Result { + let parts: Vec<&str> = s.split('.').collect(); + if parts.len() != 3 { + Err(Error::new_invalid_arg(anyhow!( + "Invalid version. Must have 3 parts: {}", + s.to_string() + ))) + } else { + let major = parts[0].parse().map_err(|_| { + Error::new_invalid_arg(anyhow!( + "Invalid version. Major did not parse: {}", + s.to_string() + )) + })?; + let minor = parts[1].parse().map_err(|_| { + Error::new_invalid_arg(anyhow!( + "Invalid version. Minor did not parse: {}", + s.to_string() + )) + })?; + let patch = parts[2].parse().map_err(|_| { + Error::new_invalid_arg(anyhow!( + "Invalid version. Patch did not parse: {}", + s.to_string() + )) + })?; + Ok(Self { + major, + minor, + patch, + }) + } + } +} + +#[derive(Debug, Clone)] +/// Access to ceramic version information +pub struct CeramicOneVersion {} + +impl CeramicOneVersion { + /// Fetch the previous version from the database. May be None if no previous version exists. 
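The private `SemVer` helper above exists only to keep the version string format stored in the database explicit. A tiny round-trip sketch, usable inside this module since the type is private:

```rust
#[test]
fn semver_round_trip() {
    let v: SemVer = "0.44.0".parse().expect("major.minor.patch");
    assert_eq!(v.to_string(), "0.44.0");
    // Anything without exactly three dot-separated integers is rejected.
    assert!("0.44".parse::<SemVer>().is_err());
}
```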
+ pub async fn fetch_previous(pool: &SqlitePool) -> Result> { + let current = SemVer::from_str(env!("CARGO_PKG_VERSION"))?; + VersionRow::_fetch_previous(pool, ¤t.to_string()).await + } + + /// Insert the current version into the database + pub async fn insert_current(pool: &SqlitePool) -> Result<()> { + let current = SemVer::from_str(env!("CARGO_PKG_VERSION"))?; + VersionRow::insert_current(pool, ¤t.to_string()).await + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::store::SqlitePool; + + #[tokio::test] + async fn insert_version() { + let mem = SqlitePool::connect_in_memory().await.unwrap(); + CeramicOneVersion::insert_current(&mem).await.unwrap(); + } + + #[tokio::test] + async fn prev_version() { + let mem = SqlitePool::connect_in_memory().await.unwrap(); + CeramicOneVersion::fetch_previous(&mem).await.unwrap(); + } +} diff --git a/peer-svc/src/store/sql/entities/hash.rs b/peer-svc/src/store/sql/entities/hash.rs new file mode 100644 index 000000000..250fd68b2 --- /dev/null +++ b/peer-svc/src/store/sql/entities/hash.rs @@ -0,0 +1,48 @@ +use sqlx::{sqlite::SqliteRow, Row as _}; + +#[derive(Debug, Clone)] +pub struct ReconHash { + pub count: i64, + pub ahash_0: u32, + pub ahash_1: u32, + pub ahash_2: u32, + pub ahash_3: u32, + pub ahash_4: u32, + pub ahash_5: u32, + pub ahash_6: u32, + pub ahash_7: u32, +} + +impl sqlx::FromRow<'_, SqliteRow> for ReconHash { + fn from_row(row: &SqliteRow) -> std::result::Result { + Ok(Self { + count: row.try_get("count")?, + ahash_0: row.try_get("ahash_0")?, + ahash_1: row.try_get("ahash_1")?, + ahash_2: row.try_get("ahash_2")?, + ahash_3: row.try_get("ahash_3")?, + ahash_4: row.try_get("ahash_4")?, + ahash_5: row.try_get("ahash_5")?, + ahash_6: row.try_get("ahash_6")?, + ahash_7: row.try_get("ahash_7")?, + }) + } +} + +impl ReconHash { + pub fn count(&self) -> u64 { + self.count as u64 + } + pub fn hash(&self) -> [u32; 8] { + [ + self.ahash_0, + self.ahash_1, + self.ahash_2, + self.ahash_3, + self.ahash_4, + self.ahash_5, + self.ahash_6, + self.ahash_7, + ] + } +} diff --git a/peer-svc/src/store/sql/entities/mod.rs b/peer-svc/src/store/sql/entities/mod.rs new file mode 100644 index 000000000..4860a3909 --- /dev/null +++ b/peer-svc/src/store/sql/entities/mod.rs @@ -0,0 +1,5 @@ +mod hash; +mod version; + +pub use hash::ReconHash; +pub use version::VersionRow; diff --git a/peer-svc/src/store/sql/entities/version.rs b/peer-svc/src/store/sql/entities/version.rs new file mode 100644 index 000000000..b30ebf908 --- /dev/null +++ b/peer-svc/src/store/sql/entities/version.rs @@ -0,0 +1,39 @@ +use sqlx::types::chrono; + +use crate::store::{Result, SqlitePool}; + +#[derive(Debug, Clone, sqlx::FromRow)] +// We want to retrieve these fields for logging but we don't refer to them directly +#[allow(dead_code)] +pub struct VersionRow { + id: i64, + pub version: String, + pub installed_at: chrono::NaiveDateTime, + pub last_started_at: chrono::NaiveDateTime, +} + +impl VersionRow { + /// Return the version installed before the current version + pub async fn _fetch_previous(pool: &SqlitePool, current_version: &str) -> Result> { + Ok(sqlx::query_as( + "SELECT id, version, installed_at + FROM ceramic_one_version + WHERE version <> $1 + ORDER BY installed_at DESC limit 1;", + ) + .bind(current_version) + .fetch_optional(pool.reader()) + .await?) 
+ } + + /// Add the current version to the database, updating the last_started_at field if the version already exists + pub async fn insert_current(pool: &SqlitePool, current_version: &str) -> Result<()> { + sqlx::query( + "INSERT INTO ceramic_one_version (version) VALUES ($1) ON CONFLICT (version) DO UPDATE set last_started_at = CURRENT_TIMESTAMP;", + ) + .bind(current_version) + .execute(pool.writer()) + .await?; + Ok(()) + } +} diff --git a/peer-svc/src/store/sql/mod.rs b/peer-svc/src/store/sql/mod.rs new file mode 100644 index 000000000..578f15822 --- /dev/null +++ b/peer-svc/src/store/sql/mod.rs @@ -0,0 +1,9 @@ +mod access; +pub mod entities; +mod query; + +pub use access::{CeramicOneVersion, PeerDB}; +pub use ceramic_sql::{ + sqlite::{SqlitePool, SqliteTransaction}, + Error, Result, +}; diff --git a/peer-svc/src/store/sql/query.rs b/peer-svc/src/store/sql/query.rs new file mode 100644 index 000000000..ed16d8a8a --- /dev/null +++ b/peer-svc/src/store/sql/query.rs @@ -0,0 +1,72 @@ +/// Holds the SQL queries for accessing interests across DB types +pub struct ReconQuery {} +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SqlBackend { + Sqlite, +} + +impl ReconQuery { + /// Requires 9 parameters: the order_key and the 8 hash values + pub fn insert_interest() -> &'static str { + "INSERT INTO ceramic_one_peer ( + order_key, + ahash_0, ahash_1, ahash_2, ahash_3, + ahash_4, ahash_5, ahash_6, ahash_7 + ) VALUES ( + $1, + $2, $3, $4, $5, + $6, $7, $8, $9 + );" + } + + /// Requires binding 2 parameters. Returned as `ReconHash` struct + pub fn hash_range(db: SqlBackend) -> &'static str { + match db { + SqlBackend::Sqlite => { + r#"SELECT + TOTAL(ahash_0) & 0xFFFFFFFF as ahash_0, TOTAL(ahash_1) & 0xFFFFFFFF as ahash_1, + TOTAL(ahash_2) & 0xFFFFFFFF as ahash_2, TOTAL(ahash_3) & 0xFFFFFFFF as ahash_3, + TOTAL(ahash_4) & 0xFFFFFFFF as ahash_4, TOTAL(ahash_5) & 0xFFFFFFFF as ahash_5, + TOTAL(ahash_6) & 0xFFFFFFFF as ahash_6, TOTAL(ahash_7) & 0xFFFFFFFF as ahash_7, + COUNT(1) as count + FROM ceramic_one_peer + WHERE order_key >= $1 AND order_key < $2;"# + } + } + } + /// Requires binding 4 parameters + pub fn range() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_peer + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC + LIMIT + $3 + OFFSET + $4;"# + } + /// Requires binding 2 parameters + pub fn delete_range() -> &'static str { + r#" DELETE FROM + ceramic_one_peer + WHERE + order_key >= $1 AND order_key < $2;"# + } + + pub fn count(db: SqlBackend) -> &'static str { + match db { + SqlBackend::Sqlite => { + r#"SELECT + count(order_key) as res + FROM + ceramic_one_peer + WHERE + order_key >= $1 AND order_key < $2"# + } + } + } +} diff --git a/peer-svc/src/tests/mod.rs b/peer-svc/src/tests/mod.rs new file mode 100644 index 000000000..e93c2c91f --- /dev/null +++ b/peer-svc/src/tests/mod.rs @@ -0,0 +1 @@ +mod peer; diff --git a/peer-svc/src/tests/peer.rs b/peer-svc/src/tests/peer.rs new file mode 100644 index 000000000..a16019cca --- /dev/null +++ b/peer-svc/src/tests/peer.rs @@ -0,0 +1,281 @@ +use std::collections::BTreeSet; + +use ceramic_core::{ + peer::{Builder, Init}, + NodeKey, PeerKey, +}; +use rand::{thread_rng, Rng}; +use recon::{ReconItem, Sha256a}; +use test_log::test; + +macro_rules! test_with_sqlite { + ($test_name: ident, $test_fn: expr $(, $sql_stmts:expr)?) => { + paste::paste! 
{
+            #[test(tokio::test)]
+            async fn [<$test_name _sqlite>]() {
+
+                let conn = $crate::store::SqlitePool::connect_in_memory().await.unwrap();
+                let service = $crate::PeerService::new(conn);
+                $(
+                    for stmt in $sql_stmts {
+                        service.pool.run_statement(stmt).await.unwrap();
+                    }
+                )?
+                $test_fn(&service).await;
+            }
+        }
+    };
+}
+
+/// test_name (will eventually generate multiple tests when we have multiple databases to test)
+/// test_fn (the test function that will be run for both databases)
+/// sql_stmts (optional, array of sql statements to run before the test)
+macro_rules! test_with_dbs {
+    ($test_name: ident, $test_fn: expr $(, $sql_stmts:expr)?) => {
+        test_with_sqlite!($test_name, $test_fn $(, $sql_stmts)?);
+    }
+}
+
+// Return a builder for constructing a PeerKey.
+pub(crate) fn peer_key_builder() -> Builder<Init> {
+    PeerKey::builder()
+}
+
+// Generate a peer key with a random node id and random local addresses.
+// The expiration is random when None.
+pub(crate) fn random_peer_key<'a>(expiration: Option<u64>) -> PeerKey {
+    peer_key_builder()
+        .with_expiration(expiration.unwrap_or_else(|| thread_rng().gen()))
+        .with_id(&NodeKey::random())
+        .with_addresses(vec![
+            format!("/ip4/127.0.0.1/tcp/{}", thread_rng().gen::<u16>())
+                .parse()
+                .unwrap(),
+            format!("/ip4/127.0.0.1/udp/{}/quic-v1", thread_rng().gen::<u16>())
+                .parse()
+                .unwrap(),
+        ])
+        .build()
+}
+// The PeerKey that is the minimum of all possible random peer keys
+pub(crate) fn random_peer_min() -> PeerKey {
+    peer_key_builder().with_min_expiration().build_fencepost()
+}
+// The PeerKey that is the maximum of all possible random peer keys
+pub(crate) fn random_peer_max() -> PeerKey {
+    peer_key_builder().with_max_expiration().build_fencepost()
+}
+
+test_with_dbs!(
+    test_hash_range_query,
+    test_hash_range_query,
+    ["delete from ceramic_one_interest"]
+);
+
+async fn test_hash_range_query<S>(store: &S)
+where
+    S: recon::Store<Key = PeerKey, Hash = Sha256a>,
+{
+    recon::Store::insert_many(
+        store,
+        &[ReconItem::new(random_peer_key(Some(42)), vec![])],
+        NodeKey::random().id(),
+    )
+    .await
+    .unwrap();
+
+    let hash_cnt = store
+        .hash_range(&random_peer_min()..&random_peer_max())
+        .await
+        .unwrap();
+    assert_eq!(1, hash_cnt.count());
+
+    recon::Store::insert_many(
+        store,
+        &[ReconItem::new(random_peer_key(Some(24)), vec![])],
+        NodeKey::random().id(),
+    )
+    .await
+    .unwrap();
+
+    let hash_cnt = store
+        .hash_range(&random_peer_min()..&random_peer_max())
+        .await
+        .unwrap();
+    assert_eq!(2, hash_cnt.count());
+}
+
+test_with_dbs!(
+    test_range_query,
+    test_range_query,
+    ["delete from ceramic_one_interest"]
+);
+
+async fn test_range_query<S>(store: &S)
+where
+    S: recon::Store<Key = PeerKey, Hash = Sha256a>,
+{
+    let interest_0 = random_peer_key(None);
+    let interest_1 = random_peer_key(None);
+
+    recon::Store::insert_many(
+        store,
+        &[ReconItem::new(interest_0.clone(), Vec::new())],
+        NodeKey::random().id(),
+    )
+    .await
+    .unwrap();
+    recon::Store::insert_many(
+        store,
+        &[ReconItem::new(interest_1.clone(), Vec::new())],
+        NodeKey::random().id(),
+    )
+    .await
+    .unwrap();
+    let ids = recon::Store::range(store, &random_peer_min()..&random_peer_max(), 0, usize::MAX)
+        .await
+        .unwrap();
+    let interests = ids.collect::<BTreeSet<_>>();
+    assert_eq!(BTreeSet::from_iter([interest_0, interest_1]), interests);
+}
+
+test_with_dbs!(
+    test_range_with_values_query,
+    test_range_with_values_query,
+    ["delete from ceramic_one_interest"]
+);
+
+async fn test_range_with_values_query<S>(store: &S)
+where
+    S: recon::Store<Key = PeerKey, Hash = Sha256a>,
+{
+    let interest_0 = random_peer_key(None);
+    let
interest_1 = random_peer_key(None); + + store + .insert_many( + &[ReconItem::new(interest_0.clone(), Vec::new())], + NodeKey::random().id(), + ) + .await + .unwrap(); + store + .insert_many( + &[ReconItem::new(interest_1.clone(), Vec::new())], + NodeKey::random().id(), + ) + .await + .unwrap(); + let ids = store + .range_with_values(&random_peer_min()..&random_peer_max(), 0, usize::MAX) + .await + .unwrap(); + let interests = ids + .into_iter() + .map(|(i, _v)| i) + .collect::>(); + assert_eq!(BTreeSet::from_iter([interest_0, interest_1]), interests); +} + +test_with_dbs!( + test_double_insert, + test_double_insert, + ["delete from ceramic_one_interest"] +); + +async fn test_double_insert(store: &S) +where + S: recon::Store, +{ + let interest = random_peer_key(None); + // do take the first one + assert!(&recon::Store::insert_many( + store, + &[ReconItem::new(interest.clone(), Vec::new())], + NodeKey::random().id(), + ) + .await + .unwrap() + .included_new_key()); + + // reject the second insert of same key + assert!(!recon::Store::insert_many( + store, + &[ReconItem::new(interest.clone(), Vec::new())], + NodeKey::random().id(), + ) + .await + .unwrap() + .included_new_key()); +} + +test_with_dbs!( + test_value_for_key, + test_value_for_key, + ["delete from ceramic_one_interest"] +); + +async fn test_value_for_key(store: &S) +where + S: recon::Store, +{ + let key = random_peer_key(None); + recon::Store::insert_many( + store, + &[ReconItem::new(key.clone(), Vec::new())], + NodeKey::random().id(), + ) + .await + .unwrap(); + let value = store.value_for_key(&key).await.unwrap(); + let val = value.unwrap(); + let empty: Vec = vec![]; + assert_eq!(empty, val); +} + +test_with_dbs!( + test_insert, + test_insert, + ["delete from ceramic_one_interest"] +); + +async fn test_insert(service: &S) +where + S: ceramic_p2p::PeerService, +{ + let key_0 = random_peer_key(Some(42)); + let key_1 = random_peer_key(Some(43)); + service.insert(&key_0).await.unwrap(); + let peers = service.all_peers().await.unwrap(); + assert_eq!(vec![key_0.clone()], peers); + service.insert(&key_1).await.unwrap(); + let peers = service.all_peers().await.unwrap(); + assert_eq!(vec![key_0, key_1], peers); +} + +test_with_dbs!( + test_insert_delete, + test_insert_delete, + ["delete from ceramic_one_interest"] +); + +async fn test_insert_delete(service: &S) +where + S: ceramic_p2p::PeerService, +{ + let key_0 = random_peer_key(Some(42)); + let key_1 = random_peer_key(Some(43)); + service.insert(&key_0).await.unwrap(); + let peers = service.all_peers().await.unwrap(); + assert_eq!(vec![key_0.clone()], peers); + service.insert(&key_1).await.unwrap(); + let peers = service.all_peers().await.unwrap(); + assert_eq!(vec![key_0.clone(), key_1.clone()], peers); + // Delete key_0 + service + .delete_range(&random_peer_min()..&key_1) + .await + .unwrap(); + let peers = service.all_peers().await.unwrap(); + assert_eq!(vec![key_1.clone()], peers); +} diff --git a/recon/src/lib.rs b/recon/src/lib.rs index ef18c9f98..8e8f88090 100644 --- a/recon/src/lib.rs +++ b/recon/src/lib.rs @@ -5,9 +5,9 @@ pub use crate::{ error::Error, metrics::Metrics, recon::{ - btreestore::BTreeStore, AssociativeHash, EventIdStore, FullInterests, HashCount, - InsertResult, InterestProvider, InterestStore, InvalidItem, Key, RangeHash, Recon, - ReconInterestProvider, ReconItem, Split, Store, SyncState, + btreestore::BTreeStore, AssociativeHash, FullInterests, HashCount, InsertResult, + InterestProvider, InvalidItem, Key, RangeHash, Recon, ReconInterestProvider, ReconItem, 
+ Split, Store, SyncState, }, sha256a::Sha256a, }; diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index 1dab520ee..b70cdf4fc 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -17,7 +17,7 @@ mod stream_set; mod tests; mod upgrade; -use ceramic_core::{EventId, Interest}; +use ceramic_core::{EventId, Interest, PeerKey}; use futures::{future::BoxFuture, FutureExt}; use libp2p::{ core::ConnectedPoint, @@ -42,6 +42,8 @@ use crate::{ Sha256a, }; +/// Name of the Recon protocol for synchronizing peers +pub const PROTOCOL_NAME_PEER: &str = "/ceramic/recon/0.1.0/peer"; /// Name of the Recon protocol for synchronizing interests pub const PROTOCOL_NAME_INTEREST: &str = "/ceramic/recon/0.1.0/interest"; /// Name of the Recon protocol for synchronizing models @@ -75,7 +77,8 @@ impl Default for Config { /// The Behavior tracks all peers on the network that speak the Recon protocol. /// It is responsible for starting and stopping syncs with various peers depending on the needs of /// the application. -pub struct Behaviour { +pub struct Behaviour { + peer: P, interest: I, model: M, config: Config, @@ -85,7 +88,12 @@ pub struct Behaviour { next_sync: Option>, } -impl std::fmt::Debug for Behaviour { +impl std::fmt::Debug for Behaviour +where + P: std::fmt::Debug, + I: std::fmt::Debug, + M: std::fmt::Debug, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Behaviour") .field("interest", &self.interest) @@ -135,20 +143,21 @@ pub enum PeerStatus { /// The stream_set that has failed synchronizing. stream_set: StreamSet, }, - /// Local peer has stopped synchronizing with the remote peer and will not attempt to - /// synchronize again. + /// Local peer was unable to negotiate a protocol with the remote peer. Stopped, } -impl Behaviour { +impl Behaviour { /// Create a new Behavior with the provided Recon implementation. - pub fn new(interest: I, model: M, config: Config) -> Self + pub fn new(peer: P, interest: I, model: M, config: Config) -> Self where + P: Recon, I: Recon, M: Recon, { let (tx, rx) = tokio::sync::mpsc::channel(1000); Self { + peer, interest, model, config, @@ -168,12 +177,13 @@ impl Behaviour { } } -impl NetworkBehaviour for Behaviour +impl NetworkBehaviour for Behaviour where + P: Recon, I: Recon, M: Recon, { - type ConnectionHandler = Handler; + type ConnectionHandler = Handler; type ToSwarm = Event; @@ -193,9 +203,14 @@ where connections: vec![connection_info], next_sync: BTreeMap::from_iter([ // Schedule all stream_sets initially - (StreamSet::Interest, Instant::now()), + (StreamSet::Peer, Instant::now()), + // Schedule interests after peers + ( + StreamSet::Interest, + Instant::now() + Duration::from_millis(1), + ), // Schedule models after interests - (StreamSet::Model, Instant::now() + Duration::from_millis(1)), + (StreamSet::Model, Instant::now() + Duration::from_millis(2)), ]), sync_delay: Default::default(), }); @@ -226,11 +241,12 @@ where status: info.status, }))) } else { - tracing::warn!(%peer_id, "peer not found in peers map when started syncronizing?"); + tracing::warn!(%peer_id, "peer not found in peers map when started synchronizing?"); None } } - // The peer has stopped synchronization and will never be able to resume. + + // The peer failed to negotiate a protocol with the local peer. 
FromHandler::Stopped => { if let Entry::Occupied(mut entry) = self.peers.entry(peer_id) { let info = entry.get_mut(); @@ -240,7 +256,7 @@ where status: info.status, }))) } else { - tracing::warn!(%peer_id, "peer not found in peers map when stopped syncronizing?"); + tracing::warn!(%peer_id, "peer not found in peers map when stopped synchronizing?"); None } } @@ -266,7 +282,7 @@ where status: info.status, }))) } else { - tracing::warn!(%peer_id, "peer not found in peers map when succeeded syncronizing?"); + tracing::warn!(%peer_id, "peer not found in peers map when succeeded synchronizing?"); None } } @@ -297,7 +313,7 @@ where status: info.status, }))) } else { - tracing::warn!(%peer_id, "peer not found in peers map when failed syncronizing?"); + tracing::warn!(%peer_id, "peer not found in peers map when failed synchronizing?"); None } } @@ -313,7 +329,7 @@ where cx: &mut std::task::Context<'_>, ) -> Poll>> { if let Poll::Ready(Some(event)) = self.swarm_events_receiver.poll_recv(cx) { - debug!(?event, "swarm event"); + trace!(?event, "swarm event"); return Poll::Ready(event); } // Check each peer and start synchronization as needed. @@ -330,7 +346,7 @@ where info.next_sync.iter().min_by_key(|(_, t)| *t).expect( "next_sync should always be initialized with stream sets", ); - debug!(?next_stream_set,?next_sync, now=?Instant::now(), "polling"); + trace!(?next_stream_set,?next_sync, now=?Instant::now(), "polling"); // Sync if enough time has passed since we synced the stream set. if *next_sync < Instant::now() { self.next_sync = None; @@ -371,6 +387,7 @@ where peer, connection_id, handler::State::WaitingInbound, + self.peer.clone(), self.interest.clone(), self.model.clone(), )) @@ -387,10 +404,11 @@ where Ok(Handler::new( peer, connection_id, - // Start synchronizing interests + // Start synchronizing peers handler::State::RequestOutbound { - stream_set: StreamSet::Interest, + stream_set: StreamSet::Peer, }, + self.peer.clone(), self.interest.clone(), self.model.clone(), )) diff --git a/recon/src/libp2p/handler.rs b/recon/src/libp2p/handler.rs index 5fed86625..82c4fa1d7 100644 --- a/recon/src/libp2p/handler.rs +++ b/recon/src/libp2p/handler.rs @@ -5,7 +5,7 @@ use std::{collections::VecDeque, task::Poll}; use anyhow::Result; -use ceramic_core::{EventId, Interest}; +use ceramic_core::{EventId, Interest, PeerKey}; use libp2p::{ futures::FutureExt, swarm::{ @@ -22,17 +22,19 @@ use crate::{ }; #[derive(Debug)] -pub struct Handler { +pub struct Handler { remote_peer_id: PeerId, connection_id: ConnectionId, + peer: P, interest: I, model: M, state: State, behavior_events_queue: VecDeque, } -impl Handler +impl Handler where + P: Recon, I: Recon, M: Recon, { @@ -40,12 +42,14 @@ where peer_id: PeerId, connection_id: ConnectionId, state: State, + peer: P, interest: I, model: M, ) -> Self { Self { remote_peer_id: peer_id, connection_id, + peer, interest, model, state, @@ -94,7 +98,7 @@ pub enum State { Idle, WaitingInbound, RequestOutbound { stream_set: StreamSet }, - WaitingOutbound, + WaitingOutbound { stream_set: StreamSet }, Outbound(SyncFuture, StreamSet), Inbound(SyncFuture, StreamSet), } @@ -108,7 +112,10 @@ impl std::fmt::Debug for State { .debug_struct("RequestOutbound") .field("stream_set", stream_set) .finish(), - Self::WaitingOutbound => f.debug_struct("WaitingOutbound").finish(), + Self::WaitingOutbound { stream_set } => f + .debug_struct("WaitingOutbound") + .field("stream_set", stream_set) + .finish(), Self::Outbound(_, stream_set) => f .debug_tuple("Outbound") .field(&"_") @@ -142,8 +149,9 
@@ pub enum FromHandler { }, } -impl ConnectionHandler for Handler +impl ConnectionHandler for Handler where + P: Recon + Clone + Send + 'static, I: Recon + Clone + Send + 'static, M: Recon + Clone + Send + 'static, { @@ -158,7 +166,7 @@ where &self, ) -> libp2p::swarm::SubstreamProtocol { SubstreamProtocol::new( - MultiReadyUpgrade::new(vec![StreamSet::Interest, StreamSet::Model]), + MultiReadyUpgrade::new(vec![StreamSet::Peer, StreamSet::Interest, StreamSet::Model]), (), ) } @@ -181,7 +189,7 @@ where State::Idle | State::WaitingOutbound { .. } | State::WaitingInbound => {} State::RequestOutbound { stream_set } => { let stream_set = *stream_set; - self.transition_state(State::WaitingOutbound); + self.transition_state(State::WaitingOutbound { stream_set }); // Start outbound connection let protocol = SubstreamProtocol::new(MultiReadyUpgrade::new(vec![stream_set]), ()); @@ -247,6 +255,14 @@ where self.behavior_events_queue .push_front(FromHandler::Started { stream_set }); let stream = match stream_set { + StreamSet::Peer => protocol::respond_synchronize( + self.remote_peer_id, + self.connection_id, + stream_set, + self.peer.clone(), + stream, + ) + .boxed(), StreamSet::Interest => protocol::respond_synchronize( self.remote_peer_id, self.connection_id, @@ -280,10 +296,18 @@ where }, ) => { match &self.state { - State::WaitingOutbound => { + State::WaitingOutbound { .. } => { self.behavior_events_queue .push_front(FromHandler::Started { stream_set }); let stream = match stream_set { + StreamSet::Peer => protocol::initiate_synchronize( + self.remote_peer_id, + self.connection_id, + stream_set, + self.peer.clone(), + stream, + ) + .boxed(), StreamSet::Interest => protocol::initiate_synchronize( self.remote_peer_id, self.connection_id, @@ -311,14 +335,15 @@ where | State::Inbound(_, _) => {} } } - libp2p::swarm::handler::ConnectionEvent::AddressChange(_) => {} // We failed to upgrade the inbound connection. libp2p::swarm::handler::ConnectionEvent::ListenUpgradeError(_err) => { match self.state { State::WaitingInbound => { - // We have stopped synchronization and cannot attempt again as we are unable to - // negotiate a protocol. - // This is expected if we connected to a node that does not speak Recon + // We were unable to negotiate a protocol with the remote peer. + // This is expected if we connected to a node that does not speak any + // shared Recon/StreamSet protocol. + // The remote may try to connect again with a different protocol that we do + // speak, however until then we are Stopped. self.behavior_events_queue.push_front(FromHandler::Stopped); self.transition_state(State::Idle) } @@ -332,11 +357,16 @@ where // We failed to upgrade the outbound connection. libp2p::swarm::handler::ConnectionEvent::DialUpgradeError(_err) => { match self.state { - State::WaitingOutbound { .. } => { - // We have stopped synchronization and cannot attempt again as we are unable to - // negotiate a protocol. - // This is expected if we connected to a node that does not speak Recon - self.behavior_events_queue.push_front(FromHandler::Stopped); + State::WaitingOutbound { stream_set } => { + // We have failed to negotiate a protocol for this Recon/StreamSet. + // We can report that this stream_set has failed and try again on another + // StreamSet. 
+ self.behavior_events_queue.push_front(FromHandler::Failed { + stream_set, + error: anyhow::anyhow!( + "failed to negotiate recon protocol for stream set: {stream_set:?}" + ), + }); self.transition_state(State::Idle) } State::Idle diff --git a/recon/src/libp2p/stream_set.rs b/recon/src/libp2p/stream_set.rs index e9d7e32ae..e5dcf4467 100644 --- a/recon/src/libp2p/stream_set.rs +++ b/recon/src/libp2p/stream_set.rs @@ -1,10 +1,12 @@ use anyhow::anyhow; -use crate::libp2p::{PROTOCOL_NAME_INTEREST, PROTOCOL_NAME_MODEL}; +use super::{PROTOCOL_NAME_INTEREST, PROTOCOL_NAME_MODEL, PROTOCOL_NAME_PEER}; /// Represents a stream set key #[derive(Copy, Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub enum StreamSet { + /// Stream set of peer ranges + Peer, /// Stream set of interest ranges Interest, /// Stream set of models @@ -15,6 +17,7 @@ impl StreamSet { /// Report the sort key for this stream set. pub fn sort_key(&self) -> &str { match self { + StreamSet::Peer => "peer", StreamSet::Interest => "interest", StreamSet::Model => "model", } @@ -26,6 +29,7 @@ impl TryFrom<&str> for StreamSet { fn try_from(value: &str) -> Result { match value { + "peer" => Ok(StreamSet::Peer), "model" => Ok(StreamSet::Model), "interest" => Ok(StreamSet::Interest), _ => Err(anyhow!("unknown sort_key {}", value)), @@ -37,6 +41,7 @@ impl AsRef for StreamSet { fn as_ref(&self) -> &str { match self { StreamSet::Interest => PROTOCOL_NAME_INTEREST, + StreamSet::Peer => PROTOCOL_NAME_PEER, StreamSet::Model => PROTOCOL_NAME_MODEL, } } diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 7f603f90e..c95d0748c 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -109,20 +109,32 @@ where // use a hackro to avoid setting all the generic types we'd need if using functions macro_rules! setup_test { - ($alice_store: expr, $alice_interest: expr, $bob_store: expr, $bob_interest: expr,) => {{ + ($alice_store: expr, $alice_peer: expr, $alice_interest: expr, $bob_store: expr, $bob_peer: expr, $bob_interest: expr,) => {{ let alice = Recon::new( $alice_store, FullInterests::default(), Metrics::register(&mut Registry::default()), ); + let alice_peer = Recon::new( + $alice_peer, + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + let alice_interest = Recon::new( $alice_interest, FullInterests::default(), Metrics::register(&mut Registry::default()), ); - let bob_interests = Recon::new( + let bob_peer = Recon::new( + $bob_peer, + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + + let bob_interest = Recon::new( $bob_interest, FullInterests::default(), Metrics::register(&mut Registry::default()), @@ -141,10 +153,11 @@ macro_rules! 
setup_test { per_peer_maximum_sync_delay: std::time::Duration::from_millis(1000), }; let swarm1 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new(alice_interest, alice, config.clone()) + crate::libp2p::Behaviour::new(alice_peer, alice_interest, alice, config.clone()) + }); + let swarm2 = Swarm::new_ephemeral(|_| { + crate::libp2p::Behaviour::new(bob_peer, bob_interest, bob, config) }); - let swarm2 = - Swarm::new_ephemeral(|_| crate::libp2p::Behaviour::new(bob_interests, bob, config)); (swarm1, swarm2) }}; @@ -157,6 +170,8 @@ async fn in_sync_no_overlap() { BTreeStoreErrors::default(), BTreeStoreErrors::default(), BTreeStoreErrors::default(), + BTreeStoreErrors::default(), + BTreeStoreErrors::default(), ); let fut = async move { @@ -166,13 +181,11 @@ async fn in_sync_no_overlap() { swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; - let ([p1_e1, p1_e2, p1_e3, p1_e4], [p2_e1, p2_e2, p2_e3, p2_e4]): ( - [crate::libp2p::Event; 4], - [crate::libp2p::Event; 4], - ) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + let (p1_events, p2_events): ([crate::libp2p::Event; 6], [crate::libp2p::Event; 6]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; - assert_in_sync(p2, [p1_e1, p1_e2, p1_e3, p1_e4]); - assert_in_sync(p1, [p2_e1, p2_e2, p2_e3, p2_e4]); + assert_in_sync(p2, p1_events); + assert_in_sync(p1, p2_events); }; fut.await; @@ -189,24 +202,23 @@ async fn initiator_model_error() { BTreeStoreErrors::default(), BTreeStoreErrors::default(), BTreeStoreErrors::default(), + BTreeStoreErrors::default(), + BTreeStoreErrors::default(), ); let fut = async move { swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; - let ([p1_e1, p1_e2, p1_e3, failed_peer], [p2_e1, p2_e2, p2_e3]): ( - [crate::libp2p::Event; 4], - [crate::libp2p::Event; 3], - ) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + let (p1_events, p2_events): ([crate::libp2p::Event; 6], [crate::libp2p::Event; 5]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; - for ev in &[p1_e1, p1_e2, p1_e3, p2_e1, p2_e2, p2_e3] { + for ev in p1_events.iter().chain(p2_events.iter()) { info!("{:?}", ev); } - info!("{:?}", failed_peer); assert_eq!( - failed_peer, + p1_events[5], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm2.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -226,28 +238,26 @@ async fn responder_model_error() { "transient error should be handled" ))); let (mut swarm1, mut swarm2) = setup_test!( + BTreeStoreErrors::default(), BTreeStoreErrors::default(), BTreeStoreErrors::default(), bob_model_store, BTreeStoreErrors::default(), + BTreeStoreErrors::default(), ); let fut = async move { swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; - let ([p1_e1, p1_e2, p1_e3, p1_e4], [p2_e1, p2_e2, p2_e3, p2_e4]): ( - [crate::libp2p::Event; 4], - [crate::libp2p::Event; 4], - ) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + let (p1_events, p2_events): ([crate::libp2p::Event; 6], [crate::libp2p::Event; 6]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; - for ev in [ - &p1_e1, &p1_e2, &p1_e3, &p1_e4, &p2_e1, &p2_e2, &p2_e3, &p2_e4, - ] { + for ev in p1_events.iter().chain(p2_events.iter()) { info!("{:?}", ev); } assert_eq!( - p2_e4, + p2_events[5], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -267,10 +277,12 @@ async fn model_error_backoff() { "transient error 
should be handled" ))); let (mut swarm1, mut swarm2) = setup_test!( + BTreeStoreErrors::default(), BTreeStoreErrors::default(), BTreeStoreErrors::default(), bob_model_store, BTreeStoreErrors::default(), + BTreeStoreErrors::default(), ); let fut = async move { @@ -278,22 +290,10 @@ async fn model_error_backoff() { swarm2.connect(&mut swarm1).await; // Expect interests to sync twice in a row since models fail to sync - let ( - [p1_e1, p1_e2, p1_e3, p1_e4, p1_e5, p1_e6, p1_e7, p1_e8, p1_e9, p1_e10, p1_e11, p1_e12], - [p2_e1, p2_e2, p2_e3, p2_e4, p2_e5, p2_e6, p2_e7, p2_e8, p2_e9, p2_e10, p2_e11, p2_e12], - ): ([crate::libp2p::Event; 12], [crate::libp2p::Event; 12]) = + let (p1_events, p2_events): ([crate::libp2p::Event; 18], [crate::libp2p::Event; 18]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; - let events = [ - [ - &p1_e1, &p1_e2, &p1_e3, &p1_e4, &p1_e5, &p1_e6, &p1_e7, &p1_e8, &p1_e9, &p1_e10, - &p1_e11, &p1_e12, - ], - [ - &p2_e1, &p2_e2, &p2_e3, &p2_e4, &p2_e5, &p2_e6, &p2_e7, &p2_e8, &p2_e9, &p2_e10, - &p2_e11, &p2_e12, - ], - ]; + let events = [p1_events, p2_events]; for ev in events.iter().flatten() { info!("{:?}", ev); @@ -316,31 +316,41 @@ async fn model_error_backoff() { }) .collect(); let expected_stream_set_order = vec![ + // First peers sync + Some(StreamSet::Peer), + Some(StreamSet::Peer), // First interests sync Some(StreamSet::Interest), Some(StreamSet::Interest), // First model sync Some(StreamSet::Model), Some(StreamSet::Model), + // Second peers sync + Some(StreamSet::Peer), + Some(StreamSet::Peer), // Second interests sync Some(StreamSet::Interest), Some(StreamSet::Interest), // Second model sync with initial short backoff Some(StreamSet::Model), Some(StreamSet::Model), + // Third peers sync + Some(StreamSet::Peer), + Some(StreamSet::Peer), // Third interests sync Some(StreamSet::Interest), Some(StreamSet::Interest), - // Third model sync is skipped because the backoff pushed it past the interests sync - Some(StreamSet::Interest), - Some(StreamSet::Interest), + // Third model sync is skipped because the backoff pushed it past the peer sync + Some(StreamSet::Peer), + Some(StreamSet::Peer), ]; assert_eq!( stream_sets, vec![expected_stream_set_order.clone(), expected_stream_set_order] ); + // Assert we saw the errors assert_eq!( - p2_e4, + events[1][5], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), status: PeerStatus::Failed { @@ -349,20 +359,11 @@ async fn model_error_backoff() { }) ); assert_eq!( - p1_e12, - crate::libp2p::Event::PeerEvent(PeerEvent { - remote_peer_id: swarm2.local_peer_id().to_owned(), - status: PeerStatus::Synchronized { - stream_set: StreamSet::Interest - } - }) - ); - assert_eq!( - p2_e12, + events[1][11], crate::libp2p::Event::PeerEvent(PeerEvent { remote_peer_id: swarm1.local_peer_id().to_owned(), - status: PeerStatus::Synchronized { - stream_set: StreamSet::Interest + status: PeerStatus::Failed { + stream_set: StreamSet::Model } }) ); @@ -377,13 +378,13 @@ fn into_peer_event(ev: crate::libp2p::Event) -> PeerEvent { } } -fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 4]) { +fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { assert_eq!( into_peer_event(events[0].clone()), PeerEvent { remote_peer_id: id, status: PeerStatus::Started { - stream_set: StreamSet::Interest + stream_set: StreamSet::Peer } } ); @@ -392,7 +393,7 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 4]) { PeerEvent { remote_peer_id: id, status: PeerStatus::Synchronized { - 
stream_set: StreamSet::Interest + stream_set: StreamSet::Peer } } ); @@ -401,12 +402,30 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 4]) { PeerEvent { remote_peer_id: id, status: PeerStatus::Started { - stream_set: StreamSet::Model + stream_set: StreamSet::Interest } } ); assert_eq!( into_peer_event(events[3].clone()), + PeerEvent { + remote_peer_id: id, + status: PeerStatus::Synchronized { + stream_set: StreamSet::Interest + } + } + ); + assert_eq!( + into_peer_event(events[4].clone()), + PeerEvent { + remote_peer_id: id, + status: PeerStatus::Started { + stream_set: StreamSet::Model + } + } + ); + assert_eq!( + into_peer_event(events[5].clone()), PeerEvent { remote_peer_id: id, status: PeerStatus::Synchronized { diff --git a/recon/src/recon.rs b/recon/src/recon.rs index c03bfc717..a09b23c3b 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -11,7 +11,7 @@ use std::{ use anyhow::anyhow; use async_trait::async_trait; -use ceramic_core::{EventId, Interest, NodeId, RangeOpen}; +use ceramic_core::{EventId, Interest, NodeId, PeerKey, RangeOpen}; use serde::{Deserialize, Serialize}; use tracing::{instrument, trace, Level}; @@ -255,7 +255,7 @@ where /// /// Reports any new keys and what the range indicates about how the local and remote node are /// synchronized. - #[instrument(skip(self), ret(level = Level::DEBUG))] + #[instrument(skip(self), ret(level = Level::TRACE))] async fn process_range(&self, range: RangeHash) -> Result> { let calculated_hash = self.store.hash_range(&range.first..&range.last).await?; if calculated_hash == range.hash { @@ -436,7 +436,7 @@ impl ReconItem where K: Key, { - /// Construct a new item with a key and optional value + /// Construct a new item with a key and value pub fn new(key: K, value: Vec) -> Self { Self { key, @@ -678,14 +678,6 @@ where } } -/// Store for Interests -#[async_trait::async_trait] -pub trait InterestStore: Store {} - -/// Store for EventId -#[async_trait::async_trait] -pub trait EventIdStore: Store {} - /// Represents a key that can be reconciled via Recon. pub trait Key: TryFrom> + Ord + Clone + Display + std::fmt::Debug + Send + Sync + 'static @@ -955,6 +947,24 @@ pub enum SyncState { }, } +impl Key for PeerKey { + fn min_value() -> Self { + PeerKey::builder().with_min_expiration().build_fencepost() + } + + fn max_value() -> Self { + PeerKey::builder().with_max_expiration().build_fencepost() + } + + fn as_bytes(&self) -> &[u8] { + self.as_slice() + } + + fn is_fencepost(&self) -> bool { + !self.has_jws() + } +} + impl Key for EventId { fn min_value() -> Self { EventId::builder().build_min_fencepost()