Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Struggling to merge two parquet files #591

Closed
pellicceama opened this issue Aug 30, 2024 · 22 comments
Closed

Struggling to merge two parquet files #591

pellicceama opened this issue Aug 30, 2024 · 22 comments

Comments

@pellicceama
Copy link

pellicceama commented Aug 30, 2024

This is not a bug report on the library but I could really use some help with understanding how writes work.

I'm figuring out the simplest way to add new records to an existing parquet file. This would serve two use cases: merge two smaller files together OR to add new data to an existing one.

This is meant to run on a Cloudflare worker, so parquet-wasm is ideal, particularly with the upcoming async writer. I can implement reads and writes in streams and process data that exceeds the worker's available memory limitations. I made the reads from remote files and wrote to S3 work pretty quickly.

https://gist.github.com/pellicceama/35b1f59781f6481fa5c93036dbea44f0

However, since then, I've spent two working days figuring out how to append and add new data without success. I'm new to the Rust + Wasm + Arrrow world and would love some pointers!

I've tried instantiating a parquet-wasm RecordBatch from scratch or an arrow.RecordBatch so I can concatenate it to the end of the write stream without much success. Something I haven't tried yet is converting the arrow.RecordBatch to an array of bytes and simply appending that to the read stream's reader at the end, but I can't imagine that will work.

Do you have any advice on the best way to do this?

@kylebarron
Copy link
Owner

I'm figuring out the simplest way to add new records to an existing parquet file. This would serve two use cases: merge two smaller files together OR to add new data to an existing one.

It's not possible to add new records to an existing Parquet file. Parquet files are immutable.

I made the reads from remote files and wrote to S3 work pretty quickly.

It's good that the read stream is working; I've never used the streaming reader myself.

concatenate it to the end of the write stream without much success

I don't fully understand how these streams work here and its interaction with JS. At the end of the day, it becomes an async iterator on the Rust side. Once the async iterator is done, the Rust writer closes the file and writes the footer. Once that happens, it's not possible to mutate that Parquet file. So presumably you need to find a way to extend the stream without letting Rust close the file.

Or in the future we could have something like a ParquetWriter class to give you more control over when the footer is written.

@pellicceama
Copy link
Author

pellicceama commented Sep 3, 2024

Thanks, Kyle.

So presumably you need to find a way to extend the stream without letting Rust close the file.
This is totally possible. The challenge I have is that I can't seem to be able to construct / instantiate a RecordBatch object to pass to the existing writer.

So for example I can do

const fileInstance = await ParquetFile.fromUrl(url);
// gives me a stream of RecordBatch
const readStream = await fileInstance.stream();
// puts it in Bytes stream
const writeStream = await transformParquetStream(readStream);

So in theory if I could just instantiate RecordBatches on demand I could write it at the end of the read stream through before piping it to the write stream... I tried creating them in arrow but just found no way to construct the WASM RecordBatch object. Any ideas on how to do that? :)

Otherwise I'd need to parse it and put it in a buffer which means the entire file needs to be in memory, like in
https://observablehq.com/@kimmolinna/parquet-wasm

@pellicceama
Copy link
Author

@H-Plus-Time I saw your PR implementing the writer and this implementation relies on your work there. I also noted your comment here RE: creating a TransformStream of some kind.

Is there any advice you could give here for folks that want to use that interface beyond writing something that's been read.

@H-Plus-Time
Copy link
Contributor

Yeah, the transformParquetStream function creates a TransformStream internally, the only part that's lacking (on which I stalled after encountering some particularly nasty bugs) is a stream constructor (rather than, or in addition to, functional) interface.

Something like the following would do for the merging usecase:

function combineStreams(...streams) {
    const { readable, writable } = new TransformStream();

    (async () => {
        for (const stream of streams) {
            // Pipe the current stream to the writable side of the TransformStream
            console.log('piping stream...');
            await stream.pipeTo(writable, { preventClose: true });
        }
        writable.getWriter().close();
    })();

    return readable;
}

