From 206f6d88f9d5fe9a2774b27ca2d7e29c19824831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 21 Feb 2024 17:41:19 +0100 Subject: [PATCH] feat: Create `car-mirror-reqwest` crate & add high-level streaming functions (#42) * feat: Initial version of `car-mirror-reqwest` crate * feat: High-level streaming push & pull abstractions * feat: Add configurable max block size * feat: Implement streaming response shortcuts * chore: Write documentation * refactor: Rename `send_` -> `run_` * chore: Improve bloom FPR logging * chore: Write documentation for `push` module * chore: Write docs for `pull` module & clean up * chore: Allow `MPL-2.0` in `deny.toml` * chore: Write `axum.rs` example & module docs * refactor: Move example to `integration`, run as integration test * chore: Remove accidental testing code * chore: Run tests for all crates on coverage * chore: Write more tests, specifically for error cases --- .github/workflows/coverage.yml | 2 +- Cargo.lock | 884 ++++++++++++++++++++- Cargo.toml | 3 +- car-mirror-reqwest/.dockerignore | 7 + car-mirror-reqwest/Cargo.toml | 53 ++ car-mirror-reqwest/LICENSE-APACHE | 201 +++++ car-mirror-reqwest/LICENSE-MIT | 23 + car-mirror-reqwest/README.md | 53 ++ car-mirror-reqwest/integration/axum.rs | 146 ++++ car-mirror-reqwest/src/error.rs | 32 + car-mirror-reqwest/src/lib.rs | 46 ++ car-mirror-reqwest/src/request.rs | 222 ++++++ car-mirror/Cargo.toml | 5 +- car-mirror/src/cache.rs | 2 +- car-mirror/src/common.rs | 161 +++- car-mirror/src/error.rs | 16 +- car-mirror/src/incremental_verification.rs | 7 +- car-mirror/src/lib.rs | 419 +++++++++- car-mirror/src/pull.rs | 110 ++- car-mirror/src/push.rs | 110 ++- car-mirror/src/test_utils/local_utils.rs | 10 + deny.toml | 1 + 22 files changed, 2415 insertions(+), 98 deletions(-) create mode 100644 car-mirror-reqwest/.dockerignore create mode 100644 car-mirror-reqwest/Cargo.toml create mode 100644 car-mirror-reqwest/LICENSE-APACHE create mode 100644 car-mirror-reqwest/LICENSE-MIT create mode 100644 car-mirror-reqwest/README.md create mode 100644 car-mirror-reqwest/integration/axum.rs create mode 100644 car-mirror-reqwest/src/error.rs create mode 100644 car-mirror-reqwest/src/lib.rs create mode 100644 car-mirror-reqwest/src/request.rs diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 9d78cff..b7d4672 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -37,7 +37,7 @@ jobs: RUSTFLAGS: '-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off' RUSTDOCFLAGS: '-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off' ## currently just runs coverage on rust project - run: cargo test -p car-mirror --all-features + run: cargo test --all-features - name: Install grcov run: "curl -L https://github.com/mozilla/grcov/releases/download/v0.8.12/grcov-x86_64-unknown-linux-gnu.tar.bz2 | tar jxf -" diff --git a/Cargo.lock b/Cargo.lock index c7df932..43a717c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,6 +78,12 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + [[package]] name = "async-attributes" version = "1.1.2" @@ -157,7 +163,7 @@ dependencies = [ "polling 2.8.0", 
"rustix 0.37.27", "slab", - "socket2", + "socket2 0.4.10", "waker-fn", ] @@ -261,6 +267,17 @@ version = "4.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" +[[package]] +name = "async-trait" +version = "0.1.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -284,6 +301,73 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.1.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-macros" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00c055ee2d014ae5981ce1016374e8213682aa14d9bf40e48ab48b5f3ef20eaa" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -305,6 +389,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "bit-set" version = "0.5.3" @@ -437,6 +527,7 @@ name = "car-mirror" version = "0.1.0" dependencies = [ "anyhow", + "assert_matches", "async-std", "async-stream", "bytes", @@ -456,9 +547,11 @@ dependencies = [ "testresult", "thiserror", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "wnfs-common", + "wnfs-unixfs-file", ] [[package]] @@ -475,6 +568,30 @@ dependencies = [ "wnfs-common", ] +[[package]] +name = "car-mirror-reqwest" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "axum-macros", + "bytes", + "car-mirror", + "futures", + "libipld", + "reqwest", + "reqwest-middleware", + "test-log", + "test-strategy", + "testresult", + "thiserror", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "wnfs-common", +] + [[package]] name = "car-mirror-wasm" version = "0.1.0" @@ -579,7 +696,7 @@ checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "bitflags 1.3.2", "clap_lex", - "indexmap", + "indexmap 1.9.3", 
"textwrap", ] @@ -617,6 +734,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -773,6 +900,15 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -865,6 +1001,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "funty" version = "2.0.0" @@ -1027,6 +1172,44 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "h2" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.11", + "indexmap 2.2.3", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", + "indexmap 2.2.3", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.3.1" @@ -1049,6 +1232,12 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1064,6 +1253,147 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd" +[[package]] +name = "http" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.11", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "0.14.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.24", + "http 0.2.11", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.5", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.11", + "hyper 0.14.28", + "rustls", + "tokio", + "tokio-rustls", +] + +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2 0.5.5", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -1087,6 +1417,16 @@ dependencies = [ "cc", ] +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -1097,6 +1437,16 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "indexmap" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" +dependencies = [ + "equivalent", + "hashbrown 0.14.3", +] + [[package]] name = "instant" version = "0.1.12" @@ -1117,6 +1467,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ipnet" +version = "2.9.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + [[package]] name = "iroh-car" version = "0.4.0" @@ -1322,6 +1678,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.1" @@ -1351,6 +1713,22 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.7.2" @@ -1360,6 +1738,17 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "multibase" version = "0.9.1" @@ -1425,22 +1814,53 @@ dependencies = [ ] [[package]] -name = "object" -version = "0.32.2" +name = "num_cpus" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "memchr", + "hermit-abi 0.3.6", + "libc", ] [[package]] -name = "once_cell" -version = "1.19.0" +name = "num_enum" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" +dependencies = [ + "num_enum_derive", +] [[package]] -name = "oorandom" +name = "num_enum_derive" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "oorandom" version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" @@ -1486,6 +1906,32 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + +[[package]] +name = "pin-project" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1608,6 +2054,29 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -1742,6 +2211,80 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "reqwest" +version = "0.11.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.24", + "http 0.2.11", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-rustls", + "ipnet", + "js-sys", + "log", + "mime", + "mime_guess", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "system-configuration", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "webpki-roots", + "winreg", +] + +[[package]] +name = "reqwest-middleware" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a3e86aa6053e59030e7ce2d2a3b258dd08fc2d337d52f73f6cb480f5858690" +dependencies = [ + "anyhow", + "async-trait", + "http 0.2.11", + "reqwest", + "serde", + "task-local-extensions", + "thiserror", +] + +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "roaring" version = "0.10.3" @@ -1797,6 +2340,43 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" 
+dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "rusty-fork" version = "0.3.0" @@ -1836,6 +2416,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "serde" version = "1.0.196" @@ -1897,6 +2487,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd154a240de39fdebcf5775d2675c204d7c13cf39a4c697be6493c8e734337c" +dependencies = [ + "itoa", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha2" version = "0.10.8" @@ -1927,6 +2539,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -1952,6 +2573,22 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "structmeta" version = "0.2.0" @@ -1997,6 +2634,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "synstructure" version = "0.12.6" @@ -2009,12 +2652,42 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tap" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "task-local-extensions" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "ba323866e5d033818e3240feeb9f7db2c4296674e4d9e16b97b7bf8f490434e8" +dependencies = [ + "pin-utils", +] + [[package]] name = "tempfile" version = "3.10.0" @@ -2112,6 +2785,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.36.0" @@ -2120,7 +2808,50 @@ checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2 0.5.5", + "tokio-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", "pin-project-lite", + "tokio", + "tracing", ] [[package]] @@ -2132,12 +2863,41 @@ dependencies = [ "serde", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2207,6 +2967,12 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.17.0" @@ -2219,12 +2985,36 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + +[[package]] +name = "unicode-bidi" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-width" version = "0.1.11" @@ -2243,6 +3033,23 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + +[[package]] +name = "url" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + [[package]] name = "valuable" version = "0.1.0" @@ -2286,6 +3093,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2385,6 +3201,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "wasm-streams" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.68" @@ -2395,6 +3224,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "winapi" version = "0.3.9" @@ -2567,6 +3402,16 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wnfs-common" version = "0.2.0" @@ -2590,6 +3435,25 @@ dependencies = [ "thiserror", ] +[[package]] +name = "wnfs-unixfs-file" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf5f1e8e6537a38c0e662a5cb4524bf6bec7d2c0d5e37f8a3849e7eedb212312" +dependencies = [ + "anyhow", + "async-stream", + "bytes", + "futures", + "libipld", + "num_enum", + "prost", + "rand_core", + "testresult", + "tokio", + "wnfs-common", +] + [[package]] name = "wyz" version = "0.5.1" diff 
--git a/Cargo.toml b/Cargo.toml index 8a0a57d..e5b7f1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,8 +2,9 @@ members = [ "car-mirror", "car-mirror-benches", + "car-mirror-reqwest", "car-mirror-wasm", - "examples" + "examples", ] [workspace.dependencies] diff --git a/car-mirror-reqwest/.dockerignore b/car-mirror-reqwest/.dockerignore new file mode 100644 index 0000000..b94f2b7 --- /dev/null +++ b/car-mirror-reqwest/.dockerignore @@ -0,0 +1,7 @@ +* + +!Cargo.lock +!Cargo.toml +!src + +src/bin diff --git a/car-mirror-reqwest/Cargo.toml b/car-mirror-reqwest/Cargo.toml new file mode 100644 index 0000000..45084b4 --- /dev/null +++ b/car-mirror-reqwest/Cargo.toml @@ -0,0 +1,53 @@ +[package] +name = "car-mirror-reqwest" +version = "0.1.0" +description = "Adapter for using car-mirror with reqwest-middleware" +keywords = [] +categories = [] +include = ["/src", "README.md", "LICENSE-APACHE", "LICENSE-MIT"] +license = "Apache-2.0" +readme = "README.md" +edition = "2021" +rust-version = "1.66" +documentation = "https://docs.rs/car-mirror-reqwest" +repository = "https://github.com/fission-codes/rs-car-mirror/tree/main/car-mirror-reqwest" +authors = ["Philipp Krüger "] + +[lib] +path = "src/lib.rs" +doctest = true + +[dependencies] +anyhow = "1.0" +bytes = "1.4" +car-mirror = { version = "0.1", path = "../car-mirror" } +futures = "0.3" +libipld = "0.16" +reqwest = { version = "0.11", default-features = false, features = ["json", "stream"] } +reqwest-middleware = "0.2" +thiserror = "1.0" +tokio-util = { version = "0.7", features = ["io"] } +tracing = "0.1" +wnfs-common = { workspace = true } + +[dev-dependencies] +axum = "0.7" +axum-macros = "0.4" +car-mirror = { version = "0.1", path = "../car-mirror", features = ["quick_cache"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] } +test-log = { version = "0.2", default-features = false, features = ["trace"] } +test-strategy = "0.3" +testresult = "0.3" +tokio = { version = "1.0", features = ["full"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parking_lot", "registry"] } + +[features] + +[package.metadata.docs.rs] +all-features = true +# defines the configuration attribute `docsrs` +rustdoc-args = ["--cfg", "docsrs"] + +[[test]] +name = "integration" +path = "integration/axum.rs" diff --git a/car-mirror-reqwest/LICENSE-APACHE b/car-mirror-reqwest/LICENSE-APACHE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/car-mirror-reqwest/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. 
+ + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/car-mirror-reqwest/LICENSE-MIT b/car-mirror-reqwest/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/car-mirror-reqwest/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/car-mirror-reqwest/README.md b/car-mirror-reqwest/README.md new file mode 100644 index 0000000..57facc2 --- /dev/null +++ b/car-mirror-reqwest/README.md @@ -0,0 +1,53 @@ +
+<!-- centered README header: "car-mirror-reqwest" title with badges: Crate, Code Coverage, Build Status, License-Apache, License-MIT, Docs, Discord -->
:warning: Work in progress :warning:
+ +## car-mirror-reqwest + +Description. + +## License + +This project is licensed under either of + +- Apache License, Version 2.0, ([LICENSE-APACHE](./LICENSE-APACHE) or [http://www.apache.org/licenses/LICENSE-2.0][apache]) +- MIT license ([LICENSE-MIT](./LICENSE-MIT) or [http://opensource.org/licenses/MIT][mit]) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally +submitted for inclusion in the work by you, as defined in the Apache-2.0 +license, shall be dual licensed as above, without any additional terms or +conditions. + + +[apache]: https://www.apache.org/licenses/LICENSE-2.0 +[mit]: http://opensource.org/licenses/MIT diff --git a/car-mirror-reqwest/integration/axum.rs b/car-mirror-reqwest/integration/axum.rs new file mode 100644 index 0000000..148fa1f --- /dev/null +++ b/car-mirror-reqwest/integration/axum.rs @@ -0,0 +1,146 @@ +use anyhow::Result; +use axum::{ + body::Body, + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::{get, post}, + Json, Router, +}; +use car_mirror::{ + cache::{InMemoryCache, NoCache}, + common::Config, + messages::{PullRequest, PushResponse}, +}; +use car_mirror_reqwest::RequestBuilderExt; +use futures::TryStreamExt; +use libipld::Cid; +use reqwest::Client; +use std::{future::IntoFuture, str::FromStr}; +use tokio_util::io::StreamReader; +use wnfs_common::{BlockStore, MemoryBlockStore, CODEC_RAW}; + +#[test_log::test(tokio::test)] +async fn main() -> Result<()> { + // Say, you have a webserver running like so: + let app = Router::new() + .route("/dag/pull/:cid", get(car_mirror_pull)) + .route("/dag/push/:cid", post(car_mirror_push)) + .with_state(ServerState::new()); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:3344").await?; + tokio::spawn(axum::serve(listener, app).into_future()); + + // You can issue requests from your client like so: + let store = MemoryBlockStore::new(); + let data = b"Hello, world!".to_vec(); + let root = store.put_block(data, CODEC_RAW).await?; + + let config = &Config::default(); + + let client = Client::new(); + client + .post(format!("http://localhost:3344/dag/push/{root}")) + .run_car_mirror_push(root, &store, &NoCache) // rounds of push protocol + .await?; + + let store = MemoryBlockStore::new(); // clear out data + client + .get(format!("http://localhost:3344/dag/pull/{root}")) + .run_car_mirror_pull(root, config, &store, &NoCache) // rounds of pull protocol + .await?; + + assert!(store.has_block(&root).await?); + + Ok(()) +} + +// Server details: + +#[derive(Debug, Clone)] +struct ServerState { + store: MemoryBlockStore, + cache: InMemoryCache, +} + +impl ServerState { + fn new() -> Self { + Self { + store: MemoryBlockStore::new(), + cache: InMemoryCache::new(100_000), + } + } +} + +#[axum_macros::debug_handler] +async fn car_mirror_push( + State(state): State, + Path(cid_string): Path, + body: Body, +) -> Result<(StatusCode, Json), AppError> +where { + let cid = Cid::from_str(&cid_string)?; + + let body_stream = body.into_data_stream(); + + let reader = StreamReader::new( + body_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), + ); + + let response = car_mirror::push::response_streaming( + cid, + reader, + &Config::default(), + &state.store, + &state.cache, + ) + .await?; + + if response.indicates_finished() { + Ok((StatusCode::OK, Json(response))) + } else { + Ok((StatusCode::ACCEPTED, Json(response))) + } +} + +#[axum_macros::debug_handler] +async fn car_mirror_pull( + State(state): 
State, + Path(cid_string): Path, + Json(request): Json, +) -> Result<(StatusCode, Body), AppError> { + let cid = Cid::from_str(&cid_string)?; + + let car_chunks = car_mirror::pull::response_streaming( + cid, + request, + state.store.clone(), + state.cache.clone(), + ) + .await?; + + Ok((StatusCode::OK, Body::from_stream(car_chunks))) +} + +// Basic anyhow error handling: + +struct AppError(anyhow::Error); + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Something went wrong: {}", self.0), + ) + .into_response() + } +} + +impl From for AppError +where + E: Into, +{ + fn from(err: E) -> Self { + Self(err.into()) + } +} diff --git a/car-mirror-reqwest/src/error.rs b/car-mirror-reqwest/src/error.rs new file mode 100644 index 0000000..a74ff80 --- /dev/null +++ b/car-mirror-reqwest/src/error.rs @@ -0,0 +1,32 @@ +use reqwest::Response; + +/// Possible errors raised in this library +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Raised when the HTTP response code didn't end up as a 200 or 202 + #[error("Unexpected response code: {}, expected 200 or 202", response.status())] + UnexpectedStatusCode { + /// The response + response: Response, + }, + + /// Raised when `RequestBuilder::try_clone` fails, usually because + /// `RequestBuilder::body(Body::wrap_stream(...))` was called. + /// + /// Generally, car-mirror-reqwest will take over body creation, so there's + /// no point in setting the body before `run_car_mirror_pull` / `run_car_mirror_push`. + #[error("Body must not be set on request builder")] + RequestBuilderBodyAlreadySet, + + /// reqwest errors + #[error(transparent)] + ReqwestError(#[from] reqwest::Error), + + /// reqwest-middleware errors + #[error(transparent)] + ReqwestMiddlewareError(#[from] reqwest_middleware::Error), + + /// car-mirror errors + #[error(transparent)] + CarMirrorError(#[from] car_mirror::Error), +} diff --git a/car-mirror-reqwest/src/lib.rs b/car-mirror-reqwest/src/lib.rs new file mode 100644 index 0000000..d18aa59 --- /dev/null +++ b/car-mirror-reqwest/src/lib.rs @@ -0,0 +1,46 @@ +#![cfg_attr(docsrs, feature(doc_cfg))] +#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)] +#![deny(unreachable_pub)] + +//! # car-mirror-reqwest +//! +//! A helper library that helps making car-mirror client requests using reqwest. +//! +//! ```no_run +//! # use anyhow::Result; +//! use car_mirror::{cache::NoCache, common::Config}; +//! use car_mirror_reqwest::RequestBuilderExt; +//! use wnfs_common::{BlockStore, MemoryBlockStore, CODEC_RAW}; +//! +//! # #[tokio::main] +//! # async fn main() -> Result<()> { +//! let store = MemoryBlockStore::new(); +//! let data = b"Hello, world!".to_vec(); +//! let root = store.put_block(data, CODEC_RAW).await?; +//! +//! let config = &Config::default(); +//! +//! let client = reqwest::Client::new(); +//! client +//! .post(format!("http://localhost:3344/dag/push/{root}")) +//! .run_car_mirror_push(root, &store, &NoCache) // rounds of push protocol +//! .await?; +//! +//! let store = MemoryBlockStore::new(); // clear out data +//! client +//! .get(format!("http://localhost:3344/dag/pull/{root}")) +//! .run_car_mirror_pull(root, config, &store, &NoCache) // rounds of pull protocol +//! .await?; +//! +//! assert!(store.has_block(&root).await?); +//! # Ok(()) +//! # } +//! ``` +//! +//! For the full example, please see `integration/axum.rs` in the source repository. 
+ +mod error; +mod request; + +pub use error::*; +pub use request::*; diff --git a/car-mirror-reqwest/src/request.rs b/car-mirror-reqwest/src/request.rs new file mode 100644 index 0000000..641183a --- /dev/null +++ b/car-mirror-reqwest/src/request.rs @@ -0,0 +1,222 @@ +use crate::Error; +use anyhow::Result; +use car_mirror::{cache::Cache, common::Config, messages::PullRequest}; +use futures::{Future, TryStreamExt}; +use libipld::Cid; +use reqwest::{Body, Response, StatusCode}; +use tokio_util::io::StreamReader; +use wnfs_common::BlockStore; + +/// Extension methods on `RequestBuilder`s for sending car mirror protocol requests. +pub trait RequestBuilderExt { + /// Initiate a car mirror push request to send some data to the + /// server via HTTP. + /// + /// Repeats this request until the car mirror protocol is finished. + /// + /// The `root` is the CID of the DAG root that should be made fully + /// present on the server side at the end of the protocol. + /// + /// The full DAG under `root` needs to be available in the local + /// blockstore `store`. + /// + /// `cache` is used to reduce costly `store` lookups and computations + /// regarding which blocks still need to be transferred. + /// + /// This will call `try_clone()` and `send()` on this + /// request builder, so it must not have a `body` set yet. + /// There is no need to set a body, this function will do so automatically. + /// + /// `store` and `cache` need to be references to `Clone`-able types + /// which don't borrow data, because of the way request streaming + /// lifetimes work with `reqwest`. + /// Usually blockstores and caches satisfy these conditions due to + /// using atomic reference counters. + fn run_car_mirror_push( + &self, + root: Cid, + store: &(impl BlockStore + Clone + 'static), + cache: &(impl Cache + Clone + 'static), + ) -> impl Future> + Send; + + /// Initiate a car mirror pull request to load some data from + /// a server via HTTP. + /// + /// Repeats the request in rounds, until the car mirror protocol is finished. + /// + /// The `root` is the CID of the DAG root that should be made fully + /// present on the client side at the end of the protocol. + /// + /// At the end of the protocol, `store` will contain all blocks under `root`. + /// + /// `cache` is used to reduce costly `store` lookups and computations + /// regarding which blocks still need to be transferred. + /// + /// This will call `try_clone()` and `send()` on this + /// request builder, so it must not have a `body` set yet. + /// There is no need to set a body, this function will do so automatically. + fn run_car_mirror_pull( + &self, + root: Cid, + config: &Config, + store: &impl BlockStore, + cache: &impl Cache, + ) -> impl Future> + Send; +} + +impl RequestBuilderExt for reqwest_middleware::RequestBuilder { + async fn run_car_mirror_push( + &self, + root: Cid, + store: &(impl BlockStore + Clone + 'static), + cache: &(impl Cache + Clone + 'static), + ) -> Result<(), Error> { + push_with(root, store, cache, |body| async { + Ok::<_, Error>( + self.try_clone() + .ok_or(Error::RequestBuilderBodyAlreadySet)? + .body(body) + .send() + .await?, + ) + }) + .await + } + + async fn run_car_mirror_pull( + &self, + root: Cid, + config: &Config, + store: &impl BlockStore, + cache: &impl Cache, + ) -> Result<(), Error> { + pull_with(root, config, store, cache, |pull_request| async move { + Ok::<_, Error>( + self.try_clone() + .ok_or(Error::RequestBuilderBodyAlreadySet)? 
+                    .json(&pull_request)
+                    .send()
+                    .await?,
+            )
+        })
+        .await
+    }
+}
+
+impl RequestBuilderExt for reqwest::RequestBuilder {
+    async fn run_car_mirror_push(
+        &self,
+        root: Cid,
+        store: &(impl BlockStore + Clone + 'static),
+        cache: &(impl Cache + Clone + 'static),
+    ) -> Result<(), Error> {
+        push_with(root, store, cache, |body| async {
+            Ok::<_, Error>(
+                self.try_clone()
+                    .ok_or(Error::RequestBuilderBodyAlreadySet)?
+                    .body(body)
+                    .send()
+                    .await?,
+            )
+        })
+        .await
+    }
+
+    async fn run_car_mirror_pull(
+        &self,
+        root: Cid,
+        config: &Config,
+        store: &impl BlockStore,
+        cache: &impl Cache,
+    ) -> Result<(), Error> {
+        pull_with(root, config, store, cache, |pull_request| async move {
+            Ok::<_, Error>(
+                self.try_clone()
+                    .ok_or(Error::RequestBuilderBodyAlreadySet)?
+                    .json(&pull_request)
+                    .send()
+                    .await?,
+            )
+        })
+        .await
+    }
+}
+
+/// Run (possibly multiple rounds of) the car mirror push protocol.
+///
+/// See `run_car_mirror_push` for a more ergonomic interface.
+///
+/// Unlike `run_car_mirror_push`, this allows customizing the
+/// request every time it gets built, e.g. to refresh authentication tokens.
+pub async fn push_with<F, Fut, E>(
+    root: Cid,
+    store: &(impl BlockStore + Clone + 'static),
+    cache: &(impl Cache + Clone + 'static),
+    mut make_request: F,
+) -> Result<(), E>
+where
+    F: FnMut(reqwest::Body) -> Fut,
+    Fut: Future<Output = Result<Response, E>>,
+    E: From<reqwest::Error>,
+    E: From<car_mirror::Error>,
+    E: From<Error>,
+{
+    let mut push_state = None;
+
+    loop {
+        let car_stream =
+            car_mirror::push::request_streaming(root, push_state, store.clone(), cache.clone())
+                .await?;
+        let reqwest_stream = Body::wrap_stream(car_stream);
+
+        let response = make_request(reqwest_stream).await?.error_for_status()?;
+
+        match response.status() {
+            StatusCode::OK => {
+                return Ok(());
+            }
+            StatusCode::ACCEPTED => {
+                // We need to continue.
+            }
+            _ => {
+                // Some unexpected response code
+                return Err(Error::UnexpectedStatusCode { response }.into());
+            }
+        }
+
+        push_state = Some(response.json().await?);
+    }
+}
+
+/// Run (possibly multiple rounds of) the car mirror pull protocol.
+///
+/// See `run_car_mirror_pull` for a more ergonomic interface.
+///
+/// Unlike `run_car_mirror_pull`, this allows customizing the
+/// request every time it gets built, e.g. to refresh authentication tokens.
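+///
+/// A minimal usage sketch: every protocol round rebuilds the request, here to
+/// attach a freshly fetched bearer token. The URL and the `fresh_token` helper
+/// are assumptions for illustration, not part of this crate:
+///
+/// ```no_run
+/// # use anyhow::Result;
+/// use car_mirror::{cache::NoCache, common::Config};
+/// use car_mirror_reqwest::{pull_with, Error};
+/// use wnfs_common::{BlockStore, MemoryBlockStore, CODEC_RAW};
+///
+/// # fn fresh_token() -> String { "token".into() }
+/// # #[tokio::main]
+/// # async fn main() -> Result<()> {
+/// let store = MemoryBlockStore::new();
+/// let config = Config::default();
+/// let client = reqwest::Client::new();
+///
+/// // Imagine `root` was communicated out-of-band and we want to fetch its DAG:
+/// let root = store.put_block(b"Hello, world!".to_vec(), CODEC_RAW).await?;
+///
+/// pull_with(root, &config, &store, &NoCache, |pull_request| {
+///     // Rebuild the request (and its token) on every round of the protocol
+///     let request = client
+///         .get(format!("http://localhost:3344/dag/pull/{root}"))
+///         .bearer_auth(fresh_token())
+///         .json(&pull_request);
+///     async move { Ok::<_, Error>(request.send().await?) }
+/// })
+/// .await?;
+/// # Ok(())
+/// # }
+/// ```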
+pub async fn pull_with<F, Fut, E>(
+    root: Cid,
+    config: &Config,
+    store: &impl BlockStore,
+    cache: &impl Cache,
+    mut make_request: F,
+) -> Result<(), E>
+where
+    F: FnMut(PullRequest) -> Fut,
+    Fut: Future<Output = Result<Response, E>>,
+    E: From<reqwest::Error>,
+    E: From<car_mirror::Error>,
+{
+    let mut pull_request = car_mirror::pull::request(root, None, config, store, cache).await?;
+
+    while !pull_request.indicates_finished() {
+        let answer = make_request(pull_request).await?.error_for_status()?;
+
+        let stream = StreamReader::new(answer.bytes_stream().map_err(std::io::Error::other));
+
+        pull_request =
+            car_mirror::pull::handle_response_streaming(root, stream, config, store, cache).await?;
+    }
+
+    Ok(())
+}
diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml
index 6623dfa..06f99c3 100644
--- a/car-mirror/Cargo.toml
+++ b/car-mirror/Cargo.toml
@@ -37,14 +37,17 @@ tracing = "0.1"
 wnfs-common = { workspace = true }
 
 [dev-dependencies]
+assert_matches = "1.5.0"
 async-std = { version = "1.11", features = ["attributes"] }
-car-mirror = { path = ".", features = ["test_utils"] }
+car-mirror = { path = ".", features = ["quick_cache", "test_utils"] }
 proptest = "1.1"
 roaring-graphs = "0.12"
 test-log = { version = "0.2", default-features = false, features = ["trace"] }
 test-strategy = "0.3"
 testresult = "0.3"
+tokio-util = { version = "0.7.8", features = ["io"] }
 tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parking_lot", "registry"] }
+wnfs-unixfs-file = { version = "0.2.0" }
 
 [features]
 default = []
diff --git a/car-mirror/src/cache.rs b/car-mirror/src/cache.rs
index ef3b7ac..df2293b 100644
--- a/car-mirror/src/cache.rs
+++ b/car-mirror/src/cache.rs
@@ -91,7 +91,7 @@ impl Cache for Box {
 }
 
 /// An implementation of `Cache` that doesn't cache at all.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct NoCache;
 
 impl Cache for NoCache {
diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs
index 7699d9e..c757177 100644
--- a/car-mirror/src/common.rs
+++ b/car-mirror/src/common.rs
@@ -5,7 +5,6 @@ use iroh_car::{CarHeader, CarReader, CarWriter};
 use libipld::{Ipld, IpldCodec};
 use libipld_core::{cid::Cid, codec::References};
 use std::io::Cursor;
-use tracing::{debug, instrument, trace, warn};
 use wnfs_common::{
     utils::{boxed_stream, BoxStream, CondSend},
     BlockStore,
@@ -26,15 +25,55 @@ use crate::{
 /// Configuration values (such as byte limits) for the CAR mirror protocol
 #[derive(Clone, Debug)]
 pub struct Config {
-    /// The maximum number of bytes per request that a recipient should.
+    /// The maximum number of bytes per request that a recipient should accept.
+    ///
+    /// This only has an effect in non-streaming versions of this protocol.
+    /// In streaming versions, car-mirror will check the validity of each block
+    /// while streaming.
+    ///
+    /// By default this is 2MB.
     pub receive_maximum: usize,
+    /// The maximum number of bytes per block.
+    ///
+    /// As long as we can't verify the hash value of a block, we can't verify whether we've
+    /// been given the data we actually want or not, thus we need to put a maximum value
+    /// on the byte size that we accept per block.
+    ///
+    /// By default this is 1MB.
+    ///
+    /// 1MB is also the default maximum block size in IPFS's bitswap protocol.
+    /// 256KiB is the maximum block size that Kubo produces by default when generating
+    /// UnixFS blocks.
+    ///
+    /// `iroh-car` internally has a maximum 4MB limit on a CAR file frame (CID + block), so
+    /// any value above 4MB doesn't work.
+    pub max_block_size: usize,
     /// The maximum number of roots per request that will be requested by the recipient
     /// to be sent by the sender.
+    ///
+    /// By default this is 1_000.
     pub max_roots_per_round: usize,
     /// The target false positive rate for the bloom filter that the recipient sends.
+    ///
+    /// By default it's set to `|num| min(0.001, 0.1 / num)`.
+    ///
+    /// This default means bloom filters will aim for a false positive probability
+    /// roughly one order of magnitude below the reciprocal of the number of elements.
+    /// E.g. for 100_000 elements, a false positive probability of 1 in 1 million.
     pub bloom_fpr: fn(u64) -> f64,
 }
 
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            receive_maximum: 2_000_000, // 2 MB
+            max_block_size: 1_000_000, // 1 MB
+            max_roots_per_round: 1000, // max. ~41KB of CIDs
+            bloom_fpr: |num_of_elems| f64::min(0.001, 0.1 / num_of_elems as f64),
+        }
+    }
+}
+
 /// Some information that the block receiving end provides the block sending end
 /// in order to deduplicate block transfers.
 #[derive(Clone)]
@@ -56,6 +95,10 @@ pub struct CarFile {
 /// A stream of blocks. This requires the underlying futures to be `Send`, except when the target is `wasm32`.
 pub type BlockStream<'a> = BoxStream<'a, Result<(Cid, Bytes), Error>>;
 
+/// A stream of byte chunks of a CAR file.
+/// The underlying futures are `Send`, except when the target is `wasm32`.
+pub type CarStream<'a> = BoxStream<'a, Result<Bytes, Error>>;
+
 //--------------------------------------------------------------------------------------------------
 // Functions
 //--------------------------------------------------------------------------------------------------
@@ -67,7 +110,7 @@ pub type BlockStream<'a> = BoxStream<'a, Result<(Cid, Bytes), Error>>;
 ///
 /// It returns a `CarFile` of (a subset) of all blocks below `root`, that
 /// are thought to be missing on the receiving end.
-#[instrument(skip_all, fields(root, last_state))]
+#[tracing::instrument(skip_all, fields(root, last_state))]
 pub async fn block_send(
     root: Cid,
     last_state: Option<ReceiverState>,
@@ -93,17 +136,17 @@ pub async fn block_send(
 /// This is the streaming equivalent of `block_send`.
 ///
 /// It uses the car file format for framing blocks & CIDs in the given `AsyncWrite`.
-#[instrument(skip_all, fields(root, last_state))]
+#[tracing::instrument(skip_all, fields(root, last_state))]
 pub async fn block_send_car_stream(
     root: Cid,
     last_state: Option<ReceiverState>,
-    stream: W,
+    writer: W,
     send_limit: Option<usize>,
     store: impl BlockStore,
     cache: impl Cache,
 ) -> Result<W, Error> {
     let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?;
-    write_blocks_into_car(stream, &mut block_stream, send_limit).await
+    write_blocks_into_car(writer, &mut block_stream, send_limit).await
 }
 
 /// This is the car mirror block sending function, but unlike `block_send_car_stream`
@@ -141,7 +184,7 @@ pub async fn block_send_block_stream<'a>(
 /// It takes a `CarFile`, verifies that its contents are related to the
 /// `root` and returns some information to help the block sending side
 /// figure out what blocks to send next.
-#[instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
+#[tracing::instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
 pub async fn block_receive(
     root: Cid,
     last_car: Option<CarFile>,
     config: &Config,
@@ -173,7 +216,7 @@ pub async fn block_receive(
 }
 
 /// Like `block_receive`, but allows consuming the CAR file as a stream.
-#[instrument(skip_all, fields(root))] +#[tracing::instrument(skip_all, fields(root))] pub async fn block_receive_car_stream( root: Cid, reader: R, @@ -202,9 +245,21 @@ pub async fn block_receive_block_stream( store: impl BlockStore, cache: impl Cache, ) -> Result { + let max_block_size = config.max_block_size; let mut dag_verification = IncrementalDagVerification::new([root], &store, &cache).await?; while let Some((cid, block)) = stream.try_next().await? { + let block_bytes = block.len(); + // TODO(matheus23): Find a way to restrict size *before* framing. Possibly inside `CarReader`? + // Possibly needs making `MAX_ALLOC` in `iroh-car` configurable. + if block_bytes > config.max_block_size { + return Err(Error::BlockSizeExceeded { + cid, + block_bytes, + max_block_size, + }); + } + match read_and_verify_block(&mut dag_verification, (cid, block), &store, &cache).await? { BlockState::Have => { // This can happen because we've just discovered a subgraph we already have. @@ -239,9 +294,7 @@ pub async fn block_receive_block_stream( /// The frame boundaries are after the header section and between each block. /// /// The first frame will always be a CAR file header frame. -pub async fn stream_car_frames( - mut blocks: BlockStream<'_>, -) -> Result>, Error> { +pub async fn stream_car_frames(mut blocks: BlockStream<'_>) -> Result, Error> { // https://github.com/wnfs-wg/car-mirror-spec/issues/6 // CAR files *must* have at least one CID in them, and all of them // need to appear as a block in the payload. @@ -250,7 +303,7 @@ pub async fn stream_car_frames( // so we're simply writing the first one in here, since we know // at least one block will be written (and it'll be that one). let Some((cid, block)) = blocks.try_next().await? else { - debug!("No blocks to write."); + tracing::debug!("No blocks to write."); return Ok(boxed_stream(futures::stream::empty())); }; @@ -333,7 +386,7 @@ async fn verify_missing_subgraph_roots( .collect::>() .join(", "); - warn!( + tracing::warn!( unrelated_roots = %unrelated_roots, "got asked for DAG-unrelated blocks" ); @@ -344,7 +397,7 @@ async fn verify_missing_subgraph_roots( fn handle_missing_bloom(have_cids_bloom: Option) -> BloomFilter { if let Some(bloom) = &have_cids_bloom { - debug!( + tracing::debug!( size_bits = bloom.as_bytes().len() * 8, hash_count = bloom.hash_count(), ones_count = bloom.count_ones(), @@ -394,7 +447,7 @@ async fn write_blocks_into_car( // so we're simply writing the first one in here, since we know // at least one block will be written (and it'll be that one). let Some((cid, block)) = blocks.try_next().await? else { - debug!("No blocks to write."); + tracing::debug!("No blocks to write."); return Ok(write); }; @@ -403,7 +456,7 @@ async fn write_blocks_into_car( block_bytes += writer.write(cid, block).await?; while let Some((cid, block)) = blocks.try_next().await? 
{ - debug!( + tracing::debug!( cid = %cid, num_bytes = block.len(), "writing block to CAR", @@ -415,7 +468,7 @@ async fn write_blocks_into_car( if let Some(receive_limit) = size_limit { if block_bytes + added_bytes > receive_limit { - debug!(%cid, receive_limit, block_bytes, added_bytes, "Skipping block because it would go over the receive limit"); + tracing::debug!(%cid, receive_limit, block_bytes, added_bytes, "Skipping block because it would go over the receive limit"); break; } } @@ -441,7 +494,7 @@ async fn read_and_verify_block( match dag_verification.block_state(cid) { BlockState::Have => Ok(BlockState::Have), BlockState::Unexpected => { - trace!( + tracing::trace!( cid = %cid, "received block out of order (possibly due to bloom false positive)" ); @@ -543,16 +596,6 @@ impl ReceiverState { } } -impl Default for Config { - fn default() -> Self { - Self { - receive_maximum: 2_000_000, // 2 MB - max_roots_per_round: 1000, // max. ~41KB of CIDs - bloom_fpr: |num_of_elems| f64::min(0.001, 0.1 / num_of_elems as f64), - } - } -} - impl std::fmt::Debug for ReceiverState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let have_cids_bloom = self @@ -576,11 +619,12 @@ impl std::fmt::Debug for ReceiverState { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; use crate::{cache::NoCache, test_utils::assert_cond_send_sync}; + use assert_matches::assert_matches; use testresult::TestResult; - use wnfs_common::MemoryBlockStore; + use wnfs_common::{MemoryBlockStore, CODEC_RAW}; #[allow(clippy::unreachable, unused)] fn test_assert_send() { @@ -617,4 +661,61 @@ mod tests { Ok(()) } + + #[test_log::test(async_std::test)] + async fn test_stream_car_frame_empty() -> TestResult { + let car_frames = stream_car_frames(futures::stream::empty().boxed()).await?; + let frames: Vec = car_frames.try_collect().await?; + + assert!(frames.is_empty()); + + Ok(()) + } + + #[test_log::test(async_std::test)] + async fn test_write_blocks_into_car_empty() -> TestResult { + let car_file = + write_blocks_into_car(Vec::new(), &mut futures::stream::empty().boxed(), None).await?; + + assert!(car_file.is_empty()); + + Ok(()) + } + + #[test_log::test(async_std::test)] + async fn test_block_receive_block_stream_block_size_exceeded() -> TestResult { + let store = &MemoryBlockStore::new(); + + let block_small: Bytes = b"This one is small".to_vec().into(); + let block_big: Bytes = b"This one is very very very big".to_vec().into(); + let root_small = store.put_block(block_small.clone(), CODEC_RAW).await?; + let root_big = store.put_block(block_big.clone(), CODEC_RAW).await?; + + let config = &Config { + max_block_size: 20, + ..Config::default() + }; + + block_receive_block_stream( + root_small, + &mut futures::stream::iter(vec![Ok((root_small, block_small))]).boxed(), + config, + MemoryBlockStore::new(), + NoCache, + ) + .await?; + + let result = block_receive_block_stream( + root_small, + &mut futures::stream::iter(vec![Ok((root_big, block_big))]).boxed(), + config, + MemoryBlockStore::new(), + NoCache, + ) + .await; + + assert_matches!(result, Err(Error::BlockSizeExceeded { .. 
})); + + Ok(()) + } } diff --git a/car-mirror/src/error.rs b/car-mirror/src/error.rs index 2d76029..5460a35 100644 --- a/car-mirror/src/error.rs +++ b/car-mirror/src/error.rs @@ -1,8 +1,7 @@ +use crate::incremental_verification::BlockState; use libipld::Cid; use wnfs_common::BlockStoreError; -use crate::incremental_verification::BlockState; - /// Errors raised from the CAR mirror library #[derive(thiserror::Error, Debug)] pub enum Error { @@ -16,7 +15,18 @@ pub enum Error { bytes_read: usize, }, - /// This library only supports a subset of default codecs, including DAG-CBOR, DAG-JSON, DAG-PB and more.g + /// An error raised when an individual block exceeded the maximum configured block size + #[error("Maximum block size exceeded, maximum configured block size is {max_block_size} bytes, but got {block_bytes} at {cid}")] + BlockSizeExceeded { + /// The CID of the block that exceeded the maximum + cid: Cid, + /// The amount of bytes we got for this block up to this point + block_bytes: usize, + /// The maximum block size from our configuration + max_block_size: usize, + }, + + /// This library only supports a subset of default codecs, including DAG-CBOR, DAG-JSON, DAG-PB and more. /// This is raised if an unknown codec is read from a CID. See the `libipld` library for more information. #[error("Unsupported codec in Cid: {cid}")] UnsupportedCodec { diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index 95bbd9d..01e5b60 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -11,7 +11,6 @@ use libipld_core::{ multihash::{Code, MultihashDigest}, }; use std::{collections::HashSet, matches}; -use tracing::instrument; use wnfs_common::BlockStore; /// A data structure that keeps state about incremental DAG verification. @@ -57,7 +56,7 @@ impl IncrementalDagVerification { /// Updates the state of incremental dag verification. /// This goes through all "want" blocks and what they link to, /// removing items that we now have and don't want anymore. - #[instrument(level = "trace", skip_all)] + #[tracing::instrument(level = "trace", skip_all)] pub async fn update_have_cids( &mut self, store: &impl BlockStore, @@ -191,7 +190,8 @@ impl IncrementalDagVerification { }; } - let mut bloom = BloomFilter::new_from_fpr_po2(bloom_capacity, bloom_fpr(bloom_capacity)); + let target_fpr = bloom_fpr(bloom_capacity); + let mut bloom = BloomFilter::new_from_fpr_po2(bloom_capacity, target_fpr); self.have_cids .into_iter() @@ -202,6 +202,7 @@ impl IncrementalDagVerification { size_bits = bloom.as_bytes().len() * 8, hash_count = bloom.hash_count(), ones_count = bloom.count_ones(), + target_fpr, estimated_fpr = bloom.current_false_positive_rate(), "built 'have cids' bloom", ); diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index d994424..f70c14d 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -2,7 +2,14 @@ #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)] #![deny(unreachable_pub)] -//! car-mirror +//! # Car Mirror +//! +//! This crate provides the "no-io" protocol pieces to run the car mirror protocol. +//! +//! For more information, see the `push` and `pull` modules for further documentation +//! or take a look at the [specification]. +//! +//! [specification]: https://github.com/wnfs-wg/car-mirror-spec /// Test utilities. Enabled with the `test_utils` feature flag. 
#[cfg(any(test, feature = "test_utils"))] @@ -11,17 +18,417 @@ pub mod test_utils; /// Module with local caching strategies and mechanisms that greatly enhance CAR mirror performance pub mod cache; -/// Common utilities +/// Code that's common among the push and pull protocol sides (most of the code). +/// +/// This code is less concerened about the "client" and "server" ends of the protocol, but +/// more about the "block sending" and "block receiving" end of the protocol. I.e. which +/// direction do blocks go? +/// When going from "push" to "pull" protocol, the client and server swap the "block sending" +/// and "block receiving" roles. +/// +/// Consider the functions in here mostly internal, and refer to the `push` and `pull` modules instead. pub mod common; /// Algorithms for walking IPLD directed acyclic graphs pub mod dag_walk; /// Error types -pub mod error; -/// Algorithms for doing incremental verification of IPLD DAGs on the receiving end. +mod error; +/// Algorithms for doing incremental verification of IPLD DAGs against a root hash on the receiving end. pub mod incremental_verification; /// Data types that are sent over-the-wire and relevant serialization code. pub mod messages; -/// The CAR mirror pull protocol. Meant to be used qualified, i.e. `pull::request` and `pull::response` +/// The CAR mirror pull protocol. Meant to be used qualified, i.e. `pull::request` and `pull::response`. +/// +/// This library exposes both streaming and non-streaming variants. It's recommended to use +/// the streaming variants if possible. +/// +/// ## Examples +/// +/// ### Test Data +/// +/// We'll set up some test data to simulate the protocol like this: +/// +/// ```no_run +/// use car_mirror::cache::InMemoryCache; +/// use wnfs_common::MemoryBlockStore; +/// use wnfs_unixfs_file::builder::FileBuilder; +/// +/// # #[async_std::main] +/// # async fn main() -> anyhow::Result<()> { +/// // We simulate peers having separate data stores +/// let client_store = MemoryBlockStore::new(); +/// let server_store = MemoryBlockStore::new(); +/// +/// // Give both peers ~10MB of cache space for speeding up computations. +/// // These are available under the `quick_cache` feature. +/// // (You can also implement your own, or disable caches using `NoCache`) +/// let client_cache = InMemoryCache::new(100_000); +/// let server_cache = InMemoryCache::new(100_000); +/// +/// let file_bytes = async_std::fs::read("../Cargo.lock").await?; +/// +/// // Load some data onto the client +/// let root = FileBuilder::new() +/// .content_bytes(file_bytes.clone()) +/// .fixed_chunker(1024) // Generate lots of small blocks +/// .degree(4) +/// .build()? +/// .store(&client_store) +/// .await?; +/// +/// // The server may already have a subset of the data +/// FileBuilder::new() +/// .content_bytes(file_bytes[0..10_000].to_vec()) +/// .fixed_chunker(1024) // Generate lots of small blocks +/// .degree(4) +/// .build()? 
+/// .store(&server_store) +/// .await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// ### With Streaming +/// +/// This simulates a pull protocol run between two peers locally: +/// +/// ``` +/// use car_mirror::{pull, common::Config}; +/// use futures::TryStreamExt; +/// use tokio_util::io::StreamReader; +/// # use car_mirror::cache::InMemoryCache; +/// # use wnfs_common::MemoryBlockStore; +/// # use wnfs_unixfs_file::builder::FileBuilder; +/// # +/// # #[async_std::main] +/// # async fn main() -> anyhow::Result<()> { +/// # let client_store = MemoryBlockStore::new(); +/// # let server_store = MemoryBlockStore::new(); +/// # +/// # let client_cache = InMemoryCache::new(100_000); +/// # let server_cache = InMemoryCache::new(100_000); +/// # +/// # let file_bytes = async_std::fs::read("../Cargo.lock").await?; +/// # +/// # let root = FileBuilder::new() +/// # .content_bytes(file_bytes.clone()) +/// # .fixed_chunker(1024) // Generate lots of small blocks +/// # .degree(4) +/// # .build()? +/// # .store(&client_store) +/// # .await?; +/// # +/// # FileBuilder::new() +/// # .content_bytes(file_bytes[0..10_000].to_vec()) +/// # .fixed_chunker(1024) // Generate lots of small blocks +/// # .degree(4) +/// # .build()? +/// # .store(&server_store) +/// # .await?; +/// +/// // We set up some protocol configurations (allowed maximum block sizes etc.) +/// let config = &Config::default(); +/// +/// // The client generates a request of what data still needs to be fetched +/// let mut request = +/// pull::request(root, None, config, &client_store, &client_cache).await?; +/// +/// // The request contains information about which blocks still need to be +/// // fetched, so we can use it to find out whether we need to to fetch any +/// // blocks at all. +/// while !request.indicates_finished() { +/// // The server answers with a stream of data +/// let chunk_stream = pull::response_streaming( +/// root, +/// request, +/// &server_store, +/// &server_cache +/// ).await?; +/// +/// let byte_stream = StreamReader::new( +/// chunk_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), +/// ); +/// +/// // The client verifies & stores the streamed data and possibly +/// // interrupts the stream to produce a new request with more precise +/// // information on what to pull. +/// request = pull::handle_response_streaming( +/// root, +/// byte_stream, +/// config, +/// &client_store, +/// &client_cache, +/// ).await?; +/// } +/// # Ok(()) +/// # } +/// ``` +/// +/// +/// ### Without Streaming +/// +/// This simulates a pull protocol run between two peers locally, without streaming: +/// +/// ``` +/// use car_mirror::{pull, common::Config}; +/// # use car_mirror::cache::InMemoryCache; +/// # use wnfs_common::MemoryBlockStore; +/// # use wnfs_unixfs_file::builder::FileBuilder; +/// # +/// # #[async_std::main] +/// # async fn main() -> anyhow::Result<()> { +/// # let client_store = MemoryBlockStore::new(); +/// # let server_store = MemoryBlockStore::new(); +/// # +/// # let client_cache = InMemoryCache::new(100_000); +/// # let server_cache = InMemoryCache::new(100_000); +/// # +/// # let file_bytes = async_std::fs::read("../Cargo.lock").await?; +/// # +/// # let root = FileBuilder::new() +/// # .content_bytes(file_bytes.clone()) +/// # .fixed_chunker(1024) // Generate lots of small blocks +/// # .degree(4) +/// # .build()? 
+/// # .store(&client_store) +/// # .await?; +/// # +/// # FileBuilder::new() +/// # .content_bytes(file_bytes[0..10_000].to_vec()) +/// # .fixed_chunker(1024) // Generate lots of small blocks +/// # .degree(4) +/// # .build()? +/// # .store(&server_store) +/// # .await?; +/// +/// // We set up some protocol configurations (allowed maximum block sizes etc.) +/// let config = &Config::default(); +/// +/// let mut last_car = None; +/// loop { +/// // The client handles a possible previous response and produces a request +/// let request = pull::request( +/// root, +/// last_car, +/// config, +/// &client_store, +/// &client_cache +/// ).await?; +/// +/// if request.indicates_finished() { +/// break; // No need to fetch more, we already have all data +/// } +/// +/// // The server consumes the car file and provides information about +/// // further blocks needed +/// last_car = Some(pull::response( +/// root, +/// request, +/// config, +/// &server_store, +/// &server_cache +/// ).await?); +/// } +/// # Ok(()) +/// # } +/// ``` pub mod pull; -/// The CAR mirror push protocol. Meant to be used qualified, i.e. `push::request` and `push::response` +/// The CAR mirror push protocol. Meant to be used qualified, i.e. `push::request` and `push::response`. +/// +/// This library exposes both streaming and non-streaming variants. It's recommended to use +/// the streaming variants if possible. +/// +/// ## Examples +/// +/// ### Test Data +/// +/// We'll set up some test data to simulate the protocol like this: +/// +/// ```no_run +/// use car_mirror::cache::InMemoryCache; +/// use wnfs_common::MemoryBlockStore; +/// use wnfs_unixfs_file::builder::FileBuilder; +/// +/// # #[async_std::main] +/// # async fn main() -> anyhow::Result<()> { +/// // We simulate peers having separate data stores +/// let client_store = MemoryBlockStore::new(); +/// let server_store = MemoryBlockStore::new(); +/// +/// // Give both peers ~10MB of cache space for speeding up computations. +/// // These are available under the `quick_cache` feature. +/// // (You can also implement your own, or disable caches using `NoCache`) +/// let client_cache = InMemoryCache::new(100_000); +/// let server_cache = InMemoryCache::new(100_000); +/// +/// let file_bytes = async_std::fs::read("../Cargo.lock").await?; +/// +/// // Load some data onto the client +/// let root = FileBuilder::new() +/// .content_bytes(file_bytes.clone()) +/// .fixed_chunker(1024) // Generate lots of small blocks +/// .degree(4) +/// .build()? +/// .store(&client_store) +/// .await?; +/// +/// // The server may already have a subset of the data +/// FileBuilder::new() +/// .content_bytes(file_bytes[0..10_000].to_vec()) +/// .fixed_chunker(1024) // Generate lots of small blocks +/// .degree(4) +/// .build()? 
+/// .store(&server_store) +/// .await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// ### With Streaming +/// +/// This simulates a push protocol run between two peers locally: +/// +/// ``` +/// use car_mirror::{push, common::Config}; +/// use futures::TryStreamExt; +/// use tokio_util::io::StreamReader; +/// # use car_mirror::cache::InMemoryCache; +/// # use wnfs_common::MemoryBlockStore; +/// # use wnfs_unixfs_file::builder::FileBuilder; +/// # +/// # #[async_std::main] +/// # async fn main() -> anyhow::Result<()> { +/// # let client_store = MemoryBlockStore::new(); +/// # let server_store = MemoryBlockStore::new(); +/// # +/// # let client_cache = InMemoryCache::new(100_000); +/// # let server_cache = InMemoryCache::new(100_000); +/// # +/// # let file_bytes = async_std::fs::read("../Cargo.lock").await?; +/// # +/// # let root = FileBuilder::new() +/// # .content_bytes(file_bytes.clone()) +/// # .fixed_chunker(1024) // Generate lots of small blocks +/// # .degree(4) +/// # .build()? +/// # .store(&client_store) +/// # .await?; +/// # +/// # FileBuilder::new() +/// # .content_bytes(file_bytes[0..10_000].to_vec()) +/// # .fixed_chunker(1024) // Generate lots of small blocks +/// # .degree(4) +/// # .build()? +/// # .store(&server_store) +/// # .await?; +/// +/// // We set up some protocol configurations (allowed maximum block sizes etc.) +/// let config = &Config::default(); +/// +/// let mut last_response = None; +/// loop { +/// // The client generates a request that streams the data to the server +/// let chunk_stream = push::request_streaming( +/// root, +/// last_response, +/// &client_store, +/// &client_cache +/// ).await?; +/// +/// let byte_stream = StreamReader::new( +/// chunk_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), +/// ); +/// +/// // The server consumes the streaming request & interrupts with new +/// // information about what blocks it already has or in case the client +/// // can stop sending altogether. +/// let response = push::response_streaming( +/// root, +/// byte_stream, +/// config, +/// &server_store, +/// &server_cache +/// ).await?; +/// +/// if response.indicates_finished() { +/// break; // we're done! +/// } +/// +/// last_response = Some(response); +/// } +/// # Ok(()) +/// # } +/// ``` +/// +/// +/// ### Without Streaming +/// +/// This simulates a push protocol run between two peers locally, without streaming: +/// +/// ``` +/// use car_mirror::{push, common::Config}; +/// # use car_mirror::cache::InMemoryCache; +/// # use wnfs_common::MemoryBlockStore; +/// # use wnfs_unixfs_file::builder::FileBuilder; +/// # +/// # #[async_std::main] +/// # async fn main() -> anyhow::Result<()> { +/// # let client_store = MemoryBlockStore::new(); +/// # let server_store = MemoryBlockStore::new(); +/// # +/// # let client_cache = InMemoryCache::new(100_000); +/// # let server_cache = InMemoryCache::new(100_000); +/// # +/// # let file_bytes = async_std::fs::read("../Cargo.lock").await?; +/// # +/// # let root = FileBuilder::new() +/// # .content_bytes(file_bytes.clone()) +/// # .fixed_chunker(1024) // Generate lots of small blocks +/// # .degree(4) +/// # .build()? +/// # .store(&client_store) +/// # .await?; +/// # +/// # FileBuilder::new() +/// # .content_bytes(file_bytes[0..10_000].to_vec()) +/// # .fixed_chunker(1024) // Generate lots of small blocks +/// # .degree(4) +/// # .build()? +/// # .store(&server_store) +/// # .await?; +/// +/// // We set up some protocol configurations (allowed maximum block sizes etc.) 
+/// let config = &Config::default(); +/// +/// let mut last_response = None; +/// loop { +/// // The client creates a CAR file for the request +/// let car_file = push::request( +/// root, +/// last_response, +/// config, +/// &client_store, +/// &client_cache +/// ).await?; +/// +/// // The server consumes the car file and provides information about +/// // further blocks needed +/// let response = push::response( +/// root, +/// car_file, +/// config, +/// &server_store, +/// &server_cache +/// ).await?; +/// +/// if response.indicates_finished() { +/// break; // we're done! +/// } +/// +/// last_response = Some(response); +/// } +/// # Ok(()) +/// # } +/// ``` pub mod push; + +pub use error::*; diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index f4aa5bd..eb1e129 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -1,11 +1,15 @@ use crate::{ cache::Cache, - common::{block_receive, block_send, CarFile, Config, ReceiverState}, + common::{ + block_receive, block_receive_car_stream, block_send, block_send_block_stream, + stream_car_frames, CarFile, CarStream, Config, ReceiverState, + }, error::Error, messages::PullRequest, }; use libipld::Cid; -use wnfs_common::BlockStore; +use tokio::io::AsyncRead; +use wnfs_common::{utils::CondSend, BlockStore}; /// Create a CAR mirror pull request. /// @@ -17,7 +21,7 @@ use wnfs_common::BlockStore; /// /// Before actually sending the request over the network, /// make sure to check the `request.indicates_finished()`. -/// If true, the client already has all data and the request +/// If true, the "client" already has all data and the request /// doesn't need to be sent. pub async fn request( root: Cid, @@ -31,7 +35,23 @@ pub async fn request( .into()) } -/// Respond to a CAR mirror pull request. +/// On the "client" side, handle a streaming response from a pull request. +/// +/// This will accept blocks as long as they're useful to get the DAG under +/// `root`, verify them, and store them in the given `store`. +pub async fn handle_response_streaming( + root: Cid, + stream: impl AsyncRead + Unpin + CondSend, + config: &Config, + store: impl BlockStore, + cache: impl Cache, +) -> Result { + Ok(block_receive_car_stream(root, stream, config, store, cache) + .await? + .into()) +} + +/// Respond to a CAR mirror pull request on the "server" side. pub async fn response( root: Cid, request: PullRequest, @@ -43,19 +63,35 @@ pub async fn response( block_send(root, receiver_state, config, store, cache).await } +/// On the "server" side, respond to a pull request with a stream. +/// +/// This can especially speed up cold pull requests. 
+pub async fn response_streaming<'a>( + root: Cid, + request: PullRequest, + store: impl BlockStore + 'a, + cache: impl Cache + 'a, +) -> Result, Error> { + let block_stream = block_send_block_stream(root, Some(request.into()), store, cache).await?; + let car_stream = stream_car_frames(block_stream).await?; + Ok(car_stream) +} + #[cfg(test)] mod tests { use crate::{ - cache::NoCache, + cache::{InMemoryCache, NoCache}, common::Config, dag_walk::DagWalk, - test_utils::{setup_random_dag, Metrics}, + pull, + test_utils::{setup_random_dag, store_test_unixfs, Metrics}, }; use anyhow::Result; use futures::TryStreamExt; use libipld::Cid; use std::collections::HashSet; use testresult::TestResult; + use tokio_util::io::StreamReader; use wnfs_common::{BlockStore, MemoryBlockStore}; pub(crate) async fn simulate_protocol( @@ -65,11 +101,10 @@ mod tests { server_store: &impl BlockStore, ) -> Result> { let mut metrics = Vec::new(); - let mut request = crate::pull::request(root, None, config, client_store, &NoCache).await?; - loop { + let mut request = pull::request(root, None, config, client_store, &NoCache).await?; + while !request.indicates_finished() { let request_bytes = serde_ipld_dagcbor::to_vec(&request)?.len(); - let response = - crate::pull::response(root, request, config, server_store, NoCache).await?; + let response = pull::response(root, request, config, server_store, NoCache).await?; let response_bytes = response.bytes.len(); metrics.push(Metrics { @@ -77,11 +112,7 @@ mod tests { response_bytes, }); - request = - crate::pull::request(root, Some(response), config, client_store, &NoCache).await?; - if request.indicates_finished() { - break; - } + request = pull::request(root, Some(response), config, client_store, &NoCache).await?; } Ok(metrics) @@ -110,6 +141,43 @@ mod tests { Ok(()) } + + #[test_log::test(async_std::test)] + async fn test_streaming_transfer() -> TestResult { + let client_store = MemoryBlockStore::new(); + let server_store = MemoryBlockStore::new(); + + let client_cache = InMemoryCache::new(100_000); + let server_cache = InMemoryCache::new(100_000); + + let file_bytes = async_std::fs::read("../Cargo.lock").await?; + let root = store_test_unixfs(file_bytes.clone(), &client_store).await?; + store_test_unixfs(file_bytes[0..10_000].to_vec(), &server_store).await?; + + let config = &Config::default(); + + let mut request = pull::request(root, None, config, &client_store, &client_cache).await?; + + while !request.indicates_finished() { + let car_stream = + pull::response_streaming(root, request, &server_store, &server_cache).await?; + + let byte_stream = StreamReader::new( + car_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), + ); + + request = pull::handle_response_streaming( + root, + byte_stream, + config, + &client_store, + &client_cache, + ) + .await?; + } + + Ok(()) + } } #[cfg(test)] @@ -118,6 +186,7 @@ mod proptests { cache::NoCache, common::Config, dag_walk::DagWalk, + pull, test_utils::{setup_blockstore, variable_blocksize_dag}, }; use futures::TryStreamExt; @@ -133,14 +202,9 @@ mod proptests { let server_store = &setup_blockstore(blocks).await.unwrap(); let client_store = &MemoryBlockStore::new(); - crate::pull::tests::simulate_protocol( - root, - &Config::default(), - client_store, - server_store, - ) - .await - .unwrap(); + pull::tests::simulate_protocol(root, &Config::default(), client_store, server_store) + .await + .unwrap(); // client should have all data let client_cids = DagWalk::breadth_first([root]) diff --git a/car-mirror/src/push.rs 
b/car-mirror/src/push.rs index f0d5c17..c7ae1c8 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -1,11 +1,14 @@ use crate::{ cache::Cache, - common::{block_receive, block_send, CarFile, Config, ReceiverState}, + common::{ + block_receive, block_receive_car_stream, block_send, block_send_block_stream, + stream_car_frames, CarFile, CarStream, Config, ReceiverState, + }, error::Error, messages::PushResponse, }; use libipld_core::cid::Cid; -use wnfs_common::BlockStore; +use wnfs_common::{utils::CondSend, BlockStore}; /// Create a CAR mirror push request. /// @@ -28,13 +31,30 @@ pub async fn request( block_send(root, receiver_state, config, store, cache).await } +/// Streaming version of `request` to create a push request. +/// +/// It's recommended to run the streaming push until the "server" interrupts +/// it with an updated `PushResponse`. Then continuing with another +/// push request with updated information. +pub async fn request_streaming<'a>( + root: Cid, + last_response: Option, + store: impl BlockStore + 'a, + cache: impl Cache + 'a, +) -> Result, Error> { + let receiver_state = last_response.map(|s| s.into()); + let block_stream = block_send_block_stream(root, receiver_state, store, cache).await?; + let car_stream = stream_car_frames(block_stream).await?; + Ok(car_stream) +} + /// Create a response for a CAR mirror push request. /// /// This takes in the CAR file from the request body and stores its blocks /// in the given `store`, if the blocks can be shown to relate /// to the `root` CID. /// -/// Returns a response that gives the client information about what +/// Returns a response that gives the "client" information about what /// other data remains to be fetched. pub async fn response( root: Cid, @@ -48,15 +68,36 @@ pub async fn response( .into()) } +/// Respond to a push request on the "server" side in a streaming fashing +/// (as opposed to the `response` function). +/// +/// This will read from the `request` until the server realizes it got +/// some bytes it already had. Then it'll create an updated bloom filter +/// and send a `PushResponse`, interrupting the incoming stream. +pub async fn response_streaming( + root: Cid, + request: impl tokio::io::AsyncRead + Unpin + CondSend, + config: &Config, + store: impl BlockStore, + cache: impl Cache, +) -> Result { + Ok( + block_receive_car_stream(root, request, config, store, cache) + .await? 
+ .into(), + ) +} + #[cfg(test)] mod tests { use crate::{ - cache::NoCache, + cache::{InMemoryCache, NoCache}, common::Config, dag_walk::DagWalk, + push, test_utils::{ - get_cid_at_approx_path, setup_random_dag, total_dag_blocks, total_dag_bytes, Metrics, - Rvg, + get_cid_at_approx_path, setup_random_dag, store_test_unixfs, total_dag_blocks, + total_dag_bytes, Metrics, Rvg, }, }; use anyhow::Result; @@ -65,6 +106,7 @@ mod tests { use proptest::collection::vec; use std::collections::HashSet; use testresult::TestResult; + use tokio_util::io::StreamReader; use wnfs_common::{BlockStore, MemoryBlockStore}; pub(crate) async fn simulate_protocol( @@ -74,11 +116,10 @@ mod tests { server_store: &impl BlockStore, ) -> Result> { let mut metrics = Vec::new(); - let mut request = crate::push::request(root, None, config, client_store, &NoCache).await?; + let mut request = push::request(root, None, config, client_store, &NoCache).await?; loop { let request_bytes = request.bytes.len(); - let response = - crate::push::response(root, request, config, server_store, &NoCache).await?; + let response = push::response(root, request, config, server_store, &NoCache).await?; let response_bytes = serde_ipld_dagcbor::to_vec(&response)?.len(); metrics.push(Metrics { @@ -89,8 +130,7 @@ mod tests { if response.indicates_finished() { break; } - request = - crate::push::request(root, Some(response), config, client_store, &NoCache).await?; + request = push::request(root, Some(response), config, client_store, &NoCache).await?; } Ok(metrics) @@ -119,6 +159,42 @@ mod tests { Ok(()) } + #[test_log::test(async_std::test)] + async fn test_streaming_transfer() -> TestResult { + let client_store = MemoryBlockStore::new(); + let server_store = MemoryBlockStore::new(); + + let client_cache = InMemoryCache::new(100_000); + let server_cache = InMemoryCache::new(100_000); + + let file_bytes = async_std::fs::read("../Cargo.lock").await?; + let root = store_test_unixfs(file_bytes.clone(), &client_store).await?; + store_test_unixfs(file_bytes[0..10_000].to_vec(), &server_store).await?; + + let config = &Config::default(); + + let mut last_response = None; + loop { + let stream = + push::request_streaming(root, last_response, &client_store, &client_cache).await?; + + let byte_stream = StreamReader::new( + stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), + ); + + let response = + push::response_streaming(root, byte_stream, config, &server_store, &server_cache) + .await?; + + if response.indicates_finished() { + break; + } + + last_response = Some(response); + } + Ok(()) + } + #[test_log::test(async_std::test)] async fn test_deduplicating_transfer() -> TestResult { let (root, ref client_store) = setup_random_dag(256, 10 * 1024 /* 10 KiB */).await?; @@ -191,6 +267,7 @@ mod proptests { cache::NoCache, common::Config, dag_walk::DagWalk, + push, test_utils::{setup_blockstore, variable_blocksize_dag}, }; use futures::TryStreamExt; @@ -206,14 +283,9 @@ mod proptests { let client_store = &setup_blockstore(blocks).await.unwrap(); let server_store = &MemoryBlockStore::new(); - crate::push::tests::simulate_protocol( - root, - &Config::default(), - client_store, - server_store, - ) - .await - .unwrap(); + push::tests::simulate_protocol(root, &Config::default(), client_store, server_store) + .await + .unwrap(); // client should have all data let client_cids = DagWalk::breadth_first([root]) diff --git a/car-mirror/src/test_utils/local_utils.rs b/car-mirror/src/test_utils/local_utils.rs index e1b08ab..54f469f 100644 --- 
a/car-mirror/src/test_utils/local_utils.rs
+++ b/car-mirror/src/test_utils/local_utils.rs
@@ -94,3 +94,13 @@ pub(crate) async fn total_dag_blocks(root: Cid, store: &impl BlockStore) -> Resu
 }
 
 pub(crate) fn assert_cond_send_sync(_fut: fn() -> T) {}
+
+pub(crate) async fn store_test_unixfs(data: Vec<u8>, store: &impl BlockStore) -> Result<Cid> {
+    wnfs_unixfs_file::builder::FileBuilder::new()
+        .content_bytes(data)
+        .fixed_chunker(1024) // Generate lots of small blocks
+        .degree(4)
+        .build()?
+        .store(store)
+        .await
+}
diff --git a/deny.toml b/deny.toml
index 120a945..f36d036 100644
--- a/deny.toml
+++ b/deny.toml
@@ -77,6 +77,7 @@ allow = [
   "BSD-3-Clause",
   "ISC",
   "Zlib",
+  "MPL-2.0",
   "BSL-1.0"
 ]
 # List of explicitly disallowed licenses
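
For reference alongside the pull handler in the axum integration example above, the server side of a streaming push can be built the same way around `car_mirror::push::response_streaming`. The sketch below is illustrative only and not part of this patch: the route shape, the `AppState` layout, and the plain `(StatusCode, String)` error mapping are assumptions; the complete, working server lives in `car-mirror-reqwest/integration/axum.rs`.

```rust
use std::str::FromStr;

use axum::{
    body::Body,
    extract::{Path, State},
    http::StatusCode,
    Json,
};
use car_mirror::{cache::NoCache, common::Config, messages::PushResponse};
use futures::TryStreamExt;
use libipld::Cid;
use tokio_util::io::StreamReader;
use wnfs_common::MemoryBlockStore;

/// Shared handler state; the integration example uses a similar shape.
#[derive(Clone)]
struct AppState {
    store: MemoryBlockStore,
    cache: NoCache,
}

async fn car_mirror_push(
    State(state): State<AppState>,
    Path(cid_string): Path<String>,
    body: Body,
) -> Result<(StatusCode, Json<PushResponse>), (StatusCode, String)> {
    let cid = Cid::from_str(&cid_string)
        .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;

    // Interpret the incoming request body as a byte stream of CAR file frames
    let reader = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));

    // Incrementally verify & store the streamed blocks; car-mirror interrupts
    // the stream once it can tell the client what it already has
    let response = car_mirror::push::response_streaming(
        cid,
        reader,
        &Config::default(),
        state.store.clone(),
        state.cache.clone(),
    )
    .await
    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    // 200 tells the client the DAG is complete on the server,
    // 202 asks it to run another round of the push protocol
    let status = if response.indicates_finished() {
        StatusCode::OK
    } else {
        StatusCode::ACCEPTED
    };

    Ok((status, Json(response)))
}
```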