Streaming Writes implementation #305

Merged: 7 commits, Jul 29, 2024
Changes from 3 commits
3 changes: 3 additions & 0 deletions src/common/mod.rs
@@ -2,3 +2,6 @@ pub mod properties;

#[cfg(feature = "async")]
pub mod fetch;

#[cfg(feature = "async")]
pub mod stream;
31 changes: 31 additions & 0 deletions src/common/stream.rs
@@ -0,0 +1,31 @@
use futures::AsyncWrite;

pub struct WrappedWritableStream<'writer> {
    pub stream: wasm_streams::writable::IntoAsyncWrite<'writer>,
}

impl<'writer> AsyncWrite for WrappedWritableStream<'writer> {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        AsyncWrite::poll_write(std::pin::Pin::new(&mut self.get_mut().stream), cx, buf)
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        AsyncWrite::poll_flush(std::pin::Pin::new(&mut self.get_mut().stream), cx)
    }

    fn poll_close(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        AsyncWrite::poll_close(std::pin::Pin::new(&mut self.get_mut().stream), cx)
    }
}

unsafe impl<'writer> Send for WrappedWritableStream<'writer> {}
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -17,6 +17,9 @@ pub mod writer;
#[cfg(feature = "writer")]
pub mod writer_properties;

#[cfg(all(feature = "writer", feature = "async"))]
pub mod writer_async;

// When the `wee_alloc` feature is enabled, use `wee_alloc` as the global
// allocator.
/*#[cfg(feature = "wee_alloc")]
21 changes: 21 additions & 0 deletions src/wasm.rs
@@ -249,3 +249,24 @@ pub async fn read_parquet_stream(
    });
    Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw())
}

#[wasm_bindgen(js_name = "transformParquetStream")]
Owner: Can you add a docstring here? Potentially include an example too? (The docstring will be included in the typescript-generated typedefs and seen by JS users)

Owner: Maybe include an example of how you can pass in a File handle? So you can write out to a Parquet file on disk without materializing the buffer in memory?

#[cfg(all(feature = "writer", feature = "async"))]
pub fn transform_parquet_stream(
Owner: Is transform the usual name for writing to a stream? E.g. why is this not write_parquet_stream?

Contributor Author:

> Is transform the usual name for writing to a stream? E.g. why is this not write_parquet_stream?

I've had a bit of time to think this one through, and it largely boils down to convention/platform semantics; basically, the use case splits across three scenarios:

ReadableStream<RecordBatch> -> ReadableStream<ArrayBuffer>

The Streams spec favors the pipeThrough approach, which involves any of the following (sketched below):

  1. inputStream.pipeThrough(new TransformStream({transform(), start(), flush()})) - best for trivial transforms that occur in JS (e.g. uppercasing text)
  2. inputStream.pipeThrough(new TransformSubClassStream(...params)) - for more involved internals that merit defining a class, or browser-provided native subclasses (e.g. CompressionStream, TextDecoderStream).
  3. inputStream.pipeThrough({writable, readable}) - any object with a writable and readable property (both of the correct type). Useful when you can't explicitly subclass from TransformStream.
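
For reference, a rough sketch of all three forms together; everything here is illustrative (the input streams and transformer bodies are placeholders, not part of this PR):

    // 1. Inline transformer: fine for trivial JS-side transforms.
    const upperCased = textStream.pipeThrough(
      new TransformStream<string, string>({
        transform(chunk, controller) {
          controller.enqueue(chunk.toUpperCase());
        },
      }),
    );

    // 2. A subclass, here a browser-provided native one.
    const gzipped = rawBytes.pipeThrough(new CompressionStream("gzip"));

    // 3. Any duck-typed { writable, readable } pair, useful when you
    //    can't subclass TransformStream (e.g. a wasm-bindgen struct).
    const pair = new TransformStream<Uint8Array, Uint8Array>();
    const output = encodedBytes.pipeThrough({
      writable: pair.writable,
      readable: pair.readable,
    });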

It turns out that the following design for a TransformStream works flawlessly (the second internal TransformStream seems to have near-zero overhead):

flowchart TB
 subgraph B1["TransformStream"]
    direction TB
        f1["readable"]
        i1["writable"]
  end
 subgraph B2["TransformStream"]
    direction TB
        f2["readable"]
        i2["writable"]
  end
 subgraph TOP["ParquetEncoderStream"]
    direction LR
        B1
        nm["Stream&lt;RecordBatch&gt;"]
        ny[["Loop:<br>Encode Parquet"]]
        B2
  end
    i1 --> f1
    i2 --> f2
    A["Input Stream:<br>ReadableStream&lt;RecordBatch&gt;"] -- pipeThrough --> TOP
    B1 -- dyn_into --> nm
    nm --> ny
    ny --> B2
    TOP --> B["Output Stream:<br>ReadableStream&lt;ArrayBuffer&gt;"]
    style i1 fill:#FFD600,color:#000000
    style f2 fill:#FFD600,color:#000000

(yellow blocks being wasm-bindgen members of the struct, accessible from JS)

Table -> ReadableStream<ArrayBuffer>

Quite similar to a callable equivalent of Response::body - you'd typically provide Table::stream() or Table::parquetStream() for method-based APIs, or toParquetStream(Table) for functional APIs.

Rust: Stream<RecordBatch> -> ReadableStream<ArrayBuffer>

For downstream Rust dependencies that expose something along the lines of runPipeline(targetUrl) or foo_state_machine.toParquetStream() to JS, and that need a means of getting the final Rust stream of record batches back out as a readable stream of bytes. Used internally by the JS-facing transform stream.

Contributor Author: So I think with all that said, we'd have 3 exports:

  • JS, ParquetEncoderStream, with usage inputStream.pipeThrough(new ParquetEncoderStream(writerProperties))
  • JS, writeParquetStream, with usage: writeParquetStream(inputTable)
  • RS, to_parquet_stream, with usage: to_parquet_stream(rs_stream_of_record_batches)

The sticking point on the last two exports is really that you can't do function overloads, and reasonably accurate, distinguishable function names would be too verbose (e.g. writeTableToParquetStream/writeTableParquetStream, writeStreamToParquetStream 😬).
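
To make that concrete, hypothetical usage of the two JS exports (names as proposed above, nothing final; the Rust export is plain Rust API surface and omitted here):

    // Stream of RecordBatches in, stream of Parquet bytes out.
    const parquetBytes = recordBatchStream.pipeThrough(
      new ParquetEncoderStream(writerProperties),
    );

    // Whole-table convenience form.
    const tableBytes = writeParquetStream(inputTable);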

    stream: wasm_streams::readable::sys::ReadableStream,
    writer_properties: Option<crate::writer_properties::WriterProperties>,
) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
    use futures::StreamExt;
    use wasm_bindgen::convert::TryFromJsValue;
    let batches = wasm_streams::ReadableStream::from_raw(stream)
        .into_stream()
        .map(|maybe_chunk| {
            let chunk = maybe_chunk.unwrap();
            arrow_wasm::RecordBatch::try_from_js_value(chunk).unwrap()
        });
    let output_stream = super::writer_async::transform_parquet_stream(
        batches,
        writer_properties.unwrap_or_default(),
    );
    Ok(output_stream.unwrap())
}
38 changes: 38 additions & 0 deletions src/writer_async.rs
@@ -0,0 +1,38 @@
use crate::common::stream::WrappedWritableStream;
use crate::error::Result;
use async_compat::CompatExt;
use futures::StreamExt;
use parquet::arrow::async_writer::AsyncArrowWriter;
use wasm_bindgen_futures::spawn_local;

pub fn transform_parquet_stream(
Owner: It would be great to have a docstring here

    batches: impl futures::Stream<Item = arrow_wasm::RecordBatch> + 'static,
    writer_properties: crate::writer_properties::WriterProperties,
) -> Result<wasm_streams::readable::sys::ReadableStream> {
    let options = Some(writer_properties.into());
    // let encoding = writer_properties.get_encoding();

    let (writable_stream, output_stream) = {
        let raw_stream = wasm_streams::transform::sys::TransformStream::new().unwrap();
        let raw_writable = raw_stream.writable();
        let inner_writer = wasm_streams::WritableStream::from_raw(raw_writable).into_async_write();
        let writable_stream = WrappedWritableStream {
            stream: inner_writer,
        };
        (writable_stream, raw_stream.readable())
    };
    spawn_local::<_>(async move {
        let mut adapted_stream = batches.peekable();
        let mut pinned_stream = std::pin::pin!(adapted_stream);
        let first_batch = pinned_stream.as_mut().peek().await.unwrap();
        let schema = first_batch.schema().into_inner();
        // Need to create an encoding for each column
Owner: We should probably have a helper that creates encodings given a WriterProperties instance plus an Arrow schema

        let mut writer =
            AsyncArrowWriter::try_new(writable_stream.compat(), schema, options).unwrap();
Owner: Is it hard to have proper error handling from within this async transform? Can we remove this unwrap and as many others as we can?

Contributor Author: Somewhat - everything outside the while loop is surfaceable as part of the call itself (I've tied the return of the stream to successful writer creation).

Inside, that's somewhat trickier - ideally we would abort the stream, but that seems like it will take a fair amount of gymnastics.
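
For context, a standalone sketch (not this PR's code) of what aborting would surface on the JS side - erroring the writable half of a TransformStream propagates the reason to readers of the readable half:

    const { writable, readable } = new TransformStream<Uint8Array, Uint8Array>();

    // Simulate the encoder failing partway through.
    const writer = writable.getWriter();
    await writer.abort(new Error("encoder failed"));

    // Any pending or subsequent read rejects with that reason.
    try {
      await readable.getReader().read();
    } catch (err) {
      console.error(err); // Error: encoder failed
    }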

        while let Some(batch) = pinned_stream.next().await {
            let _ = writer.write(&batch.into()).await;
Owner: Do you have to assign the output to something? Can we have just writer.write(&batch.into()).await? Are these infallible APIs?

        }
        let _ = writer.close().await;
    });
    Ok(output_stream)
}
20 changes: 19 additions & 1 deletion tests/js/read-write.test.ts
@@ -1,7 +1,7 @@
import * as wasm from "../../pkg/node/parquet_wasm";
import { readFileSync } from "fs";
import { tableFromIPC, tableToIPC } from "apache-arrow";
import { testArrowTablesEqual, readExpectedArrowData } from "./utils";
import { testArrowTablesEqual, readExpectedArrowData, temporaryServer } from "./utils";
import { describe, it, expect } from "vitest";

// Path from repo root
@@ -89,3 +89,21 @@ it("reads empty file", async (t) => {
  expect(table.numCols).toStrictEqual(0);
  // console.log("empty table schema", table.schema);
});

it("read stream-write stream-read stream round trip (no writer properties provided)", async (t) => {
const server = await temporaryServer();
const listeningPort = server.addresses()[0].port;
const rootUrl = `http://localhost:${listeningPort}`;

const expectedTable = readExpectedArrowData();

const url = `${rootUrl}/1-partition-brotli.parquet`;
const originalStream = await wasm.readParquetStream(url);

const stream = await wasm.transformParquetStream(originalStream);
const accumulatedBuffer = new Uint8Array(await new Response(stream).arrayBuffer());
Owner (on lines +103 to +104): Out of curiosity, how would you write this to a file in Node? Can you pass the stream object to a node file?

Contributor Author: It turns out to be quite ergonomic:

    const destinationWritable = Writable.toWeb(handle.createWriteStream());
    await outputStream.pipeTo(destinationWritable);

alternatively:

    await handle.writeFile(outputStream)

Deno's version of the former is pretty succinct too:

    await outputStream.pipeTo(handle.writable);

(There are ~10 different ways to do it - e.g. fs.writeFile(path, inputStream) is just a shortcut for explicitly creating a stream.Writable and piping to it - but these strike a reasonable balance.)
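
Putting the Node variant together end to end (a sketch assuming Node 18+; the input stream and output path are placeholders):

    import { open } from "node:fs/promises";
    import { Writable } from "node:stream";

    // Parquet bytes from the wasm transform (placeholder input stream).
    const outputStream = await wasm.transformParquetStream(recordBatchStream);

    // Bridge the web ReadableStream onto a file handle without
    // materializing the whole Parquet buffer in memory.
    const handle = await open("out.parquet", "w");
    await outputStream.pipeTo(Writable.toWeb(handle.createWriteStream()));
    await handle.close();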

  const roundtripTable = tableFromIPC(wasm.readParquet(accumulatedBuffer).intoIPCStream());

  testArrowTablesEqual(expectedTable, roundtripTable);
  await server.close();
});