Streaming Writes implementation #305

Merged: 7 commits, Jul 29, 2024
3 changes: 3 additions & 0 deletions src/common/mod.rs
@@ -2,3 +2,6 @@ pub mod properties;

#[cfg(feature = "async")]
pub mod fetch;

#[cfg(feature = "async")]
pub mod stream;
31 changes: 31 additions & 0 deletions src/common/stream.rs
@@ -0,0 +1,31 @@
use futures::AsyncWrite;

pub struct WrappedWritableStream<'writer> {
    pub stream: wasm_streams::writable::IntoAsyncWrite<'writer>,
}

impl<'writer> AsyncWrite for WrappedWritableStream<'writer> {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        AsyncWrite::poll_write(std::pin::Pin::new(&mut self.get_mut().stream), cx, buf)
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        AsyncWrite::poll_flush(std::pin::Pin::new(&mut self.get_mut().stream), cx)
    }

    fn poll_close(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        AsyncWrite::poll_close(std::pin::Pin::new(&mut self.get_mut().stream), cx)
    }
}

unsafe impl<'writer> Send for WrappedWritableStream<'writer> {}
6 changes: 5 additions & 1 deletion src/error.rs
@@ -1,7 +1,7 @@
use arrow::error::ArrowError;
use parquet::errors::ParquetError;
use thiserror::Error;
-use wasm_bindgen::JsError;
+use wasm_bindgen::{JsError, JsValue};

#[derive(Error, Debug)]
pub enum ParquetWasmError {
@@ -15,6 +15,10 @@ pub enum ParquetWasmError {
    #[cfg(feature = "async")]
    #[error("HTTP error: `{0}`")]
    HTTPError(Box<reqwest::Error>),
    #[error("Platform error: `{0}`")]
    PlatformSupportError(String),
    #[error("Dyn casting error")]
    DynCastingError(JsValue),
}

pub type Result<T> = std::result::Result<T, ParquetWasmError>;
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -17,6 +17,9 @@ pub mod writer;
#[cfg(feature = "writer")]
pub mod writer_properties;

#[cfg(all(feature = "writer", feature = "async"))]
pub mod writer_async;

// When the `wee_alloc` feature is enabled, use `wee_alloc` as the global
// allocator.
/*#[cfg(feature = "wee_alloc")]
91 changes: 91 additions & 0 deletions src/wasm.rs
@@ -249,3 +249,94 @@ pub async fn read_parquet_stream(
    });
    Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw())
}

/// Transform a ReadableStream of RecordBatches to a ReadableStream of bytes
///
/// Browser example with piping to a file via the File System API:
///
/// ```js
/// import initWasm, {ParquetFile, transformParquetStream} from "parquet-wasm";
///
/// // Instantiate the WebAssembly context
/// await initWasm();
///
/// const fileInstance = await ParquetFile.fromUrl("https://example.com/file.parquet");
/// const recordBatchStream = await fileInstance.stream();
/// const serializedParquetStream = await transformParquetStream(recordBatchStream);
/// // NB: requires transient user activation - you would typically do this before ☝️
/// const handle = await window.showSaveFilePicker();
/// const writable = await handle.createWritable();
/// await serializedParquetStream.pipeTo(writable);
/// ```
///
/// NodeJS (ESM) example with piping to a file:
/// ```js
/// import { open } from "node:fs/promises";
/// import { Writable } from "node:stream";
/// import initWasm, {ParquetFile, transformParquetStream} from "parquet-wasm";
///
/// // Instantiate the WebAssembly context
/// await initWasm();
///
/// const fileInstance = await ParquetFile.fromUrl("https://example.com/file.parquet");
/// const recordBatchStream = await fileInstance.stream();
/// const serializedParquetStream = await transformParquetStream(recordBatchStream);
///
/// // grab a file handle via fsPromises
/// const handle = await open("file.parquet", "w");
/// const destinationStream = Writable.toWeb(handle.createWriteStream());
/// await serializedParquetStream.pipeTo(destinationStream);
///
/// ```
/// NB: the above is a little contrived - `await writeFile("file.parquet", serializedParquetStream)`
/// is enough for most use cases.
///
/// Browser kitchen sink example - teeing to the Cache API, using as a streaming post body, transferring
/// to a Web Worker:
/// ```js
/// // prelude elided - see above
/// const serializedParquetStream = await transformParquetStream(recordBatchStream);
/// const [cacheStream, bodyStream] = serializedParquetStream.tee();
/// const postProm = fetch(targetUrl, {
///   method: "POST",
///   duplex: "half",
///   body: bodyStream
/// });
/// const targetCache = await caches.open("foobar");
/// await targetCache.put("https://example.com/file.parquet", new Response(cacheStream));
/// // this could have been done with another tee, but beware of buffering
/// const workerStream = (await targetCache.match("https://example.com/file.parquet")).body;
/// const worker = new Worker("worker.js");
/// worker.postMessage(workerStream, [workerStream]);
/// await postProm;
/// ```
///
/// @param stream A {@linkcode ReadableStream} of {@linkcode RecordBatch} instances
/// @param writer_properties (optional) Configuration for writing to Parquet. Use the {@linkcode
/// WriterPropertiesBuilder} to build a writing configuration, then call `.build()` to create an
/// immutable writer properties instance to pass in here.
/// @returns ReadableStream containing serialized Parquet data.
#[wasm_bindgen(js_name = "transformParquetStream")]
Owner: Can you add a docstring here? Potentially include an example too? (The docstring will be included in the typescript-generated typedefs and seen by JS users)

Owner: Maybe include an example of how you can pass in a File handle? So you can write out to a Parquet file on disk without materializing the buffer in memory?
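
A minimal sketch of that File-handle usage in the browser, assuming Origin Private File System support (`navigator.storage.getDirectory`); the file name is arbitrary and `transformParquetStream` is the export added by this PR:

```js
// Sketch (assumes OPFS support): stream the serialized Parquet bytes straight to a
// file handle, so the full buffer is never materialized in memory.
const serializedParquetStream = await transformParquetStream(recordBatchStream);
const opfsRoot = await navigator.storage.getDirectory();
const fileHandle = await opfsRoot.getFileHandle("output.parquet", { create: true });
const writable = await fileHandle.createWritable();
await serializedParquetStream.pipeTo(writable);
```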

#[cfg(all(feature = "writer", feature = "async"))]
pub async fn transform_parquet_stream(
    stream: wasm_streams::readable::sys::ReadableStream,
    writer_properties: Option<crate::writer_properties::WriterProperties>,
) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
    use futures::{StreamExt, TryStreamExt};
    use wasm_bindgen::convert::TryFromJsValue;

    use crate::error::ParquetWasmError;
    let batches = wasm_streams::ReadableStream::from_raw(stream)
        .into_stream()
        .map(|maybe_chunk| {
            let chunk = maybe_chunk?;
            arrow_wasm::RecordBatch::try_from_js_value(chunk)
        })
        .map_err(ParquetWasmError::DynCastingError);
    let output_stream = super::writer_async::transform_parquet_stream(
        batches,
        writer_properties.unwrap_or_default(),
    )
    .await;
    Ok(output_stream?)
}
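
Because the chunk-to-RecordBatch cast above is fallible, a bad input stream surfaces to JS as a rejected promise carrying the new `DynCastingError` variant. A caller-side sketch of that failure mode (assuming the prelude from the docstring examples above):

```js
// Sketch of the failure path: the first chunk is not a RecordBatch, so the cast
// in transform_parquet_stream fails and the returned promise rejects.
const notRecordBatches = new ReadableStream({
  start(controller) {
    controller.enqueue("not a RecordBatch");
    controller.close();
  },
});

try {
  await transformParquetStream(notRecordBatches);
} catch (err) {
  // err carries the DynCastingError raised while peeking the first batch
  console.error("transformParquetStream rejected:", err);
}
```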
70 changes: 70 additions & 0 deletions src/writer_async.rs
@@ -0,0 +1,70 @@
use crate::common::stream::WrappedWritableStream;
use crate::error::{ParquetWasmError, Result};
use async_compat::CompatExt;
use futures::channel::oneshot;
use futures::StreamExt;
use parquet::arrow::async_writer::AsyncArrowWriter;
use wasm_bindgen_futures::spawn_local;

pub async fn transform_parquet_stream(
    batches: impl futures::Stream<Item = Result<arrow_wasm::RecordBatch>> + 'static,
    writer_properties: crate::writer_properties::WriterProperties,
) -> Result<wasm_streams::readable::sys::ReadableStream> {
    let options = Some(writer_properties.into());

    let raw_stream = wasm_streams::transform::sys::TransformStream::new();
    if let Ok(raw_stream) = raw_stream {
        let (writable_stream, output_stream) = {
            let raw_writable = raw_stream.writable();
            let inner_writer =
                wasm_streams::WritableStream::from_raw(raw_writable).into_async_write();
            let writable_stream = WrappedWritableStream {
                stream: inner_writer,
            };
            (writable_stream, raw_stream.readable())
        };
        // Construct a channel for signalling errors occurring at the start of the stream.
        // Errors that occur during writing will have to fuse the stream.
        let (sender, receiver) = oneshot::channel::<Result<()>>();
        spawn_local(async move {
            let mut adapted_stream = batches.peekable();
            let mut pinned_stream = std::pin::pin!(adapted_stream);
            let first_batch = pinned_stream.as_mut().peek().await;
            if let Some(Ok(first_batch)) = first_batch {
                let schema = first_batch.schema().into_inner();
                let writer = AsyncArrowWriter::try_new(writable_stream.compat(), schema, options);
                match writer {
                    Ok(mut writer) => {
                        // unblock the calling thread's receiver (indicating that stream initialization was error-free)
                        let _ = sender.send(Ok(()));
                        while let Some(batch) = pinned_stream.next().await {
                            if let Ok(batch) = batch {
                                let _ = writer.write(&batch.into()).await;
                            }
                        }
                        let _ = writer.close().await;
                    }
                    Err(err) => {
                        let _ = sender.send(Err(ParquetWasmError::ParquetError(Box::new(err))));
                    }
                }
            } else if let Some(Err(err)) = first_batch {
                let _ = sender.send(Err(ParquetWasmError::DynCastingError(
                    err.to_string().into(),
                )));
            } else {
                let _ = sender.send(Err(ParquetWasmError::DynCastingError(
                    "null first batch".to_string().into(),
                )));
            }
        });
        match receiver.await.unwrap() {
            Ok(()) => Ok(output_stream),
            Err(err) => Err(err),
        }
    } else {
        Err(ParquetWasmError::PlatformSupportError(
            "Failed to create TransformStream".to_string(),
        ))
    }
}
20 changes: 19 additions & 1 deletion tests/js/read-write.test.ts
@@ -1,7 +1,7 @@
import * as wasm from "../../pkg/node/parquet_wasm";
import { readFileSync } from "fs";
import { tableFromIPC, tableToIPC } from "apache-arrow";
-import { testArrowTablesEqual, readExpectedArrowData } from "./utils";
+import { testArrowTablesEqual, readExpectedArrowData, temporaryServer } from "./utils";
import { describe, it, expect } from "vitest";

// Path from repo root
@@ -89,3 +89,21 @@ it("reads empty file", async (t) => {
  expect(table.numCols).toStrictEqual(0);
  // console.log("empty table schema", table.schema);
});

it("read stream-write stream-read stream round trip (no writer properties provided)", async (t) => {
const server = await temporaryServer();
const listeningPort = server.addresses()[0].port;
const rootUrl = `http://localhost:${listeningPort}`;

const expectedTable = readExpectedArrowData();

const url = `${rootUrl}/1-partition-brotli.parquet`;
const originalStream = await wasm.readParquetStream(url);

const stream = await wasm.transformParquetStream(originalStream);
const accumulatedBuffer = new Uint8Array(await new Response(stream).arrayBuffer());
Comment on lines +103 to +104

Owner: Out of curiosity, how would you write this to a file in Node? Can you pass the stream object to a node file?

Contributor Author: It turns out to be quite ergonomic:

const destinationWritable = Writable.toWeb(handle.createWriteStream());
await outputStream.pipeTo(destinationWritable);

alternatively:

await handle.writeFile(outputStream)

Deno's version of the former is pretty succinct too:

await outputStream.pipeTo(handle.writable);

(There are roughly ten different ways to do it; e.g. fs.writeFile(path, inputStream) is just a shortcut for explicitly creating a stream.Writable and piping to it. These strike a reasonable balance.)

  const roundtripTable = tableFromIPC(wasm.readParquet(accumulatedBuffer).intoIPCStream());

  testArrowTablesEqual(expectedTable, roundtripTable);
  await server.close();
});
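
A companion case with explicit writer properties could follow the same shape. This sketch assumes the `WriterPropertiesBuilder` and `Compression` exports used by the existing synchronous write path, as referenced in the `transformParquetStream` docstring:

```ts
it("round trip with explicit writer properties (sketch)", async (t) => {
  const server = await temporaryServer();
  const listeningPort = server.addresses()[0].port;
  const rootUrl = `http://localhost:${listeningPort}`;

  const expectedTable = readExpectedArrowData();

  const url = `${rootUrl}/1-partition-brotli.parquet`;
  const originalStream = await wasm.readParquetStream(url);

  // Assumed builder API: build an immutable WriterProperties, then pass it as the
  // optional second argument to transformParquetStream.
  const writerProperties = new wasm.WriterPropertiesBuilder()
    .setCompression(wasm.Compression.SNAPPY)
    .build();

  const stream = await wasm.transformParquetStream(originalStream, writerProperties);
  const accumulatedBuffer = new Uint8Array(await new Response(stream).arrayBuffer());

  const roundtripTable = tableFromIPC(wasm.readParquet(accumulatedBuffer).intoIPCStream());
  testArrowTablesEqual(expectedTable, roundtripTable);
  await server.close();
});
```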