diff --git a/Cargo.lock b/Cargo.lock index 32f2802..1eb49fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,7 @@ dependencies = [ "serde", "serde_ipld_dagcbor", "test-strategy", + "thiserror", "tracing", "tracing-subscriber", "wnfs-common", @@ -1791,18 +1792,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 3a59ad5..89b7e99 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -36,6 +36,7 @@ proptest = { version = "1.1", optional = true } roaring-graphs = { version = "0.12", optional = true } serde = "1.0.183" serde_ipld_dagcbor = "0.4.0" +thiserror = "1.0.47" tracing = "0.1" tracing-subscriber = "0.3" wnfs-common = "0.1.23" diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 900aa32..aeac4d2 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -1,4 +1,6 @@ -use anyhow::{anyhow, bail, Result}; +#![allow(unknown_lints)] // Because the `instrument` macro contains some `#[allow]`s that rust 1.66 doesn't know yet. + +use anyhow::anyhow; use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; use futures::TryStreamExt; @@ -6,10 +8,12 @@ use iroh_car::{CarHeader, CarReader, CarWriter}; use libipld::{Ipld, IpldCodec}; use libipld_core::{cid::Cid, codec::References}; use std::io::Cursor; +use tracing::{debug, instrument, trace, warn}; use wnfs_common::BlockStore; use crate::{ dag_walk::DagWalk, + error::Error, incremental_verification::{BlockState, IncrementalDagVerification}, messages::{Bloom, PullRequest, PushResponse}, }; @@ -63,12 +67,13 @@ pub struct CarFile { /// /// It returns a `CarFile` of (a subset) of all blocks below `root`, that /// are thought to be missing on the receiving end. +#[instrument(skip(config, store))] pub async fn block_send( root: Cid, last_state: Option, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { let ReceiverState { ref missing_subgraph_roots, have_cids_bloom, @@ -86,6 +91,30 @@ pub async fn block_send( .try_collect() .await?; + if subgraph_roots.len() != missing_subgraph_roots.len() { + let unrelated_roots = missing_subgraph_roots + .iter() + .filter(|cid| !subgraph_roots.contains(cid)) + .map(|cid| cid.to_string()) + .collect::>() + .join(", "); + + warn!( + unrelated_roots = %unrelated_roots, + "got asked for DAG-unrelated blocks" + ); + } + + if let Some(bloom) = &have_cids_bloom { + debug!( + size_bits = bloom.as_bytes().len() * 8, + hash_count = bloom.hash_count(), + ones_count = bloom.count_ones(), + estimated_fpr = bloom.current_false_positive_rate(), + "received 'have cids' bloom", + ); + } + let bloom = have_cids_bloom.unwrap_or_else(|| BloomFilter::new_with(1, Box::new([0]))); // An empty bloom that contains nothing let mut writer = CarWriter::new( @@ -102,16 +131,35 @@ pub async fn block_send( Vec::new(), ); - writer.write_header().await?; + writer + .write_header() + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))?; let mut block_bytes = 0; let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone()); while let Some((cid, block)) = dag_walk.next(store).await? { if bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(&cid) { + debug!( + cid = %cid, + bloom_contains = bloom.contains(&cid.to_bytes()), + subgraph_roots_contains = subgraph_roots.contains(&cid), + "skipped writing block" + ); continue; } - writer.write(cid, &block).await?; + debug!( + cid = %cid, + num_bytes = block.len(), + frontier_size = dag_walk.frontier.len(), + "writing block to CAR", + ); + + writer + .write(cid, &block) + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))?; // TODO(matheus23): Count the actual bytes sent? // At the moment, this is a rough estimate. iroh-car could be improved to return the written bytes. @@ -122,7 +170,11 @@ pub async fn block_send( } Ok(CarFile { - bytes: writer.finish().await?.into(), + bytes: writer + .finish() + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))? + .into(), }) } @@ -134,33 +186,49 @@ pub async fn block_send( /// It takes a `CarFile`, verifies that its contents are related to the /// `root` and returns some information to help the block sending side /// figure out what blocks to send next. +#[instrument(skip(last_car, config, store), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))] pub async fn block_receive( root: Cid, last_car: Option, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { let mut dag_verification = IncrementalDagVerification::new([root], store).await?; if let Some(car) = last_car { - let mut reader = CarReader::new(Cursor::new(car.bytes)).await?; + let mut reader = CarReader::new(Cursor::new(car.bytes)) + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))?; let mut block_bytes = 0; - while let Some((cid, vec)) = reader.next_block().await? { + while let Some((cid, vec)) = reader + .next_block() + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))? + { let block = Bytes::from(vec); + debug!( + cid = %cid, + num_bytes = block.len(), + "reading block from CAR", + ); + block_bytes += block.len(); if block_bytes > config.receive_maximum { - bail!( - "Received more than {} bytes ({block_bytes}), aborting request.", - config.receive_maximum - ); + return Err(Error::TooManyBytes { + block_bytes, + receive_maximum: config.receive_maximum, + }); } match dag_verification.block_state(cid) { BlockState::Have => continue, BlockState::Unexpected => { - eprintln!("Warn: Received block {cid} out of order, may be due to bloom false positive."); + trace!( + cid = %cid, + "received block out of order (possibly due to bloom false positive)" + ); break; } BlockState::Want => { @@ -188,6 +256,14 @@ pub async fn block_receive( }); } + if missing_subgraph_roots.is_empty() { + // We're done. No need to compute a bloom. + return Ok(ReceiverState { + missing_subgraph_roots, + have_cids_bloom: None, + }); + } + let mut bloom = BloomFilter::new_from_fpr_po2(bloom_capacity, (config.bloom_fpr)(bloom_capacity)); @@ -196,6 +272,15 @@ pub async fn block_receive( .iter() .for_each(|cid| bloom.insert(&cid.to_bytes())); + debug!( + inserted_elements = bloom_capacity, + size_bits = bloom.as_bytes().len() * 8, + hash_count = bloom.hash_count(), + ones_count = bloom.count_ones(), + estimated_fpr = bloom.current_false_positive_rate(), + "built 'have cids' bloom", + ); + Ok(ReceiverState { missing_subgraph_roots, have_cids_bloom: Some(bloom), @@ -207,13 +292,18 @@ pub async fn block_receive( /// This will error out if /// - the codec is not supported /// - the block can't be parsed. -pub fn references>(cid: Cid, block: impl AsRef<[u8]>, mut refs: E) -> Result { +pub fn references>( + cid: Cid, + block: impl AsRef<[u8]>, + mut refs: E, +) -> Result { let codec: IpldCodec = cid .codec() .try_into() - .map_err(|_| anyhow!("Unsupported codec in Cid: {cid}"))?; + .map_err(|_| Error::UnsupportedCodec { cid })?; - >::references(codec, &mut Cursor::new(block), &mut refs)?; + >::references(codec, &mut Cursor::new(block), &mut refs) + .map_err(Error::ParsingError)?; Ok(refs) } @@ -310,7 +400,7 @@ impl Default for Config { send_minimum: 128 * 1024, // 128KiB receive_maximum: 512 * 1024, // 512KiB max_roots_per_round: 1000, // max. ~41KB of CIDs - bloom_fpr: |num_of_elems| 0.1 / num_of_elems as f64, + bloom_fpr: |num_of_elems| f64::min(0.001, 0.1 / num_of_elems as f64), } } } diff --git a/car-mirror/src/dag_walk.rs b/car-mirror/src/dag_walk.rs index 3f27e7e..1ae84a7 100644 --- a/car-mirror/src/dag_walk.rs +++ b/car-mirror/src/dag_walk.rs @@ -1,5 +1,4 @@ -use crate::common::references; -use anyhow::Result; +use crate::{common::references, error::Error}; use bytes::Bytes; use futures::{stream::try_unfold, Stream}; use libipld_core::cid::Cid; @@ -54,7 +53,7 @@ impl DagWalk { /// Return the next node in the traversal. /// /// Returns `None` if no nodes are left to be visited. - pub async fn next(&mut self, store: &impl BlockStore) -> Result> { + pub async fn next(&mut self, store: &impl BlockStore) -> Result, Error> { let cid = loop { let popped = if self.breadth_first { self.frontier.pop_back() @@ -75,7 +74,10 @@ impl DagWalk { // TODO: Two opportunities for performance improvement: // - skip Raw CIDs. They can't have further links (but needs adjustment to this function's return type) // - run multiple `get_block` calls concurrently - let block = store.get_block(&cid).await?; + let block = store + .get_block(&cid) + .await + .map_err(Error::BlockStoreError)?; for ref_cid in references(cid, &block, Vec::new())? { if !self.visited.contains(&ref_cid) { self.frontier.push_front(ref_cid); @@ -89,7 +91,7 @@ impl DagWalk { pub fn stream( self, store: &impl BlockStore, - ) -> impl Stream> + Unpin + '_ { + ) -> impl Stream> + Unpin + '_ { Box::pin(try_unfold(self, move |mut this| async move { let maybe_block = this.next(store).await?; Ok(maybe_block.map(|b| (b, this))) @@ -110,7 +112,7 @@ impl DagWalk { } /// Skip a node from the traversal for now. - pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<()> { + pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<(), Error> { let (cid, bytes) = block; let refs = references(cid, bytes, HashSet::new())?; self.visited.insert(cid); @@ -124,6 +126,7 @@ impl DagWalk { #[cfg(test)] mod tests { use super::*; + use anyhow::Result; use futures::TryStreamExt; use libipld::Ipld; use wnfs_common::MemoryBlockStore; diff --git a/car-mirror/src/error.rs b/car-mirror/src/error.rs new file mode 100644 index 0000000..1612f55 --- /dev/null +++ b/car-mirror/src/error.rs @@ -0,0 +1,92 @@ +use libipld::Cid; + +use crate::incremental_verification::BlockState; + +/// Errors raised from the CAR mirror library +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// An error raised during receival of blocks, when more than the configured maximum + /// bytes are received in a single batch. See the `Config` type. + #[error("Received more than {receive_maximum} bytes ({block_bytes}), aborting request.")] + TooManyBytes { + /// The configured amount of maximum bytes to receive + receive_maximum: usize, + /// The actual amount of bytes received so far + block_bytes: usize, + }, + + /// This library only supports a subset of default codecs, including DAG-CBOR, DAG-JSON, DAG-PB and more.g + /// This is raised if an unknown codec is read from a CID. See the `libipld` library for more information. + #[error("Unsupported codec in Cid: {cid}")] + UnsupportedCodec { + /// The CID with the unsupported codec + cid: Cid, + }, + + /// This library only supports a subset of default hash functions, including SHA-256, SHA-3, BLAKE3 and more. + /// This is raised if an unknown hash code is read from a CID. See the `libipld` library for more information. + #[error("Unsupported hash code in CID {cid}")] + UnsupportedHashCode { + /// The CID with the unsupported hash function + cid: Cid, + }, + + /// This error is raised when the hash function that the `BlockStore` uses a different hashing function + /// than the blocks which are received over the wire. + /// This error will be removed in the future, when the block store trait gets modified to support specifying + /// the hash function. + #[error("BlockStore uses an incompatible hashing function: CID mismatched, expected {cid}, got {actual_cid}")] + BlockStoreIncompatible { + /// The expected CID + cid: Box, + /// The CID returned from the BlockStore implementation + actual_cid: Box, + }, + + // ------------- + // Anyhow Errors + // ------------- + /// An error raised when trying to parse a block (e.g. to look for further links) + #[error("Error during block parsing: {0}")] + ParsingError(anyhow::Error), + + /// An error rasied when trying to read or write a CAR file. + #[error("CAR (de)serialization error: {0}")] + CarFileError(anyhow::Error), + + /// An error rasied from the blockstore. + #[error("BlockStore error: {0}")] + BlockStoreError(anyhow::Error), + + // ---------- + // Sub-errors + // ---------- + /// Errors related to incremental verification + #[error(transparent)] + IncrementalVerificationError(#[from] IncrementalVerificationError), +} + +/// Errors related to incremental verification +#[derive(thiserror::Error, Debug)] +pub enum IncrementalVerificationError { + /// Raised when we receive a block with a CID that we don't expect. + /// We only expect blocks when they're related to the root CID of a DAG. + /// So a CID needs to have a path back to the root. + #[error("Expected to want block {cid}, but block state is: {block_state:?}")] + ExpectedWantedBlock { + /// The CID of the block we're currently processing + cid: Box, + /// The block state it has during incremental verification. + /// So either we already have it or it's unexpected. + block_state: BlockState, + }, + + /// Raised when the block stored in the CAR file doesn't match its hash. + #[error("Digest mismatch in CAR file: expected {cid}, got {actual_cid}")] + DigestMismatch { + /// The expected CID + cid: Box, + /// The CID it actually hashes to + actual_cid: Box, + }, +} diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index 0960699..8c46009 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -1,11 +1,16 @@ -use crate::dag_walk::DagWalk; -use anyhow::{anyhow, bail, Result}; +#![allow(unknown_lints)] // Because the `instrument` macro contains some `#[allow]`s that rust 1.66 doesn't know yet. + +use crate::{ + dag_walk::DagWalk, + error::{Error, IncrementalVerificationError}, +}; use bytes::Bytes; use libipld_core::{ cid::Cid, multihash::{Code, MultihashDigest}, }; use std::{collections::HashSet, matches}; +use tracing::instrument; use wnfs_common::{BlockStore, BlockStoreError}; /// A data structure that keeps state about incremental DAG verification. @@ -36,7 +41,7 @@ impl IncrementalDagVerification { pub async fn new( roots: impl IntoIterator, store: &impl BlockStore, - ) -> Result { + ) -> Result { let mut this = Self { want_cids: roots.into_iter().collect(), have_cids: HashSet::new(), @@ -47,20 +52,22 @@ impl IncrementalDagVerification { Ok(this) } - async fn update_have_cids(&mut self, store: &impl BlockStore) -> Result<()> { + #[instrument(level = "trace", skip_all, fields(num_want = self.want_cids.len(), num_have = self.have_cids.len()))] + async fn update_have_cids(&mut self, store: &impl BlockStore) -> Result<(), Error> { let mut dag_walk = DagWalk::breadth_first(self.want_cids.iter().cloned()); loop { match dag_walk.next(store).await { - Err(e) => { + Err(Error::BlockStoreError(e)) => { if let Some(BlockStoreError::CIDNotFound(not_found)) = e.downcast_ref::() { self.want_cids.insert(*not_found); } else { - bail!(e); + return Err(Error::BlockStoreError(e)); } } + Err(e) => return Err(e), Ok(Some((cid, _))) => { self.want_cids.remove(&cid); self.have_cids.insert(cid); @@ -104,33 +111,47 @@ impl IncrementalDagVerification { &mut self, block: (Cid, Bytes), store: &impl BlockStore, - ) -> Result<()> { + ) -> Result<(), Error> { let (cid, bytes) = block; let block_state = self.block_state(cid); if !matches!(block_state, BlockState::Want) { - bail!("Incremental verification failed. Block state is: {block_state:?}, expected BlockState::Want"); + return Err(IncrementalVerificationError::ExpectedWantedBlock { + cid: Box::new(cid), + block_state, + } + .into()); } let hash_func: Code = cid .hash() .code() .try_into() - .map_err(|_| anyhow!("Unsupported hash code in CID {cid}"))?; + .map_err(|_| Error::UnsupportedHashCode { cid })?; let hash = hash_func.digest(bytes.as_ref()); if &hash != cid.hash() { - let result_cid = Cid::new_v1(cid.codec(), hash); - bail!("Digest mismatch in CAR file: expected {cid}, got {result_cid}"); + let actual_cid = Cid::new_v1(cid.codec(), hash); + return Err(IncrementalVerificationError::DigestMismatch { + cid: Box::new(cid), + actual_cid: Box::new(actual_cid), + } + .into()); } - let result_cid = store.put_block(bytes, cid.codec()).await?; + let actual_cid = store + .put_block(bytes, cid.codec()) + .await + .map_err(Error::BlockStoreError)?; // TODO(matheus23): The BlockStore chooses the hashing function, // so it may choose a different hashing function, causing a mismatch - if result_cid != cid { - bail!("BlockStore uses an incompatible hashing function: CID mismatched, expected {cid}, got {result_cid}"); + if actual_cid != cid { + return Err(Error::BlockStoreIncompatible { + cid: Box::new(cid), + actual_cid: Box::new(actual_cid), + }); } self.update_have_cids(store).await?; diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index b40d1a0..5b256e3 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -13,6 +13,8 @@ pub mod test_utils; pub mod common; /// Algorithms for walking IPLD directed acyclic graphs pub mod dag_walk; +/// Error types +pub mod error; /// Algorithms for doing incremental verification of IPLD DAGs on the receiving end. pub mod incremental_verification; /// Data types that are sent over-the-wire and relevant serialization code. diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index 8bfdbb8..4b018fa 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -1,8 +1,8 @@ use crate::{ common::{block_receive, block_send, CarFile, Config, ReceiverState}, + error::Error, messages::PullRequest, }; -use anyhow::Result; use libipld::Cid; use wnfs_common::BlockStore; @@ -22,7 +22,7 @@ pub async fn request( last_response: Option, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { Ok(block_receive(root, last_response, config, store) .await? .into()) @@ -34,7 +34,7 @@ pub async fn response( request: PullRequest, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { let receiver_state = Some(ReceiverState::from(request)); block_send(root, receiver_state, config, store).await } diff --git a/car-mirror/src/push.rs b/car-mirror/src/push.rs index 0fb229d..aa5618b 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -1,8 +1,8 @@ use crate::{ common::{block_receive, block_send, CarFile, Config, ReceiverState}, + error::Error, messages::PushResponse, }; -use anyhow::Result; use libipld_core::cid::Cid; use wnfs_common::BlockStore; @@ -21,7 +21,7 @@ pub async fn request( last_response: Option, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { let receiver_state = last_response.map(ReceiverState::from); block_send(root, receiver_state, config, store).await } @@ -39,7 +39,7 @@ pub async fn response( request: CarFile, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { Ok(block_receive(root, Some(request), config, store) .await? .into())