Skip to content

Commit

Permalink
feat: Use pre-chunks Bytes and re-chunk in data_store
Browse files Browse the repository at this point in the history
  • Loading branch information
enmand committed Oct 29, 2024
1 parent 35e69d8 commit ba5a662
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 29 deletions.
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/dwn-rs-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ thiserror = "1.0.63"
serde_json = "1.0.113"
tracing = "0.1.40"
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
bytes = "1.8.0"

[dev-dependencies]
serde_json = "1.0.113"
3 changes: 2 additions & 1 deletion crates/dwn-rs-core/src/stores.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{fmt::Debug, future::Future, pin::Pin};

use bytes::Bytes;
use futures_util::Stream;
use ipld_core::cid::Cid;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
Expand Down Expand Up @@ -53,7 +54,7 @@ pub trait DataStore: Default {

fn close(&mut self) -> impl Future<Output = ()> + Send;

fn put<T: Stream<Item = u8> + Send + Unpin>(
fn put<T: Stream<Item = Bytes> + Send + Unpin>(
&self,
tenant: &str,
record_id: &str,
Expand Down
1 change: 1 addition & 0 deletions crates/dwn-rs-stores/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async-std = { version = "1.12.0", default-features = false, features = [
"pin-project-lite",
] }
memoize = { version = "0.4.2", default-features = false }
bytes = { version = "1.8.0", features = ["serde"] }

[dev-dependencies]
rand = "0.8.5"
33 changes: 20 additions & 13 deletions crates/dwn-rs-stores/src/surrealdb/data_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_std::stream::from_iter;
use bytes::Bytes;
use futures_util::{pin_mut, Stream, StreamExt};
use surrealdb::{
sql::{
Expand Down Expand Up @@ -43,13 +44,15 @@ impl DataStore for SurrealDB {
value: T,
) -> Result<PutDataResults, DataStoreError>
where
T: Stream<Item = u8> + Unpin + Send,
T: Stream<Item = Bytes> + Unpin + Send,
{
let chunks = value.chunks(CHUNK_CAPACITY);
pin_mut!(chunks);

let id: RecordId = (DATA_TABLE, record_id.to_string()).into();

pin_mut!(value);
let mut chunks = value
.flat_map(|b| from_iter(b.into_iter()))
.chunks(CHUNK_CAPACITY);

let len = self
.with_database(tenant, |db| async move {
db.delete::<Option<GetData>>(id.clone())
Expand Down Expand Up @@ -282,7 +285,7 @@ fn chunks_graph_query() -> Query {

#[cfg(test)]
mod test {
use async_std::stream::from_iter;
use async_std::stream;
use futures_util::StreamExt;
use std::iter::repeat_with;

Expand All @@ -307,12 +310,14 @@ mod test {
let record_id = "test_put_get";
let cid = "test_put_get_cid";

let data = repeat_with(rand::random::<u8>)
.take(1024 * 1024)
.collect::<Vec<u8>>();
let data = Bytes::from_iter(
repeat_with(rand::random::<u8>)
.take(1024 * 1024)
.collect::<Vec<u8>>(),
);

let put = db
.put(tenant, record_id, cid, from_iter(data.clone()))
.put(tenant, record_id, cid, stream::once(data.clone()))
.await
.unwrap();
assert_eq!(put.size, data.len());
Expand Down Expand Up @@ -352,12 +357,14 @@ mod test {
let record_id = "test_delete";
let cid = "test_delete_cid";

let data = repeat_with(rand::random::<u8>)
.take(1024 * 1024)
.collect::<Vec<u8>>();
let data = Bytes::from_iter(
repeat_with(rand::random::<u8>)
.take(1024 * 1024)
.collect::<Vec<u8>>(),
);

let put = db
.put(tenant, record_id, cid, from_iter(data.clone()))
.put(tenant, record_id, cid, stream::once(data.clone()))
.await
.unwrap();
assert_eq!(put.size, data.len());
Expand Down
4 changes: 1 addition & 3 deletions crates/dwn-rs-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ web-sys = { version = "0.3.64", features = [
"ReadableStream",
] }

serde_bytes = { version = "0.11.14", default-features = false, features = [
"alloc",
] }
tracing = { version = "0.1.40", default-features = false, features = [
"attributes",
] }
Expand Down Expand Up @@ -75,3 +72,4 @@ async-std = { version = "1.12.0", default-features = false, features = [
"kv-log-macro",
"pin-project-lite",
] }
bytes = { version = "1.8.0", features = ["serde"] }
9 changes: 5 additions & 4 deletions crates/dwn-rs-wasm/src/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use core::{

use alloc::boxed::Box;
use async_std::channel::{unbounded, Receiver};
use bytes::Bytes;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use wasm_bindgen::prelude::*;
Expand Down Expand Up @@ -40,7 +41,7 @@ impl StreamReadable {
/// JavaScript stream, as JsValues.
pub async fn from_stream<St>(stream: St) -> Result<Self, JsValue>
where
St: Stream<Item = Option<serde_bytes::ByteBuf>> + 'static,
St: Stream<Item = Option<Bytes>> + 'static,
{
let (data_tx, data_rx) = unbounded::<JsValue>();
let controller = AbortController::new()?;
Expand Down Expand Up @@ -100,15 +101,15 @@ impl StreamReadable {
/// can be used in Rust to read data from the JavaScript stream, and return items as a JsValue.
#[derive(Debug)]
pub struct IntoStream {
data_rx: Receiver<serde_bytes::ByteBuf>,
data_rx: Receiver<Bytes>,
done_rx: Receiver<()>,
done: bool,
}

impl IntoStream {
pub fn new(r: StreamReadable) -> Self {
let readable = r.as_raw();
let (data_tx, data_rx) = unbounded::<serde_bytes::ByteBuf>();
let (data_tx, data_rx) = unbounded::<Bytes>();
let (done_tx, done_rx) = unbounded::<()>();

let data_cb = Closure::wrap(Box::new(move |d: JsValue| {
Expand Down Expand Up @@ -138,7 +139,7 @@ impl IntoStream {
}

impl Stream for IntoStream {
type Item = serde_bytes::ByteBuf;
type Item = Bytes;

// poll_next is the main function that drives the stream. It is called by the runtime to
// read the data in the Readable, and return it as a JsValue.
Expand Down
7 changes: 2 additions & 5 deletions crates/dwn-rs-wasm/src/surrealdb/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ impl SurrealDataStore {
cid: &str,
value: Readable,
) -> Result<DataStorePutResult, JsValue> {
let readable = StreamReadable::new(value).into_stream().flat_map(|r| {
let val = serde_wasm_bindgen::to_value(&r).unwrap();
async_std::stream::from_iter(js_sys::Uint8Array::new(&val).to_vec())
});
let readable = StreamReadable::new(value).into_stream();

match self
.store
Expand Down Expand Up @@ -92,7 +89,7 @@ impl SurrealDataStore {
let reader = v
.data
.chunks(READ_CHUNK_SIZE)
.map(|r| Some(serde_bytes::ByteBuf::from(r)));
.map(|r| Some(r.to_vec().into()));

let obj: DataStoreGetResult = JsCast::unchecked_into(Object::new());
Reflect::set(&obj, &"dataSize".into(), &size.into())?;
Expand Down

0 comments on commit ba5a662

Please sign in to comment.