
Commit eb5e491
Update ArrowScan to use different FileSystem per file
jiakai-li committed Dec 21, 2024
1 parent 7699a76 commit eb5e491
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions pyiceberg/io/pyarrow.py
@@ -1333,13 +1333,14 @@ def _task_to_table(
     return None


-def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
+def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
     deletes_per_file: Dict[str, List[ChunkedArray]] = {}
     unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
     if len(unique_deletes) > 0:
         executor = ExecutorFactory.get_or_create()
         deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
-            lambda args: _read_deletes(*args), [(fs, delete) for delete in unique_deletes]
+            lambda args: _read_deletes(*args),
+            [(_fs_from_file_path(delete_file.file_path, io), delete_file) for delete_file in unique_deletes],
         )
         for delete in deletes_per_files:
             for file, arr in delete.items():
@@ -1373,7 +1374,6 @@ def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
-    _fs: FileSystem
     _projected_schema: Schema
     _bound_row_filter: BooleanExpression
     _case_sensitive: bool
@@ -1383,7 +1383,6 @@ class ArrowScan:
     Attributes:
         _table_metadata: Current table metadata of the Iceberg table
         _io: PyIceberg FileIO implementation from which to fetch the io properties
-        _fs: PyArrow FileSystem to use to read the files
         _projected_schema: Iceberg Schema to project onto the data files
         _bound_row_filter: Schema bound row expression to filter the data with
         _case_sensitive: Case sensitivity when looking up column names
@@ -1401,7 +1400,6 @@ def __init__(
     ) -> None:
         self._table_metadata = table_metadata
         self._io = io
-        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
         self._projected_schema = projected_schema
         self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
         self._case_sensitive = case_sensitive
@@ -1441,7 +1439,7 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
-        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        deletes_per_file = _read_all_delete_files(self._io, tasks)
         executor = ExecutorFactory.get_or_create()

         def _table_from_scan_task(task: FileScanTask) -> pa.Table:
@@ -1504,7 +1502,7 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
-        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        deletes_per_file = _read_all_delete_files(self._io, tasks)
         return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

     def _record_batches_from_scan_tasks_and_deletes(
@@ -1515,7 +1513,7 @@ def _record_batches_from_scan_tasks_and_deletes(
             if self._limit is not None and total_row_count >= self._limit:
                 break
             batches = _task_to_record_batches(
-                self._fs,
+                _fs_from_file_path(task.file.file_path, self._io),
                 task,
                 self._bound_row_filter,
                 self._projected_schema,
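What the change buys: the PyArrow FileSystem is no longer resolved once from the table location in ArrowScan.__init__, but from each file's own path at read time, for data files and delete files alike, so a single scan can read files living under different storage schemes. Below is a minimal sketch of that per-file, scheme-based dispatch; fs_for_path is a hypothetical stand-in for pyiceberg's _fs_from_file_path, which additionally wires FileIO properties (credentials, endpoint, region, and so on) into the FileSystem it builds.

# Minimal sketch of per-file FileSystem resolution, assuming plain
# scheme-based dispatch. fs_for_path is a hypothetical stand-in for
# pyiceberg's _fs_from_file_path; the real helper also consults the
# FileIO properties when constructing the FileSystem.
from urllib.parse import urlparse

from pyarrow.fs import FileSystem, LocalFileSystem, S3FileSystem


def fs_for_path(file_path: str) -> FileSystem:
    """Pick a PyArrow FileSystem based on the scheme of file_path."""
    scheme = urlparse(file_path).scheme
    if scheme in ("", "file"):
        return LocalFileSystem()
    if scheme in ("s3", "s3a", "s3n"):
        return S3FileSystem()
    raise ValueError(f"Unsupported scheme: {scheme}")


# Because resolution happens per file rather than once per table, one
# scan can mix storage backends:
for path in ("s3://bucket/db/tbl/data/00000.parquet", "/tmp/warehouse/00001.parquet"):
    fs = fs_for_path(path)  # S3FileSystem for the first, LocalFileSystem for the second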
