Change DataScan to accept Metadata and io
For the partial deletes I want to do a scan on in-memory metadata. Changing this API allows this.
Fokko committed Apr 5, 2024
1 parent 4148edb commit aadc89c
Showing 4 changed files with 84 additions and 105 deletions.
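
In practice this change means a scan no longer needs a catalog-backed Table: project_table and DataScan take a TableMetadata plus a FileIO directly, which is what makes scanning purely in-memory metadata possible. Below is a minimal sketch of the new calling convention; it assumes a PyIceberg build containing this commit with the pyarrow extra installed, and the metadata path and column names are placeholders.

# Sketch of the DataScan(table_metadata=..., io=...) signature introduced here.
# The metadata location and column names below are hypothetical.
from pyiceberg.expressions import EqualTo
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import DataScan

io = PyArrowFileIO()

# Read the table metadata into memory; any TableMetadata instance works here,
# including one built programmatically (the motivation: partial deletes).
metadata = FromInputFile.table_metadata(
    io.new_input("file:///tmp/warehouse/db/tbl/metadata/00001.metadata.json")
)

# Build the scan directly from metadata + io; no Table or Catalog is required.
scan = DataScan(
    table_metadata=metadata,
    io=io,
    row_filter=EqualTo("id", 5),
    selected_fields=("id", "data"),
)
arrow_table = scan.to_arrow()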
26 changes: 14 additions & 12 deletions pyiceberg/io/pyarrow.py
@@ -158,7 +158,7 @@
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string

if TYPE_CHECKING:
from pyiceberg.table import FileScanTask, Table
from pyiceberg.table import FileScanTask

logger = logging.getLogger(__name__)

@@ -1034,7 +1034,8 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic

def project_table(
tasks: Iterable[FileScanTask],
table: Table,
table_metadata: TableMetadata,
io: FileIO,
row_filter: BooleanExpression,
projected_schema: Schema,
case_sensitive: bool = True,
@@ -1044,7 +1045,8 @@
Args:
tasks (Iterable[FileScanTask]): The file scan tasks to read.
table (Table): The table that's being queried.
table_metadata (TableMetadata): The table metadata of the table that's being queried.
io (FileIO): A FileIO to open streams to the object store.
row_filter (BooleanExpression): The expression for filtering rows.
projected_schema (Schema): The output schema.
case_sensitive (bool): Case sensitivity when looking up column names.
@@ -1053,24 +1055,24 @@
Raises:
ResolveError: When an incompatible query is done.
"""
scheme, netloc, _ = PyArrowFileIO.parse_location(table.location())
if isinstance(table.io, PyArrowFileIO):
fs = table.io.fs_by_scheme(scheme, netloc)
scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
if isinstance(io, PyArrowFileIO):
fs = io.fs_by_scheme(scheme, netloc)
else:
try:
from pyiceberg.io.fsspec import FsspecFileIO

if isinstance(table.io, FsspecFileIO):
if isinstance(io, FsspecFileIO):
from pyarrow.fs import PyFileSystem

fs = PyFileSystem(FSSpecHandler(table.io.get_fs(scheme)))
fs = PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
else:
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}")
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
except ModuleNotFoundError as e:
# When FsSpec is not installed
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}") from e
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)
bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)

projected_field_ids = {
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
@@ -1089,7 +1091,7 @@ def project_table(
deletes_per_file.get(task.file.file_path),
case_sensitive,
limit,
table.name_mapping(),
None,
)
for task in tasks
]
64 changes: 28 additions & 36 deletions pyiceberg/table/__init__.py
@@ -1209,7 +1209,8 @@ def scan(
limit: Optional[int] = None,
) -> DataScan:
return DataScan(
table=self,
table_metadata=self.metadata,
io=self.io,
row_filter=row_filter,
selected_fields=selected_fields,
case_sensitive=case_sensitive,
@@ -1462,7 +1463,8 @@ def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression:


class TableScan(ABC):
table: Table
table_metadata: TableMetadata
io: FileIO
row_filter: BooleanExpression
selected_fields: Tuple[str, ...]
case_sensitive: bool
@@ -1472,15 +1474,17 @@ class TableScan(ABC):

def __init__(
self,
table: Table,
table_metadata: TableMetadata,
io: FileIO,
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
snapshot_id: Optional[int] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
):
self.table = table
self.table_metadata = table_metadata
self.io = io
self.row_filter = _parse_row_filter(row_filter)
self.selected_fields = selected_fields
self.case_sensitive = case_sensitive
@@ -1490,19 +1494,20 @@ def __init__(

def snapshot(self) -> Optional[Snapshot]:
if self.snapshot_id:
return self.table.snapshot_by_id(self.snapshot_id)
return self.table.current_snapshot()
return self.table_metadata.snapshot_by_id(self.snapshot_id)
return self.table_metadata.current_snapshot()

def projection(self) -> Schema:
current_schema = self.table.schema()
current_schema = self.table_metadata.schema()
if self.snapshot_id is not None:
snapshot = self.table.snapshot_by_id(self.snapshot_id)
snapshot = self.table_metadata.snapshot_by_id(self.snapshot_id)
if snapshot is not None:
if snapshot.schema_id is not None:
snapshot_schema = self.table.schemas().get(snapshot.schema_id)
if snapshot_schema is not None:
current_schema = snapshot_schema
else:
try:
current_schema = next(
schema for schema in self.table_metadata.schemas if schema.schema_id == snapshot.schema_id
)
except StopIteration:
warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
else:
raise ValueError(f"Snapshot not found: {self.snapshot_id}")
@@ -1528,7 +1533,7 @@ def update(self: S, **overrides: Any) -> S:
def use_ref(self: S, name: str) -> S:
if self.snapshot_id:
raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
if snapshot := self.table.snapshot_by_name(name):
if snapshot := self.table_metadata.snapshot_by_name(name):
return self.update(snapshot_id=snapshot.snapshot_id)

raise ValueError(f"Cannot scan unknown ref={name}")
@@ -1620,33 +1625,21 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent


class DataScan(TableScan):
def __init__(
self,
table: Table,
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
snapshot_id: Optional[int] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
):
super().__init__(table, row_filter, selected_fields, case_sensitive, snapshot_id, options, limit)

def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id])
return project(self.row_filter)

@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table.specs()[spec_id]
return manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)
spec = self.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table.specs()[spec_id]
partition_type = spec.partition_type(self.table.schema())
spec = self.table_metadata.specs()[spec_id]
partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]

@@ -1681,16 +1674,14 @@ def plan_files(self) -> Iterable[FileScanTask]:
if not snapshot:
return iter([])

io = self.table.io

# step 1: filter manifests using partition summaries
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

manifests = [
manifest_file
for manifest_file in snapshot.manifests(io)
for manifest_file in snapshot.manifests(self.io)
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

@@ -1699,7 +1690,7 @@ def plan_files(self) -> Iterable[FileScanTask]:

partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
metrics_evaluator = _InclusiveMetricsEvaluator(
self.table.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
self.table_metadata.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
).eval

min_data_sequence_number = _min_data_file_sequence_number(manifests)
@@ -1713,7 +1704,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
lambda args: _open_manifest(*args),
[
(
io,
self.io,
manifest,
partition_evaluators[manifest.partition_spec_id],
metrics_evaluator,
@@ -1749,7 +1740,8 @@ def to_arrow(self) -> pa.Table:

return project_table(
self.plan_files(),
self.table,
self.table_metadata,
self.io,
self.row_filter,
self.projection(),
case_sensitive=self.case_sensitive,
6 changes: 6 additions & 0 deletions pyiceberg/table/metadata.py
@@ -278,6 +278,12 @@ def new_snapshot_id(self) -> int:

return snapshot_id

def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
"""Return the snapshot referenced by the given name or null if no such reference exists."""
if ref := self.refs.get(name):
return self.snapshot_by_id(ref.snapshot_id)
return None

def current_snapshot(self) -> Optional[Snapshot]:
"""Get the current snapshot for this table, or None if there is no current snapshot."""
if self.current_snapshot_id is not None:
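
Since TableScan.use_ref now resolves ref names through the snapshot_by_name helper added above, named refs can be looked up on bare metadata as well. A short continuation of the sketch near the top of this page; the ref name "main" is assumed to exist in the metadata.

# Resolve a named ref straight from the in-memory metadata; returns None when
# no such ref exists.
snapshot = metadata.snapshot_by_name("main")

# Or pin the earlier scan to that ref; per this commit, use_ref() resolves the
# name via table_metadata.snapshot_by_name().
pinned = scan.use_ref("main")
print(pinned.snapshot().snapshot_id if pinned.snapshot() else None)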
93 changes: 36 additions & 57 deletions tests/io/test_pyarrow.py
@@ -28,7 +28,6 @@
import pytest
from pyarrow.fs import FileType, LocalFileSystem

from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.exceptions import ResolveError
from pyiceberg.expressions import (
AlwaysFalse,
@@ -72,7 +71,7 @@
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema, make_compatible_name, visit
from pyiceberg.table import FileScanTask, Table, TableProperties
from pyiceberg.table import FileScanTask, TableProperties
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.typedef import UTF8
from pyiceberg.types import (
@@ -876,7 +875,7 @@ def project(
schema: Schema, files: List[str], expr: Optional[BooleanExpression] = None, table_schema: Optional[Schema] = None
) -> pa.Table:
return project_table(
[
tasks=[
FileScanTask(
DataFile(
content=DataFileContent.DATA,
Expand All @@ -889,21 +888,16 @@ def project(
)
for file in files
],
Table(
("namespace", "table"),
metadata=TableMetadataV2(
location="file://a/b/",
last_column_id=1,
format_version=2,
schemas=[table_schema or schema],
partition_specs=[PartitionSpec()],
),
metadata_location="file://a/b/c.json",
io=PyArrowFileIO(),
catalog=NoopCatalog("NoopCatalog"),
table_metadata=TableMetadataV2(
location="file://a/b/",
last_column_id=1,
format_version=2,
schemas=[table_schema or schema],
partition_specs=[PartitionSpec()],
),
expr or AlwaysTrue(),
schema,
io=PyArrowFileIO(),
row_filter=expr or AlwaysTrue(),
projected_schema=schema,
case_sensitive=True,
)

@@ -1362,20 +1356,15 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp

with_deletes = project_table(
tasks=[example_task_with_delete],
table=Table(
("namespace", "table"),
metadata=TableMetadataV2(
location=metadata_location,
last_column_id=1,
format_version=2,
current_schema_id=1,
schemas=[table_schema_simple],
partition_specs=[PartitionSpec()],
),
metadata_location=metadata_location,
io=load_file_io(),
catalog=NoopCatalog("noop"),
table_metadata=TableMetadataV2(
location=metadata_location,
last_column_id=1,
format_version=2,
current_schema_id=1,
schemas=[table_schema_simple],
partition_specs=[PartitionSpec()],
),
io=load_file_io(),
row_filter=AlwaysTrue(),
projected_schema=table_schema_simple,
)
@@ -1405,20 +1394,15 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_

with_deletes = project_table(
tasks=[example_task_with_delete],
table=Table(
("namespace", "table"),
metadata=TableMetadataV2(
location=metadata_location,
last_column_id=1,
format_version=2,
current_schema_id=1,
schemas=[table_schema_simple],
partition_specs=[PartitionSpec()],
),
metadata_location=metadata_location,
io=load_file_io(),
catalog=NoopCatalog("noop"),
table_metadata=TableMetadataV2(
location=metadata_location,
last_column_id=1,
format_version=2,
current_schema_id=1,
schemas=[table_schema_simple],
partition_specs=[PartitionSpec()],
),
io=load_file_io(),
row_filter=AlwaysTrue(),
projected_schema=table_schema_simple,
)
@@ -1439,21 +1423,16 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Schema) -> None:
def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Schema) -> None:
metadata_location = "file://a/b/c.json"
projection = project_table(
[example_task],
Table(
("namespace", "table"),
metadata=TableMetadataV2(
location=metadata_location,
last_column_id=1,
format_version=2,
current_schema_id=1,
schemas=[table_schema_simple],
partition_specs=[PartitionSpec()],
),
metadata_location=metadata_location,
io=load_file_io(properties={"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}, location=metadata_location),
catalog=NoopCatalog("NoopCatalog"),
tasks=[example_task],
table_metadata=TableMetadataV2(
location=metadata_location,
last_column_id=1,
format_version=2,
current_schema_id=1,
schemas=[table_schema_simple],
partition_specs=[PartitionSpec()],
),
io=load_file_io(properties={"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}, location=metadata_location),
case_sensitive=True,
projected_schema=table_schema_simple,
row_filter=AlwaysTrue(),