const readStreams = await Promise.all(sourceUrls.map(async source => {
    const instance = await ParquetFile.fromUrl(source);
    return instance.stream();
}));
console.log('readStreams constructed');

// separated to confirm the combineStreams function completes before reading is actually finished
const intermediate = combineStreams(...readStreams);
console.log('combined stream', intermediate);

const outputStream = await transformParquetStream(intermediate);
console.log('outputStream', outputStream);
// sink that byte stream to wherever

You'd need to make sure the schemas are identical, and be somewhat careful with row group sizes - transformParquetStream will pull up to it's configured max row group size (defaults to ~65k) before backpressure kicks in, so your peak memory usage will be ~width of each row * row group size.

@H-Plus-Time
Copy link
Contributor

As for the newly constructed data usecase 🤔 , with a slight tweak in arrow-wasm, this would work:

// grab an IPC buf from apache-arrow (or duckdb-wasm, etc)
const buf = tableToIPC(externalTable);
// pull it into a wasm Table
const intermediate = parquetMod.Table.fromIPCStream(buf);
// get an array of record batches and wrap that in a ReadableStream
// via ReadableStream.from
const newBatchesStream = ReadableStream.from(intermediate.recordBatches());
// include that in the list of streams passed into combineStreams
const outputStream = await transformParquetStream(combineStreams(...others, newBatchesStream));

The recordBatches method is really the missing link there (that or a symmetric from_ipc_stream on RecordBatch) .

Ideally you'd want to compare schemas at runtime, right after you've instantiated ParquetFile - the data's there, it just isn't exposed yet.

P.S. ReadableStream.from is very new, though it's mainly just syntactic sugar for what's described in https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#convert_an_iterator_or_async_iterator_to_a_stream.

@pellicceama
Copy link
Author

Amazing! I didn't know the .recordBatches() API existed. Thank you! Will reopen if its useful for readers to see follow up questions and will also post back the OSS implementation of this on https://github.com/evefancom/evefan when it goes in

@pellicceama
Copy link
Author

Update: Stream merge + write worked like a charm!

Cloudflare R2 has a weird requirement that the stream be fixed length, so I buffer the merged file at the end to count it. I didn't spend too much time on it as I was just prototyping, but we can probably get around it. If I didn't put it in a chunks array, it'd sometimes fall out of order when uploading.

https://gist.github.com/pellicceama/1ec95f1f2f1dc4d1265a54c55496ca8c

@pellicceama
Copy link
Author

As for the newly constructed data usecase 🤔 , with a slight tweak in arrow-wasm, this would work:

// grab an IPC buf from apache-arrow (or duckdb-wasm, etc)
const buf = tableToIPC(externalTable);
// pull it into a wasm Table
const intermediate = parquetMod.Table.fromIPCStream(buf);
// get an array of record batches and wrap that in a ReadableStream
// via ReadableStream.from
const newBatchesStream = ReadableStream.from(intermediate.recordBatches());
// include that in the list of streams passed into combineStreams
const outputStream = await transformParquetStream(combineStreams(...others, newBatchesStream));

The recordBatches method is really the missing link there (that or a symmetric from_ipc_stream on RecordBatch) .

Ideally you'd want to compare schemas at runtime, right after you've instantiated ParquetFile - the data's there, it just isn't exposed yet.

P.S. ReadableStream.from is very new, though it's mainly just syntactic sugar for what's described in https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#convert_an_iterator_or_async_iterator_to_a_stream.

@kylebarron I wonder if this is a use case that the library should support without having to import arrow-wasm separately.

@H-Plus-Time
Copy link
Contributor

As for the newly constructed data usecase 🤔 , with a slight tweak in arrow-wasm, this would work:

@kylebarron I wonder if this is a use case that the library should support without having to import arrow-wasm separately.

