Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge into / Upsert #402

Open
Fokko opened this issue Feb 9, 2024 · 15 comments
Open

Merge into / Upsert #402

Fokko opened this issue Feb 9, 2024 · 15 comments

Comments

@Fokko
Copy link
Contributor

Fokko commented Feb 9, 2024

Feature Request / Improvement

Have an API to efficiently perform an upsert

@sungwy sungwy added this to the PyIceberg 0.7.0 release milestone Feb 13, 2024
@corleyma
Copy link

corleyma commented Mar 15, 2024

To work well with some of the larger data usecases where folks are using PySpark today, I think this would need to play well with pyarrow streaming read/write functionality, so that one could do atomic upsert of batches without having to read all the data into memory at once.

I call this out because current write functionality works with pyarrow Tables, which are fully materialized in memory. Working with larger data might include making the pyiceberg write APIs work with Iterator[RecordBatch] and friends (as returned by pyarrow Datasets/Scanner) in addition to pyarrow Tables.

@sungwy
Copy link
Collaborator

sungwy commented Jun 12, 2024

Hi @corleyma - I opened up this PR to address your comment here by introducing a scan API that will return a RecordBatchReader. It's pending some resolutions with related issues, but it's almost complete. Would appreciate your feedback if you are interested in using this API 🙂

@Milias
Copy link

Milias commented Jul 31, 2024

Hello, thanks for all the great work!

Now that version 0.7.0 is released, I was wondering where can I find some documentation covering how to write to a partitioned table.

Thanks a lot!

@sungwy
Copy link
Collaborator

sungwy commented Jul 31, 2024

Hello, thanks for all the great work!

Now that version 0.7.0 is released, I was wondering where can I find some documentation covering how to write to a partitioned table.

Thanks a lot!

Hi @Milias you can create a table with a partition by following the documentation here on Creating a table.

I realize we could have had an explicit section on creating and writing to a partitioned table under Write to a Table section. Currently, we support partitioned writes for IdentityTransform and TimeTransform (Year, Month, Day, Hour) partitions. Please let us know if that works for you!

Sung

@Milias
Copy link

Milias commented Aug 1, 2024

Hey @sungwy, thanks for the quick answer!

I was already making use of writing support and indeed had seen that section of the documentation. Right now I prepared a very quick test of writing to a table partitioned with IdentityTransform. Then, .append and .overwrite work as expected, that is, they either append new data to the appropriate partitions and replace the whole table, respectively.

After this I'm left wondering how can individual partitions be replaced. Maybe this functionality is not yet supported, with writes to partitioned table being only the first step. To give an example of what I mean, taking the table from the example in the documentation:

import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029, "index": 1},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297, "index": 1},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989, "index": 2},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014, "index": 2},
    ],
    schema=schema_to_pyarrow(tbl.schema())
)

tbl.overwrite(df)

With table:

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
    NestedField(4, "index", IntegerType(), required=True),
)

partition_spec = PartitionSpec(   
    PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="index_partition"),
)

tbl = catalog.create_table("public.cities", schema=schema, partition_spec=partition_spec)

Then, if we add a few more rows:

df2 = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029, "index": 1},
        {"city": "Null Island", "lat": 0.0, "long": 0.0, "index": 3},
    ],
    schema=schema_to_pyarrow(tbl.schema())
)

Then, when doing tbl.overwrite(df2) I would like to have some way of indicating that partition with index = 2 should be left as-is.

It is very possible that I misunderstood the precise scope of write support to partitioned tables, since this issue #402 is still open. But in case that it is already possible to overwrite specific partitions, that's the piece of information I was searching for.

Thanks a lot again 😃

@sungwy
Copy link
Collaborator

sungwy commented Aug 1, 2024

I think you must be referring to dynamic overwrite / replace partition API that detects the partitions of the given input and replaces it. This feature is actually still in progress on this PR: #931

@Milias
Copy link

Milias commented Aug 1, 2024

That PR looks exactly like what I am asking for, yes! Thank you very much for pointing it out. I will keep an eye on it.

@ev2900
Copy link

ev2900 commented Aug 6, 2024

Any update on if there is an API for merge into / upsert?

@sungwy
Copy link
Collaborator

sungwy commented Aug 6, 2024

Hi @ev2900 - would using the overwrite feature by specifying the boolean expression on which to upsert work for your use case?

https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L479-L483

I realize we don't have an example of invoking an overwrite without the overwrite_filter specified. I'll raise an issue to track adding this explicitly into our API documentation. #1008

@ev2900
Copy link

ev2900 commented Aug 6, 2024

Let me take a look at this. It would be very helpful if there was an example

@sungwy
Copy link
Collaborator

sungwy commented Aug 6, 2024

@ev2900 agreed :) I've added that Issue above (#1008) to address that

@Minfante377
Copy link
Contributor

Any updates on this one? I'm good with overwrite + overwrite filters for now but for tables where columns are populated by different sources it would be awesome to have full MERGE INTO support and to be able to select which columns to update

@sungwy
Copy link
Collaborator

sungwy commented Sep 24, 2024

Hi @Minfante377 sorry for the delayed response, and thank you for the interest!

Unfortunately, this is still an open issue on PyIceberg with no assignee. MERGE INTO with the column matching semantics like:

MERGE INTO table t using (SELECT ...) s ON t.id = s.id

is unfortunately a bit complicated to support efficiently, so I've been trying to make time to look at it in depth. And unfortunately I haven't had the time for this specific issue.

Would you be interested in making a contribution?

@Minfante377
Copy link
Contributor

@sungwy sorry I've been really busy lately. Of course. I'll start taking a look at the code base and see where I can start to try to accomplish this. Thank you!

@tusharchou
Copy link

@sungwy I would like to pick this up 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

8 participants