diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index e701dd8717..02adda010f 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1333,13 +1333,14 @@ def _task_to_table(
         return None
 
 
-def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
+def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
     deletes_per_file: Dict[str, List[ChunkedArray]] = {}
     unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
     if len(unique_deletes) > 0:
         executor = ExecutorFactory.get_or_create()
         deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
-            lambda args: _read_deletes(*args), [(fs, delete) for delete in unique_deletes]
+            lambda args: _read_deletes(*args),
+            [(_fs_from_file_path(delete_file.file_path, io), delete_file) for delete_file in unique_deletes],
         )
         for delete in deletes_per_files:
             for file, arr in delete.items():
@@ -1373,7 +1374,6 @@ def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
-    _fs: FileSystem
     _projected_schema: Schema
     _bound_row_filter: BooleanExpression
     _case_sensitive: bool
@@ -1383,7 +1383,6 @@ class ArrowScan:
     Attributes:
         _table_metadata: Current table metadata of the Iceberg table
         _io: PyIceberg FileIO implementation from which to fetch the io properties
-        _fs: PyArrow FileSystem to use to read the files
         _projected_schema: Iceberg Schema to project onto the data files
         _bound_row_filter: Schema bound row expression to filter the data with
         _case_sensitive: Case sensitivity when looking up column names
@@ -1401,7 +1400,6 @@ def __init__(
     ) -> None:
         self._table_metadata = table_metadata
         self._io = io
-        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
         self._projected_schema = projected_schema
         self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
         self._case_sensitive = case_sensitive
@@ -1441,7 +1439,7 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
-        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        deletes_per_file = _read_all_delete_files(self._io, tasks)
         executor = ExecutorFactory.get_or_create()
 
         def _table_from_scan_task(task: FileScanTask) -> pa.Table:
@@ -1504,7 +1502,7 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
-        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        deletes_per_file = _read_all_delete_files(self._io, tasks)
         return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
 
     def _record_batches_from_scan_tasks_and_deletes(
@@ -1515,7 +1513,7 @@ def _record_batches_from_scan_tasks_and_deletes(
             if self._limit is not None and total_row_count >= self._limit:
                 break
             batches = _task_to_record_batches(
-                self._fs,
+                _fs_from_file_path(task.file.file_path, self._io),
                 task,
                 self._bound_row_filter,
                 self._projected_schema,
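
Note on the change: instead of resolving one FileSystem from the table location when the scan is constructed (the removed `self._fs` and its TODO), the filesystem is now resolved per file path, so a single scan can read data files and delete files that live behind different schemes. Below is a minimal sketch of what per-path resolution can look like. It uses PyArrow's real FileSystem.from_uri, but the helper names and the (scheme, netloc) caching are illustrative assumptions, not PyIceberg's actual _fs_from_file_path (which also consults the FileIO properties):

from functools import lru_cache
from urllib.parse import urlparse

from pyarrow.fs import FileSystem, LocalFileSystem


@lru_cache
def _fs_for_location(scheme: str, netloc: str) -> FileSystem:
    # One FileSystem per (scheme, netloc) pair: repeated lookups for files
    # in the same bucket reuse the same instance instead of reconnecting.
    if scheme in ("", "file"):
        return LocalFileSystem()
    # FileSystem.from_uri returns a (filesystem, path) tuple; only the
    # filesystem is needed here.
    fs, _ = FileSystem.from_uri(f"{scheme}://{netloc}/")
    return fs


def fs_from_file_path(file_path: str) -> FileSystem:
    # e.g. "s3://bucket/data/00000.parquet" -> S3FileSystem
    #      "/tmp/deletes/00001.parquet"     -> LocalFileSystem
    parsed = urlparse(file_path)
    return _fs_for_location(parsed.scheme, parsed.netloc)

Because _read_all_delete_files now takes the FileIO and resolves a filesystem for each delete file individually, a positional delete stored on one filesystem can be applied to data files stored on another within the same scan.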