The record batches exposed by parquet-wasm come from that library (no new type wrappers or anything like that, they're exactly the same type). The tweak I talked about is merged upstream as of a day or two ago, but we haven't repinned this repo's dependency on it just yet.

There's an upcoming 0.7.0 release, I'm aiming to get that and probably the schema exposure in that version.

P.S. interoperation between different wasm module instances is fraught with showstoppers (E.g. attempting to pass a struct in module instance A to module instance B, even if the source of the modules is identical, doesn't work (they're essentially in two completely different memory address spaces)).

@pellicceama
Copy link
Author

@H-Plus-Time as a tag along to this thread, what would be the most efficient way to free up record batches that from from a ParquetFile.fromUrl that is them stream()ed and passed to the transform async writer?

Im seeing that I can call free() on the file after it’s been streamed but I try to call free() on the individual record batches after they are read by the async writer and I run into issues..

Any advice would be appreciated 😊

@H-Plus-Time
Copy link
Contributor

You don't have to/can't - transform_parquet_stream calls try_from_js_value on each RecordBatch in the incoming stream, which takes ownership over the RecordBatch (so normal rust borrow checker behaviour applies, wherein the batch is freed once it's written to the internal parquet writer). write_parquet works like this too (ignoring the destroy into raw effects (which neuter the JS-side object), the memory occupied by the table is nonsense after write_parquet).

My guess, going off the gist, is the retained chunks are the culprit - your max memory is tied to your total output file size + baseline memory (typically 10MB). A combination of a chunking transform stream (set to 5MB) and R2's multi-part upload API (which only requires the parts (each chunk of the transform stream) to be of known size) should keep you below the memory limit (128MB, right, for cloudflare workers?).

@pellicceama
Copy link
Author

pellicceama commented Sep 16, 2024

I see I see. Thank you for confirming.

I'm seeing some weird behavior then, but it may have to do more with objects lingering in memory across invocations of the Cloudflare workerd runtime.

For instance, for the merging use case three files that total up to ~60mb I get up to 180mb of memory. The reason I suspect that this is on the rust side is that it breaks in production with a silent error whereas if I over allocate on the JS side I get a clear 'out of memory' error. To my read, the JS side is not keeping it in buffers (my 'prod' code is different than the simple buffered example in the gist).

Screenshot 2024-09-15 at 5 38 34 PM

It may be that after each run I need to nuke the WASM instance entirely. I'll doble check and revert.

the memory occupied by the table is nonsense after write_parquet).

Perhaps nonsense but not 'freed' by workerd? 🤔

Thank you for your support!

@kylebarron
Copy link
Owner

Perhaps nonsense but not 'freed' by workerd? 🤔

That's possible. The WebAssembly memory space is contained within the JS runtime. So when you free a wasm-bindgen object, you allow the Wasm allocator to use that space, but unless you delete the WebAssembly instance itself, you won't allow the JS garbage collector to reclaim the space.

@pellicceama
Copy link
Author

pellicceama commented Sep 16, 2024

but unless you delete the WebAssembly instance itself, you won't allow the JS garbage collector to reclaim the space.

Is there a way to do this? Something like the reverse of the current await initWasm(binary);

@kylebarron
Copy link
Owner

I've never tried to do it.

@pellicceama
Copy link
Author

pellicceama commented Sep 16, 2024

Tried a bunch of things and added detailed logging.

In short, for reasons beyond my understanding the stream approach in this gist is currently allocating twice as much memory as it 'should'. I get this error when trying to merge two ~25mb parquet files. I get the size of the parquet-wasm wasm instance via wasmInstance.memory.buffer.byteLength.

image

The same code works when I try to merge tiny files. I will keep an artificially lower limit for now and see if there's an across invocation memory leak, but I no longer think this is the reason.

