From ba5a662ffcbbec27ed26131cb4a67b7216cf0bbe Mon Sep 17 00:00:00 2001 From: Dan Enman Date: Tue, 29 Oct 2024 16:42:30 -0200 Subject: [PATCH] feat: Use pre-chunks Bytes and re-chunk in data_store --- Cargo.lock | 8 +++-- crates/dwn-rs-core/Cargo.toml | 1 + crates/dwn-rs-core/src/stores.rs | 3 +- crates/dwn-rs-stores/Cargo.toml | 1 + .../dwn-rs-stores/src/surrealdb/data_store.rs | 33 +++++++++++-------- crates/dwn-rs-wasm/Cargo.toml | 4 +-- crates/dwn-rs-wasm/src/streams/stream.rs | 9 ++--- .../dwn-rs-wasm/src/surrealdb/data_store.rs | 7 ++-- 8 files changed, 37 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23424b8..16b0d72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -780,9 +780,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" dependencies = [ "serde", ] @@ -1389,6 +1389,7 @@ name = "dwn-rs-core" version = "0.1.0" dependencies = [ "base64 0.22.1", + "bytes", "chrono", "cid", "derive_more", @@ -1416,6 +1417,7 @@ name = "dwn-rs-stores" version = "0.1.0" dependencies = [ "async-std", + "bytes", "chrono", "cid", "dwn-rs-core", @@ -1444,6 +1446,7 @@ name = "dwn-rs-wasm" version = "0.2.1" dependencies = [ "async-std", + "bytes", "console_error_panic_hook", "dwn-rs-core", "dwn-rs-stores", @@ -1452,7 +1455,6 @@ dependencies = [ "js-sys", "serde", "serde-wasm-bindgen 0.6.5", - "serde_bytes", "tracing", "tracing-subscriber", "tracing-subscriber-wasm", diff --git a/crates/dwn-rs-core/Cargo.toml b/crates/dwn-rs-core/Cargo.toml index 1c08eee..2104b6f 100644 --- a/crates/dwn-rs-core/Cargo.toml +++ b/crates/dwn-rs-core/Cargo.toml @@ -38,6 +38,7 @@ thiserror = "1.0.63" serde_json = "1.0.113" tracing = "0.1.40" tracing-test = { version = "0.2.5", features = ["no-env-filter"] } +bytes = "1.8.0" [dev-dependencies] serde_json = "1.0.113" diff --git a/crates/dwn-rs-core/src/stores.rs b/crates/dwn-rs-core/src/stores.rs index 82e93d8..af41a1e 100644 --- a/crates/dwn-rs-core/src/stores.rs +++ b/crates/dwn-rs-core/src/stores.rs @@ -1,5 +1,6 @@ use std::{fmt::Debug, future::Future, pin::Pin}; +use bytes::Bytes; use futures_util::Stream; use ipld_core::cid::Cid; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -53,7 +54,7 @@ pub trait DataStore: Default { fn close(&mut self) -> impl Future + Send; - fn put + Send + Unpin>( + fn put + Send + Unpin>( &self, tenant: &str, record_id: &str, diff --git a/crates/dwn-rs-stores/Cargo.toml b/crates/dwn-rs-stores/Cargo.toml index 9422857..4924a44 100644 --- a/crates/dwn-rs-stores/Cargo.toml +++ b/crates/dwn-rs-stores/Cargo.toml @@ -53,6 +53,7 @@ async-std = { version = "1.12.0", default-features = false, features = [ "pin-project-lite", ] } memoize = { version = "0.4.2", default-features = false } +bytes = { version = "1.8.0", features = ["serde"] } [dev-dependencies] rand = "0.8.5" diff --git a/crates/dwn-rs-stores/src/surrealdb/data_store.rs b/crates/dwn-rs-stores/src/surrealdb/data_store.rs index e910d25..dd92fb5 100644 --- a/crates/dwn-rs-stores/src/surrealdb/data_store.rs +++ b/crates/dwn-rs-stores/src/surrealdb/data_store.rs @@ -1,4 +1,5 @@ use async_std::stream::from_iter; +use bytes::Bytes; use futures_util::{pin_mut, Stream, StreamExt}; use surrealdb::{ sql::{ @@ -43,13 +44,15 @@ impl DataStore for SurrealDB { value: T, ) -> Result where - T: Stream + Unpin + Send, + T: Stream + Unpin + Send, { - let chunks = value.chunks(CHUNK_CAPACITY); - pin_mut!(chunks); - let id: RecordId = (DATA_TABLE, record_id.to_string()).into(); + pin_mut!(value); + let mut chunks = value + .flat_map(|b| from_iter(b.into_iter())) + .chunks(CHUNK_CAPACITY); + let len = self .with_database(tenant, |db| async move { db.delete::>(id.clone()) @@ -282,7 +285,7 @@ fn chunks_graph_query() -> Query { #[cfg(test)] mod test { - use async_std::stream::from_iter; + use async_std::stream; use futures_util::StreamExt; use std::iter::repeat_with; @@ -307,12 +310,14 @@ mod test { let record_id = "test_put_get"; let cid = "test_put_get_cid"; - let data = repeat_with(rand::random::) - .take(1024 * 1024) - .collect::>(); + let data = Bytes::from_iter( + repeat_with(rand::random::) + .take(1024 * 1024) + .collect::>(), + ); let put = db - .put(tenant, record_id, cid, from_iter(data.clone())) + .put(tenant, record_id, cid, stream::once(data.clone())) .await .unwrap(); assert_eq!(put.size, data.len()); @@ -352,12 +357,14 @@ mod test { let record_id = "test_delete"; let cid = "test_delete_cid"; - let data = repeat_with(rand::random::) - .take(1024 * 1024) - .collect::>(); + let data = Bytes::from_iter( + repeat_with(rand::random::) + .take(1024 * 1024) + .collect::>(), + ); let put = db - .put(tenant, record_id, cid, from_iter(data.clone())) + .put(tenant, record_id, cid, stream::once(data.clone())) .await .unwrap(); assert_eq!(put.size, data.len()); diff --git a/crates/dwn-rs-wasm/Cargo.toml b/crates/dwn-rs-wasm/Cargo.toml index ab9ee68..dd8aae4 100644 --- a/crates/dwn-rs-wasm/Cargo.toml +++ b/crates/dwn-rs-wasm/Cargo.toml @@ -43,9 +43,6 @@ web-sys = { version = "0.3.64", features = [ "ReadableStream", ] } -serde_bytes = { version = "0.11.14", default-features = false, features = [ - "alloc", -] } tracing = { version = "0.1.40", default-features = false, features = [ "attributes", ] } @@ -75,3 +72,4 @@ async-std = { version = "1.12.0", default-features = false, features = [ "kv-log-macro", "pin-project-lite", ] } +bytes = { version = "1.8.0", features = ["serde"] } diff --git a/crates/dwn-rs-wasm/src/streams/stream.rs b/crates/dwn-rs-wasm/src/streams/stream.rs index 519d9e5..f45cb4a 100644 --- a/crates/dwn-rs-wasm/src/streams/stream.rs +++ b/crates/dwn-rs-wasm/src/streams/stream.rs @@ -5,6 +5,7 @@ use core::{ use alloc::boxed::Box; use async_std::channel::{unbounded, Receiver}; +use bytes::Bytes; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use wasm_bindgen::prelude::*; @@ -40,7 +41,7 @@ impl StreamReadable { /// JavaScript stream, as JsValues. pub async fn from_stream(stream: St) -> Result where - St: Stream> + 'static, + St: Stream> + 'static, { let (data_tx, data_rx) = unbounded::(); let controller = AbortController::new()?; @@ -100,7 +101,7 @@ impl StreamReadable { /// can be used in Rust to read data from the JavaScript stream, and return items as a JsValue. #[derive(Debug)] pub struct IntoStream { - data_rx: Receiver, + data_rx: Receiver, done_rx: Receiver<()>, done: bool, } @@ -108,7 +109,7 @@ pub struct IntoStream { impl IntoStream { pub fn new(r: StreamReadable) -> Self { let readable = r.as_raw(); - let (data_tx, data_rx) = unbounded::(); + let (data_tx, data_rx) = unbounded::(); let (done_tx, done_rx) = unbounded::<()>(); let data_cb = Closure::wrap(Box::new(move |d: JsValue| { @@ -138,7 +139,7 @@ impl IntoStream { } impl Stream for IntoStream { - type Item = serde_bytes::ByteBuf; + type Item = Bytes; // poll_next is the main function that drives the stream. It is called by the runtime to // read the data in the Readable, and return it as a JsValue. diff --git a/crates/dwn-rs-wasm/src/surrealdb/data_store.rs b/crates/dwn-rs-wasm/src/surrealdb/data_store.rs index 446d0a6..b6009d5 100644 --- a/crates/dwn-rs-wasm/src/surrealdb/data_store.rs +++ b/crates/dwn-rs-wasm/src/surrealdb/data_store.rs @@ -60,10 +60,7 @@ impl SurrealDataStore { cid: &str, value: Readable, ) -> Result { - let readable = StreamReadable::new(value).into_stream().flat_map(|r| { - let val = serde_wasm_bindgen::to_value(&r).unwrap(); - async_std::stream::from_iter(js_sys::Uint8Array::new(&val).to_vec()) - }); + let readable = StreamReadable::new(value).into_stream(); match self .store @@ -92,7 +89,7 @@ impl SurrealDataStore { let reader = v .data .chunks(READ_CHUNK_SIZE) - .map(|r| Some(serde_bytes::ByteBuf::from(r))); + .map(|r| Some(r.to_vec().into())); let obj: DataStoreGetResult = JsCast::unchecked_into(Object::new()); Reflect::set(&obj, &"dataSize".into(), &size.into())?;