diff --git a/Cargo.lock b/Cargo.lock index b764437..01fa289 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,6 +295,73 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.1.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-macros" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00c055ee2d014ae5981ce1016374e8213682aa14d9bf40e48ab48b5f3ef20eaa" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -499,7 +566,8 @@ name = "car-mirror-reqwest" version = "0.1.0" dependencies = [ "anyhow", - "async-std", + "axum", + "axum-macros", "bytes", "car-mirror", "futures", @@ -510,6 +578,7 @@ dependencies = [ "test-strategy", "testresult", "thiserror", + "tokio", "tokio-util", "tracing", "tracing-subscriber", @@ -1107,7 +1176,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", + "indexmap 2.2.3", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", "indexmap 2.2.3", "slab", "tokio", @@ -1137,6 +1225,12 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1163,6 +1257,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1170,7 +1275,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.11", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1196,9 +1324,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.24", + "http 0.2.11", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1210,6 +1338,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1217,13 +1364,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.11", + "hyper 0.14.28", "rustls", "tokio", "tokio-rustls", ] +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2 0.5.5", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -1508,6 +1671,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.1" @@ -1637,6 +1806,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.6", + "libc", +] + [[package]] name = "num_enum" version = "0.5.11" @@ -1726,6 +1905,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -2016,10 +2215,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.24", + "http 0.2.11", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls", "ipnet", "js-sys", @@ -2057,7 +2256,7 @@ checksum = "88a3e86aa6053e59030e7ce2d2a3b258dd08fc2d337d52f73f6cb480f5858690" dependencies = [ "anyhow", "async-trait", - "http", + "http 0.2.11", "reqwest", "serde", "task-local-extensions", @@ -2165,6 +2364,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "rusty-fork" version = "0.3.0" @@ -2275,6 +2480,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd154a240de39fdebcf5775d2675c204d7c13cf39a4c697be6493c8e734337c" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2317,6 +2532,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -2579,11 +2803,26 @@ dependencies = [ "bytes", "libc", "mio", + "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2 0.5.5", + "tokio-macros", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -2617,6 +2856,28 @@ dependencies = [ "serde", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -2629,6 +2890,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/car-mirror-reqwest/Cargo.toml b/car-mirror-reqwest/Cargo.toml index 8e10c31..00cfee9 100644 --- a/car-mirror-reqwest/Cargo.toml +++ b/car-mirror-reqwest/Cargo.toml @@ -23,18 +23,22 @@ bytes = "1.4" car-mirror = { version = "0.1", path = "../car-mirror" } futures = "0.3" libipld = "0.16" -reqwest = { version = "0.11.22", default-features = false, features = ["json", "rustls-tls", "stream"] } -reqwest-middleware = "0.2.4" +reqwest = { version = "0.11", default-features = false, features = ["json", "stream"] } +reqwest-middleware = "0.2" thiserror = "1.0" -tokio-util = "0.7" +tokio-util = { version = "0.7", features = ["io"] } tracing = "0.1" wnfs-common = { workspace = true } [dev-dependencies] -async-std = { version = "1.11", features = ["attributes"] } +axum = "0.7" +axum-macros = "0.4" +car-mirror = { version = "0.1", path = "../car-mirror", features = ["quick_cache"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] } test-log = { version = "0.2", default-features = false, features = ["trace"] } test-strategy = "0.3" testresult = "0.3" +tokio = { version = "1.0", features = ["full"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parking_lot", "registry"] } [features] diff --git a/car-mirror-reqwest/examples/axum.rs b/car-mirror-reqwest/examples/axum.rs new file mode 100644 index 0000000..28634c9 --- /dev/null +++ b/car-mirror-reqwest/examples/axum.rs @@ -0,0 +1,146 @@ +use anyhow::Result; +use axum::{ + body::Body, + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::{get, post}, + Json, Router, +}; +use car_mirror::{ + cache::{InMemoryCache, NoCache}, + common::Config, + messages::{PullRequest, PushResponse}, +}; +use car_mirror_reqwest::RequestBuilderExt; +use futures::TryStreamExt; +use libipld::Cid; +use reqwest::Client; +use std::{future::IntoFuture, str::FromStr}; +use tokio_util::io::StreamReader; +use wnfs_common::{BlockStore, MemoryBlockStore, CODEC_RAW}; + +#[tokio::main] +async fn main() -> Result<()> { + // Say, you have a webserver running like so: + let app = Router::new() + .route("/dag/pull/:cid", get(car_mirror_pull)) + .route("/dag/push/:cid", post(car_mirror_push)) + .with_state(ServerState::new()); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:3344").await?; + tokio::spawn(axum::serve(listener, app).into_future()); + + // You can issue requests from your client like so: + let store = MemoryBlockStore::new(); + let data = b"Hello, world!".to_vec(); + let root = store.put_block(data, CODEC_RAW).await?; + + let config = &Config::default(); + + let client = Client::new(); + client + .post(format!("http://localhost:3344/dag/push/{root}")) + .run_car_mirror_push(root, &store, &NoCache) // rounds of push protocol + .await?; + + let store = MemoryBlockStore::new(); // clear out data + client + .get(format!("http://localhost:3344/dag/pull/{root}")) + .run_car_mirror_pull(root, config, &store, &NoCache) // rounds of pull protocol + .await?; + + assert!(store.has_block(&root).await?); + + Ok(()) +} + +// Server details: + +#[derive(Debug, Clone)] +struct ServerState { + store: MemoryBlockStore, + cache: InMemoryCache, +} + +impl ServerState { + fn new() -> Self { + Self { + store: MemoryBlockStore::new(), + cache: InMemoryCache::new(100_000), + } + } +} + +#[axum_macros::debug_handler] +async fn car_mirror_push( + State(state): State, + Path(cid_string): Path, + body: Body, +) -> Result<(StatusCode, Json), AppError> +where { + let cid = Cid::from_str(&cid_string)?; + + let body_stream = body.into_data_stream(); + + let reader = StreamReader::new( + body_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), + ); + + let response = car_mirror::push::response_streaming( + cid, + reader, + &Config::default(), + &state.store, + &state.cache, + ) + .await?; + + if response.indicates_finished() { + Ok((StatusCode::OK, Json(response))) + } else { + Ok((StatusCode::ACCEPTED, Json(response))) + } +} + +#[axum_macros::debug_handler] +async fn car_mirror_pull( + State(state): State, + Path(cid_string): Path, + Json(request): Json, +) -> Result<(StatusCode, Body), AppError> { + let cid = Cid::from_str(&cid_string)?; + + let car_chunks = car_mirror::pull::response_streaming( + cid, + request, + state.store.clone(), + state.cache.clone(), + ) + .await?; + + Ok((StatusCode::OK, Body::from_stream(car_chunks))) +} + +// Basic anyhow error handling: + +struct AppError(anyhow::Error); + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Something went wrong: {}", self.0), + ) + .into_response() + } +} + +impl From for AppError +where + E: Into, +{ + fn from(err: E) -> Self { + Self(err.into()) + } +} diff --git a/car-mirror-reqwest/src/lib.rs b/car-mirror-reqwest/src/lib.rs index 2e5fa6e..c38dc7d 100644 --- a/car-mirror-reqwest/src/lib.rs +++ b/car-mirror-reqwest/src/lib.rs @@ -2,7 +2,42 @@ #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)] #![deny(unreachable_pub)] -//! A helper library that helps making car-mirror client requests using reqwest +//! # car-mirror-reqwest +//! +//! A helper library that helps making car-mirror client requests using reqwest. +//! +//! ```no_run +//! # use anyhow::Result; +//! use car_mirror::{cache::NoCache, common::Config}; +//! use car_mirror_reqwest::RequestBuilderExt; +//! use wnfs_common::{BlockStore, MemoryBlockStore, CODEC_RAW}; +//! +//! # #[tokio::main] +//! # async fn main() -> Result<()> { +//! let store = MemoryBlockStore::new(); +//! let data = b"Hello, world!".to_vec(); +//! let root = store.put_block(data, CODEC_RAW).await?; +//! +//! let config = &Config::default(); +//! +//! let client = reqwest::Client::new(); +//! client +//! .post(format!("http://localhost:3344/dag/push/{root}")) +//! .run_car_mirror_push(root, &store, &NoCache) // rounds of push protocol +//! .await?; +//! +//! let store = MemoryBlockStore::new(); // clear out data +//! client +//! .get(format!("http://localhost:3344/dag/pull/{root}")) +//! .run_car_mirror_pull(root, config, &store, &NoCache) // rounds of pull protocol +//! .await?; +//! +//! assert!(store.has_block(&root).await?); +//! # Ok(()) +//! # } +//! ``` +//! +//! For the full example, please see `examples/axum.rs` in the source repository. mod error; mod request; diff --git a/car-mirror/src/cache.rs b/car-mirror/src/cache.rs index ef3b7ac..df2293b 100644 --- a/car-mirror/src/cache.rs +++ b/car-mirror/src/cache.rs @@ -91,7 +91,7 @@ impl Cache for Box { } /// An implementation of `Cache` that doesn't cache at all. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NoCache; impl Cache for NoCache { diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index 698ba1e..f70c14d 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -58,7 +58,7 @@ pub mod messages; /// let client_store = MemoryBlockStore::new(); /// let server_store = MemoryBlockStore::new(); /// -/// // Give both peers ~1MB of cache space for speeding up computations. +/// // Give both peers ~10MB of cache space for speeding up computations. /// // These are available under the `quick_cache` feature. /// // (You can also implement your own, or disable caches using `NoCache`) /// let client_cache = InMemoryCache::new(100_000); @@ -254,7 +254,7 @@ pub mod pull; /// let client_store = MemoryBlockStore::new(); /// let server_store = MemoryBlockStore::new(); /// -/// // Give both peers ~1MB of cache space for speeding up computations. +/// // Give both peers ~10MB of cache space for speeding up computations. /// // These are available under the `quick_cache` feature. /// // (You can also implement your own, or disable caches using `NoCache`) /// let client_cache = InMemoryCache::new(100_000);