Disclaimer: I know that some of the things I do there are a bit unconventional given the goals of keeping everything in streams, such as piping a fetch.response.blob() to ParquetFile.fromFile. I added comments to try to explain the logic (even though i'd love to figure out a way around it in future, its not a major blocker for now).

Some ideas, not mutually exclusive:

A parquetFile.free() and / or (record batch) chunk.free() are either not working or not working fast enough within workerd. I tried awaiting them but that doesn't help;
B parquetFile.stream() is unnecessarily duplicating the data? the 2x number seems suspect.

It's possible that Cloudflare's workerd is taking a little while to figure out that I'm out of memory and by the time that the stream reader from transformParquetStream starts reading, i'm basically already OOM. Note in the screenshot that it doesn't even get to properly read even the first chunk. Their documentation is pretty confusing on this error, but if we look at their workerd codebase it occurs when limitEnforcer->getLimitsExceeded() is truthy.

Is there anything I'm doing wrong regarding memory management to go beyond your suggestion? @H-Plus-Time

@kylebarron
Copy link
Owner

kylebarron commented Sep 16, 2024

A parquetFile.free() and / or (record batch) chunk.free() are either not working or not working fast enough within workerd. I tried awaiting them but that doesn't help;

It's possible that .free is not working correctly. I've never done memory profiling on it.

If you think that might be the culprit, you should try and set up some repro case outside of a stream. Though I'm not really sure how to test it.

@pellicceama
Copy link
Author

pellicceama commented Sep 16, 2024

@kylebarron, AFAIK, free doesn't seem to work for ParquetFile, but it seems to work for RecordBatch.

Methodology:

  • I get the size of the parquet-wasm wasm instance via wasmInstance.memory.buffer.byteLength.
  • I built the binary off from the latest main e03f096 using
wasm-pack build --target web --no-default-features --features snappy,async,reader,writer,debug
  • Ran it on nodejs with node index.js and not in a cloudflare worker.

1. Parquet file not freeing resources example:

https://gist.github.com/pellicceama/395e5a2fa7d6a579517874a72c01ba44

Output:

Initial memory post loading wasm 1.5625 MB
File instance created, memory = 1.625 MB
Stream created, memory = 1.625 MB
File instance freed, memory = 1.625 MB
Memory 10 seconds after freeing file instance = 1.625 MB

Also, perhaps more concerning this example, stream downloading and writing to disk a 20mb file increases memory to 155mb. The compression settings of the new file are different than the old one, so it would've made sense to have a ~2x increase but ~7.5x was unexpected. By using a writter properties that matches the read file ones in the cloudflare worker this margin was closer to 2-3x though.

2. Stream reader/writer overallocating example:

https://gist.github.com/pellicceama/56e98df9ad8281c409ec010456e7c67e

Output:

Initial memory post loading wasm 1.5625 MB
File instance created, memory = 1.625 MB
Stream created, memory = 1.625 MB
File instance freed, memory = 1.625 MB
transformParquetStream 1
transformParquetStream 2
Stream written to file successfully
Memory 10 seconds after = 155.375 MB

--
It seems like as @H-Plus-Time pointed out, after the transformParquetStream goes through the RecordBatch it seems to be freed correctly.

An error occurred: Error: null pointer passed to rust
    at imports.wbg.__wbindgen_throw (file:///xxx/evefan/wasm-memory/parquet-wasm/parquet_wasm.js:3470:9)
    at wasm://wasm/0113bb1e:wasm-function[6519]:0x3cc25b
    at wasm://wasm/0113bb1e:wasm-function[6520]:0x3cc268
    at wasm://wasm/0113bb1e:wasm-function[2067]:0x2e7c99
    at wasm://wasm/0113bb1e:wasm-function[4220]:0x39a26f
    at RecordBatch.free (file:///xxx/evefan/wasm-memory/parquet-wasm/parquet_wasm.js:1717:8)

In summary, I think there are two bugs

  1. ParquetFile.free() does not appear to be working.
  2. Unclear why reading requires so much more memory than the file itself. Much more concerning is that without a way to let go / flush existing resources it appears to keep growing.

image

Reopening issue but feel free to ask me to split off.

Usual disclaimer: I'm new to the rust / wasm / arrow world. I'd appreciate any advice on the matter, I'm happy to help fix or diagnose where we can and if anyone is open to it I'd be OK with sponsoring this work so that we can get stream reads and writes on Evefan with consistent memory usage and without exhausting the resources don't the worker. I'm on @pellicceama on twitter if easier to DM!

@pellicceama pellicceama reopened this Sep 16, 2024
@H-Plus-Time
Copy link
Contributor

H-Plus-Time commented Sep 17, 2024

Alright, I've done a bit of digging. First, here's the code that eliminates more or less all the memory issues:

const numRows = fileInstance.metadata().fileMetadata().numRows();
// pick a mini-batch size
const MINI_BATCH_SIZE = 2 ** 17; // ~131k, more or less the threshold chosen by duckdb
const numMiniBatches = Math.ceil(numRows / MINI_BATCH_SIZE);
// presumably neater to do this as a second async generator
const miniStreams = await Promise.all(new Array({length: numMiniBatches}).map((_, index) => (
  fileInstance.stream({limit: MINI_BATCH_SIZE, offset: index * MINI_BATCH_SIZE})
)));
const transformedStream = await transformParquetStream(combineStreams(miniStreams));

For me, that kept wasm memory at 38MB.


Investigation

So, digging around a bit in the metadata of that file, there's ~3 sizes, affecting different stages of the process:

  1. Uncompressed on-disk file size - ~21MB.
  2. Per-row group just before row group decoding (i.e. after decompressing all columns in the current row group) -> ~24MB for the first row group, negligible for the last one.
  3. Per-row group after row group decoding -> ~72MB ❗ (the second one is tiny, but the largest row group is what matters).

While the rust-side allocator can (and does) free memory, the WebAssembly.Memory object itself cannot shrink - a momentary peak in allocated memory inside wasm results in a permanent increase in host-allocated memory (until the wasm memory.discard proposal gets standardized/implemented). Parquet readers are pretty much all designed to treat individual columns within individual row groups as the smallest unit during decoding; if there are no column or row filters, that smallest unit becomes the entire row group (so at least 72MB, likely with a fraction of that in intermediate buffers). The below got me ~109MB peak memory:

const partialTable = readParquet(fileBlob, {rowGroups: [0]});
partialTable.free(); // de-allocated in the wasm address space
// in wasm, the allocator sees a 109MB address space with ~2-3MB allocated.
// running the above in a loop (freeing after each readParquet call) won't result in
// additional memory.grow calls, but (because they don't exist yet in any wasm hosts)
// also won't result in any memory.discard calls.

The rest of the ~155MB footprint, I'm tipping is from the allocator's overalloc heuristic (highly likely that it doubles the number of pages it grows the memory by on every alloc that requires it, and halves it on every alloc that doesn't).

Dropping the row group size (down to, say, 100k) is usually the solution (especially if you've got very wide column sets, or large high variation string columns), but in this case the (tiny) overhead of row filters is a better tradeoff.

P.S. ParquetFile.free doesn't free a lot, best case scenario you'll free up the reader struct + metadata (~20k for this file) if you don't have a stream that hasn't been fully consumed.

@pellicceama
Copy link
Author

Thank you! We still have a lot to optimize and test before scaling this for larger files, so we set the minimum merge threshold as low while we understand better Cloudflares limits and WASM runtime.

Here's the PR: openintegrations/openint#25

I appreciate your support! Mentioned you here:

https://evefan.com/blog/s3-hive-parquet-duckdb-support

@kylebarron
Copy link
Owner

evefan.com/blog/s3-hive-parquet-duckdb-support

November 23, 2024

writing from the future! 😉

@pellicceama
Copy link
Author

hahaha! 😂 Damn Strings.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants