Skip to content

Commit

Permalink
Use schema_to_pyarrow directly for backporting
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Mar 29, 2024
1 parent 56899e6 commit 93b61dd
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1731,7 +1731,7 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)

file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
file_schema = table.schema().as_arrow()
file_schema = schema_to_pyarrow(table.schema())

fo = table.io.new_output(file_path)
row_group_size = PropertyUtil.property_as_int(
Expand Down
12 changes: 8 additions & 4 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1053,10 +1053,12 @@ def append(self, df: pa.Table) -> None:
if len(self.spec().fields) > 0:
raise ValueError("Cannot write to partitioned tables")

from pyiceberg.io.pyarrow import schema_to_pyarrow

_check_schema_compatible(self.schema(), other_schema=df.schema)
# cast if the two schemas are compatible but not equal
if self.schema().as_arrow() != df.schema:
df = df.cast(self.schema().as_arrow())
if schema_to_pyarrow(self.schema()) != df.schema:
df = df.cast(schema_to_pyarrow(self.schema()))

merge = _MergingSnapshotProducer(operation=Operation.APPEND, table=self)

Expand Down Expand Up @@ -1091,10 +1093,12 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
if len(self.spec().fields) > 0:
raise ValueError("Cannot write to partitioned tables")

from pyiceberg.io.pyarrow import schema_to_pyarrow

_check_schema_compatible(self.schema(), other_schema=df.schema)
# cast if the two schemas are compatible but not equal
if self.schema().as_arrow() != df.schema:
df = df.cast(self.schema().as_arrow())
if schema_to_pyarrow(self.schema()) != df.schema:
df = df.cast(schema_to_pyarrow(self.schema()))

merge = _MergingSnapshotProducer(
operation=Operation.OVERWRITE if self.current_snapshot() is not None else Operation.APPEND,
Expand Down

0 comments on commit 93b61dd

Please sign in to comment.