Allow writing dataframes that are either a subset of table schema or in arbitrary order #829
Conversation
First of all, sorry for the late reply. Feel free to ping me more aggressively.
This initial version looks good, thanks for working on this. One important thing I would love to see addressed here as well: how about re-aligning the table before we write? Otherwise we have to do all of this work when reading. Most tables have far fewer writes than reads, so it is better to optimize for reads. I was hoping to re-use `to_requested_schema` for this. WDYT?
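As a rough sketch of what "re-aligning" before the write could mean, here is a pyarrow-only illustration (the column names and types are invented; this does not claim to be how `to_requested_schema` is wired in):

```python
import pyarrow as pa

# Hypothetical table layout (names and types invented for illustration).
table_order = pa.schema([("id", pa.int64()), ("name", pa.string())])

# Incoming dataframe: columns out of order and `id` narrower than the table type.
df = pa.table({"name": ["a", "b"], "id": pa.array([1, 2], type=pa.int32())})

# Re-align once before writing: select columns in table order and cast the types,
# so readers never have to re-order or upcast.
aligned = df.select(table_order.names).cast(table_order)
```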
pyiceberg/table/__init__.py (outdated)
    Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.
    The schemas are compatible if:
    - All fields in `other_schema` are present in `table_schema`. (other_schema <= table_schema)
    - All required fields in `table_schema` are present in `other_schema`.
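For illustration only (the field names here are invented, not taken from the PR), schemas that would pass or fail these two rules against a table with a required `id` and an optional `name`:

```python
import pyarrow as pa

# OK: a subset of the table columns that still contains every required field.
subset = pa.schema([("id", pa.int64())])

# Violates rule 1: `extra` does not exist in the table schema.
extra_column = pa.schema([("id", pa.int64()), ("extra", pa.string())])

# Violates rule 2: the required field `id` is missing.
missing_required = pa.schema([("name", pa.string())])
```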
Just a heads up: with V3 this changes, since it is allowed to add required fields with a default value.
pyiceberg/table/__init__.py (outdated)
    except ValueError as e:
        other_schema = _pyarrow_to_schema_without_ids(other_schema)
        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
        raise ValueError(
            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
        ) from e

    if table_schema.as_struct() != task_schema.as_struct():
        fields_missing_from_table = {field for field in other_schema.fields if field not in table_schema.fields}
Can you check if it handles nested structs as well?
No worries at all, I forgot to ping about this PR.
Can you talk a bit more about "re-aligning"? Is it to match the Parquet schema with Iceberg's? Is the idea to do so for the entire Arrow table before writing? If so, maybe we can push the …
Force-pushed from e87a59a to 19598c3.
I still get the …
@pdpark can you share the Iceberg table schema and the PyArrow table schema?
I can't share the schemas, but it's just a few fields with simple …
BTW: this function made it difficult to debug because printing the schemas invoked …
FYI: here's the …
Note that the …
Let me give two examples:

Out of order
It is fine to write a parquet file to this table with: …
When the table is being read, the columns are re-ordered by …

Casting
The same goes for casting: …
It is fine to write: …
The upcasting to a long will be done when the data is being read, but it is less efficient since we first let Arrow read the data as an int, and then it will do the cast to long in …
Force-pushed from 19598c3 to 2ce6db3.
@@ -2053,7 +2055,10 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
        ) from e

    if table_schema.as_struct() != task_schema.as_struct():
        fields_missing_from_table = {field for field in other_schema.fields if field not in table_schema.fields}
this doesn't work for nested structs, need a better solution
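A minimal illustration (not from the PR) of why a comparison over whole top-level fields is not enough once structs nest, assuming pyarrow struct types:

```python
import pyarrow as pa

table_field = pa.field("point", pa.struct([("x", pa.int64()), ("y", pa.int64())]))
df_field = pa.field("point", pa.struct([("x", pa.int64())]))  # nested `y` is missing

# Both schemas expose a single top-level field named `point`. A check that only
# looks at whole top-level fields either misses the nested difference (if it
# compares names) or reports the entire struct as missing (if it compares full
# fields), so it has to recurse into struct types to produce a useful error.
print(table_field.name == df_field.name)       # True
print(table_field.type.equals(df_field.type))  # False
```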
@@ -484,10 +484,6 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
        _check_schema_compatible(
            self._table.schema(), other_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )
        # cast if the two schemas are compatible but not equal
@syun64 I want to get your take on this part. Due to the timestamp change, do you know if the df needs to be cast?
There are a couple of different parts involved in the write path. In particular, we need to look at the table schema, the df schema, and the df itself, as well as deal with bin-packing and other transformations.
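For context on the timestamp point, a pyarrow-only sketch of what downcasting the dataframe itself from nanoseconds to microseconds could look like (the column name is invented; this is not the PR's implementation):

```python
import pyarrow as pa

df = pa.table({"ts": pa.array([1_000, 2_000], type=pa.timestamp("ns"))})

# Downcast nanosecond timestamps to microseconds on the dataframe itself,
# mirroring what a downcast_ns_timestamp_to_us-style option implies for the data.
df_us = df.set_column(0, "ts", df["ts"].cast(pa.timestamp("us")))
```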
Happy to extract this convo into an issue, to also continue the convo from #786 (comment)
> @syun64 I want to get your take on this part. Due to the timestamp change, do you know if the df needs to be cast? There are a couple of different parts involved in the write path. In particular, we need to look at the table schema, the df schema, and the df itself, as well as deal with bin-packing and other transformations.
I have a PR open to try to fix this behavior: #910. I think it's almost ready to merge 😄
Fixed in #921
Fixes #674

This PR does the following:
- … (… `test_` prefix)
- … `large_*` types (#807 is the proper fix)
- … the `_check_schema_compatible` function
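As a hedged illustration of the behavior this PR enables (catalog, table, and column names are made up), appending a dataframe whose columns are a subset of the table schema and in a different order:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")      # assumes a configured catalog
tbl = catalog.load_table("db.events")  # assumed schema: id long (required), name string, payload string

# Only a subset of the columns, and not in schema order.
df = pa.table({"name": ["a", "b"], "id": pa.array([1, 2], type=pa.int64())})

tbl.append(df)  # accepted once all required fields are present
```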