Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Table.to_arrow_batch_reader to return RecordBatchReader instead of a fully materialized Arrow Table #786

Merged
merged 10 commits into from
Jun 21, 2024

Conversation

sungwy
Copy link
Collaborator

@sungwy sungwy commented Jun 2, 2024

  • RecordBatchReader is preferable for memory constrained applications
  • update to_requested_schema to take pa.RecordBatch instead of pa.Table

Question: pyarrow schema is currently inconsistent - it either returns pyarrow schema without Parquet metadata, or it adds the Parquet metadata and returns the value in some cases - we should make this consistent across our table scans and update the unit tests accordingly.

Edit: Opened up the Issue here to discuss above question #788

@sungwy sungwy marked this pull request as draft June 2, 2024 02:31
@sungwy sungwy changed the title to_arrow_batches to support returning Iterator[Recordbatch] instead of a fully materialized Arrow Table Support Table.to_arrow_batches to return Iterator[Recordbatch] instead of a fully materialized Arrow Table Jun 2, 2024
@djouallah
Copy link

how about PyCapsuleInterface
https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

@sungwy
Copy link
Collaborator Author

sungwy commented Jun 2, 2024

how about PyCapsuleInterface https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

Hi @djouallah thank you for the suggestion, but I'm a bit confused here - what exactly did you have in mind for suggesting PyCapsule?

@djouallah
Copy link

sorry if I am confusing issue here, pyarrow dataset is awesome but still give substandard performance with Duckdb as statistics are not provided, particularly when doing joins reordering, my understanding PyCapsuleInterface fix that.
delta-io/delta-rs#1838

@sungwy
Copy link
Collaborator Author

sungwy commented Jun 3, 2024

Thank you for sharing the additional context @djouallah

That's interesting! Although we don't use a pyarrow dataset in PyIceberg yet. We use a PyArrow dataset Fragment to create a dataset Scanner for each data file, and then directly create a PyArrow table instead of pushing down that layer into Arrow and using a PyArrow Dataset that represents the entire table of many files. I believe this is due to many outstanding issues that are being tracked together here on this issue: #30

My understanding of PyCapsuleInterface is that it is an interface for passing data back and forth Python and C layer if we wanted to write our own implementations of handling Arrow data format in the C layer. This is potentially an option, but I think the long-term roadmap for doing something more efficient for PyIceberg is to invest into Rust Python bindings as @Fokko mentioned at the Iceberg Summit.

In the interim, this PR aims to just add an extra API that returns an Iterator[RecordBatch] or a RecordBatchReader instead of a Table so that we don't have to fully materialize the data into memory when we are scanning for data.

Please let me know what you think about these points, I want to make sure I'm understanding your suggestion correctly!

batches = _task_to_record_batches(
fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping
)
return pa.Table.from_batches(batches, schema=schema_to_pyarrow(projected_schema))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was exactly what I had in mind, looking good 👍

@sungwy sungwy marked this pull request as ready for review June 3, 2024 22:08
@sungwy sungwy requested a review from Fokko June 4, 2024 17:26
@sungwy sungwy changed the title Support Table.to_arrow_batches to return Iterator[Recordbatch] instead of a fully materialized Arrow Table Support Table.to_arrow_batch_reader to return RecordBatchReader instead of a fully materialized Arrow Table Jun 12, 2024
@sungwy sungwy mentioned this pull request Jun 12, 2024
else:
arrow_table = fragment_scanner.take(indices)
current_index = 0
batches = fragment_scanner.to_batches()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I review this, it occurs to me that it might be useful to expose options related to batching/read ahead, etc, that pyarrow accepts when constructing the scanner. See the pyarrow docs for more details.

Specifically, I think setting batch_size is probably something that ought to be tunable, since the memory pressure will be a function of batch size and the number and types of columns in the table.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great suggestion @corleyma I'll adopt this feedback when I make the next round of changes

else:
file_schema = table_schema

batches = task.record_batches
arrow_table = pa.Table.from_batches(batches)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, looking here, this forced materialization seems to preclude streaming writes, which would you may want if e.g. upserting large amounts of data. ParquetWriter can be used for streaming writes, so this seems unnecessary?

Copy link

@corleyma corleyma Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e., maybe we could do something like the following?:

def sanitize_batches(batches: Iterator[RecordBatch], table_schema: Schema, sanitized_schema: Schema) -> Iterator[RecordBatch]:
    if sanitized_schema != table_schema:
        for batch in batches:
            yield to_requested_schema(requested_schema=sanitized_schema, file_schema=table_schema, batch=batch)
    else:
        yield from batches

def write_parquet(task: WriteTask) -> DataFile:
    table_schema = task.schema

    # Check if schema needs to be transformed
    sanitized_schema = sanitize_column_names(table_schema)

    file_path = f'{table_metadata.location}/data/{task.generate_data_file_path("parquet")}'
    fo = io.new_output(file_path)
    
    with fo.create(overwrite=True) as fos:
        with pq.ParquetWriter(fos, schema=sanitized_schema.as_arrow(), **parquet_writer_kwargs) as writer:
            for sanitized_batch in sanitize_batches(task.record_batches, table_schema, sanitized_schema):
                writer.write_table(pa.Table.from_batches([sanitized_batch]), row_group_size=row_group_size)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep I totally agree. I wanted to focus this PR on introducing the reader first, and then work on a subsequent PR to incorporate batches into writes. This just maintains the existing functionality while making use of the refactored to_requested_schema

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah so the change here is the order of operations. We want to call to_requested_schema on each batch first before creating a pyarrow Table from those batches.

I wonder if we can push to_requested_schema up the stack since we already bin-pack batches before passing to WriteTask

for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)

Also in this #829, I wanted to introduce schema projection
https://github.com/apache/iceberg-python/pull/829/files#diff-23e8153e0fd497a9212215bd2067068f3b56fa071770c7ef326db3d3d03cee9bR2904

We can keep write_parquet very simple and just handle the writing, and preprocess all the batch/table level operations together.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s a great suggestion @kevinjqliu - good to see that our work is going to be converging naturally here.

I was hoping to focus on the new read API here and the necessary refactoring in the utility functions, and keep the changes to the write functions to a minimum.

I could incorporate these changes and continue the discussion on updating the write functions in a follow up PR. I think there’s much discussion that are worth continuing on that topic (can we avoid materializing an arrow table and write with record batches)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm, ty!

Copy link
Collaborator Author

@sungwy sungwy Jun 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review!

@sungwy sungwy added this to the PyIceberg 0.7.0 release milestone Jun 15, 2024
@sungwy
Copy link
Collaborator Author

sungwy commented Jun 19, 2024

@HonahX @Fokko - could I ask your review on this PR?

pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for being late to the party here.

Thanks for working on this @syun64. This was also on my list, but thanks for picking it up! Let me know if you have any results to share on performance/memory-consumtion.

I left a few comments, but I think this is good. I believe we can simplify even more in _task_to_table by collecting batches there. We can do that in a separate PR, so also @kevinjqliu's work can continue in parallel.

@sungwy
Copy link
Collaborator Author

sungwy commented Jun 21, 2024

Sorry for being late to the party here.

Thanks for working on this @syun64. This was also on my list, but thanks for picking it up! Let me know if you have any results to share on performance/memory-consumtion.

I left a few comments, but I think this is good. I believe we can simplify even more in _task_to_table by collecting batches there. We can do that in a separate PR, so also @kevinjqliu's work can continue in parallel.

Thank you for the review @Fokko . I've adopted all your feedback. Thank you @corleyma and @kevinjqliu again for your reviews as well!

Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again @syun64

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants