Merge branch 'main' into location-providers
smaheshwar-pltr authored Dec 20, 2024
2 parents 00917e9 + dbcf65b commit c4e6be9
Showing 11 changed files with 30 additions and 354 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -22,7 +22,7 @@ help: ## Display this help
install-poetry: ## Install poetry if the user has not done that yet.
@if ! command -v poetry &> /dev/null; then \
echo "Poetry could not be found. Installing..."; \
pip install --user poetry==1.8.4; \
pip install --user poetry==1.8.5; \
else \
echo "Poetry is already installed."; \
fi
20 changes: 11 additions & 9 deletions dev/provision.py
@@ -22,7 +22,17 @@
from pyiceberg.schema import Schema
from pyiceberg.types import FixedType, NestedField, UUIDType

spark = SparkSession.builder.getOrCreate()
# The configuration is important, otherwise we get many small
# parquet files with a single row. When a positional delete
# hits the Parquet file with one row, the parquet file gets
# dropped instead of having a merge-on-read delete file.
spark = (
SparkSession
.builder
.config("spark.sql.shuffle.partitions", "1")
.config("spark.default.parallelism", "1")
.getOrCreate()
)

catalogs = {
'rest': load_catalog(
@@ -120,10 +130,6 @@
"""
)

# Partitioning is not really needed, but there is a bug:
# https://github.com/apache/iceberg/pull/7685
spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years")

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_positional_mor_deletes
@@ -168,10 +174,6 @@
"""
)

# Partitioning is not really needed, but there is a bug:
# https://github.com/apache/iceberg/pull/7685
spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years")

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_positional_mor_double_deletes
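The shuffle and parallelism settings above keep each INSERT in a single Parquet file, so the positional DELETEs issued later in provision.py surface as merge-on-read delete files instead of dropping whole single-row data files. A minimal verification sketch, assuming the 'rest' catalog and the table names used in this script, with connection properties configured elsewhere (e.g. .pyiceberg.yaml):

# Hypothetical check -- catalog and table names are assumed from dev/provision.py.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("rest")
table = catalog.load_table("default.test_positional_mor_deletes")

# Each planned scan task carries the positional delete files that apply to its data file;
# with the single-partition Spark config we expect one data file plus delete files,
# rather than many one-row files that were silently dropped.
for task in table.scan().plan_files():
    print(task.file.file_path, [delete.file_path for delete in task.delete_files])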
24 changes: 12 additions & 12 deletions poetry.lock

Some generated files are not rendered by default.

25 changes: 0 additions & 25 deletions pyiceberg/cli/console.py
@@ -34,34 +34,9 @@
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
from pyiceberg.table import TableProperties
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.utils.deprecated import deprecated
from pyiceberg.utils.properties import property_as_int


class DeprecatedConstants:
@property
@deprecated(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="DEFAULT_MAX_SNAPSHOT_AGE_MS is deprecated. Use TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT instead.",
)
def DEFAULT_MAX_SNAPSHOT_AGE_MS(self) -> int:
return 432000000

@property
@deprecated(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="DEFAULT_MIN_SNAPSHOTS_TO_KEEP is deprecated. Use TableProperties.MIN_SNAPSHOTS_TO_KEEP_DEFAULT instead.",
)
def DEFAULT_MIN_SNAPSHOTS_TO_KEEP(self) -> int:
return 1


DEFAULT_MIN_SNAPSHOTS_TO_KEEP = DeprecatedConstants().DEFAULT_MIN_SNAPSHOTS_TO_KEEP
DEFAULT_MAX_SNAPSHOT_AGE_MS = DeprecatedConstants().DEFAULT_MAX_SNAPSHOT_AGE_MS


def catch_exception() -> Callable: # type: ignore
def decorator(func: Callable) -> Callable: # type: ignore
@wraps(func)
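The removed DeprecatedConstants shim only re-exported two table defaults; the deprecation messages above point callers at TableProperties instead. A minimal sketch of the replacement usage, with attribute names taken from those messages:

# Replacement for the removed module-level constants (names assumed from the
# deprecation messages above; values match the removed properties).
from pyiceberg.table import TableProperties

max_snapshot_age_ms = TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT  # was DEFAULT_MAX_SNAPSHOT_AGE_MS (432000000)
min_snapshots_to_keep = TableProperties.MIN_SNAPSHOTS_TO_KEEP_DEFAULT  # was DEFAULT_MIN_SNAPSHOTS_TO_KEEP (1)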
8 changes: 4 additions & 4 deletions pyiceberg/io/fsspec.py
@@ -94,13 +94,13 @@


def s3v4_rest_signer(properties: Properties, request: AWSRequest, **_: Any) -> AWSRequest:
if TOKEN not in properties:
raise SignError("Signer set, but token is not available")

signer_url = properties.get(S3_SIGNER_URI, properties["uri"]).rstrip("/")
signer_endpoint = properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT)

signer_headers = {"Authorization": f"Bearer {properties[TOKEN]}"}
signer_headers = {}
if token := properties.get(TOKEN):
signer_headers = {"Authorization": f"Bearer {token}"}

signer_body = {
"method": request.method,
"region": request.context["client_region"],
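With this change the REST signer no longer requires a token: the Authorization header is attached only when one is configured, rather than raising SignError up front. An illustrative sketch of the new header handling, using a hypothetical helper (build_signer_headers is not part of the module; "token" mirrors the TOKEN property key):

from typing import Any, Dict

def build_signer_headers(properties: Dict[str, Any]) -> Dict[str, str]:
    # Previously a missing token raised SignError("Signer set, but token is not available");
    # now the signing request is simply sent without an Authorization header.
    if token := properties.get("token"):
        return {"Authorization": f"Bearer {token}"}
    return {}

print(build_signer_headers({"token": "abc"}))  # {'Authorization': 'Bearer abc'}
print(build_signer_headers({}))                # {}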
183 changes: 1 addition & 182 deletions pyiceberg/io/pyarrow.py
@@ -169,7 +169,7 @@
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import millis_to_datetime
from pyiceberg.utils.deprecated import deprecated, deprecation_message
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
from pyiceberg.utils.singleton import Singleton
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
@@ -1532,187 +1532,6 @@ def _record_batches_from_scan_tasks_and_deletes(
total_row_count += len(batch)


@deprecated(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="project_table is deprecated. Use ArrowScan.to_table instead.",
)
def project_table(
tasks: Iterable[FileScanTask],
table_metadata: TableMetadata,
io: FileIO,
row_filter: BooleanExpression,
projected_schema: Schema,
case_sensitive: bool = True,
limit: Optional[int] = None,
) -> pa.Table:
"""Resolve the right columns based on the identifier.
Args:
tasks (Iterable[FileScanTask]): A URI or a path to a local file.
table_metadata (TableMetadata): The table metadata of the table that's being queried
io (FileIO): A FileIO to open streams to the object store
row_filter (BooleanExpression): The expression for filtering rows.
projected_schema (Schema): The output schema.
case_sensitive (bool): Case sensitivity when looking up column names.
limit (Optional[int]): Limit the number of records.
Raises:
ResolveError: When an incompatible query is done.
"""
scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
if isinstance(io, PyArrowFileIO):
fs = io.fs_by_scheme(scheme, netloc)
else:
try:
from pyiceberg.io.fsspec import FsspecFileIO

if isinstance(io, FsspecFileIO):
from pyarrow.fs import PyFileSystem

fs = PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
else:
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
except ModuleNotFoundError as e:
# When FsSpec is not installed
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)

bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)

projected_field_ids = {
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))

deletes_per_file = _read_all_delete_files(fs, tasks)
executor = ExecutorFactory.get_or_create()
futures = [
executor.submit(
_task_to_table,
fs,
task,
bound_row_filter,
projected_schema,
projected_field_ids,
deletes_per_file.get(task.file.file_path),
case_sensitive,
table_metadata.name_mapping(),
use_large_types,
)
for task in tasks
]
total_row_count = 0
# for consistent ordering, we need to maintain future order
futures_index = {f: i for i, f in enumerate(futures)}
completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
for future in concurrent.futures.as_completed(futures):
completed_futures.add(future)
if table_result := future.result():
total_row_count += len(table_result)
# stop early if limit is satisfied
if limit is not None and total_row_count >= limit:
break

# by now, we've either completed all tasks or satisfied the limit
if limit is not None:
_ = [f.cancel() for f in futures if not f.done()]

tables = [f.result() for f in completed_futures if f.result()]

if len(tables) < 1:
return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema, include_field_ids=False))

result = pa.concat_tables(tables, promote_options="permissive")

if limit is not None:
return result.slice(0, limit)

return result


@deprecated(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="project_table is deprecated. Use ArrowScan.to_record_batches instead.",
)
def project_batches(
tasks: Iterable[FileScanTask],
table_metadata: TableMetadata,
io: FileIO,
row_filter: BooleanExpression,
projected_schema: Schema,
case_sensitive: bool = True,
limit: Optional[int] = None,
) -> Iterator[pa.RecordBatch]:
"""Resolve the right columns based on the identifier.
Args:
tasks (Iterable[FileScanTask]): A URI or a path to a local file.
table_metadata (TableMetadata): The table metadata of the table that's being queried
io (FileIO): A FileIO to open streams to the object store
row_filter (BooleanExpression): The expression for filtering rows.
projected_schema (Schema): The output schema.
case_sensitive (bool): Case sensitivity when looking up column names.
limit (Optional[int]): Limit the number of records.
Raises:
ResolveError: When an incompatible query is done.
"""
scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
if isinstance(io, PyArrowFileIO):
fs = io.fs_by_scheme(scheme, netloc)
else:
try:
from pyiceberg.io.fsspec import FsspecFileIO

if isinstance(io, FsspecFileIO):
from pyarrow.fs import PyFileSystem

fs = PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
else:
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
except ModuleNotFoundError as e:
# When FsSpec is not installed
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)

bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)

projected_field_ids = {
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))

deletes_per_file = _read_all_delete_files(fs, tasks)

total_row_count = 0

for task in tasks:
# stop early if limit is satisfied
if limit is not None and total_row_count >= limit:
break
batches = _task_to_record_batches(
fs,
task,
bound_row_filter,
projected_schema,
projected_field_ids,
deletes_per_file.get(task.file.file_path),
case_sensitive,
table_metadata.name_mapping(),
use_large_types,
)
for batch in batches:
if limit is not None:
if total_row_count >= limit:
break
elif total_row_count + len(batch) >= limit:
batch = batch.slice(0, limit - total_row_count)
yield batch
total_row_count += len(batch)


def _to_requested_schema(
requested_schema: Schema,
file_schema: Schema,
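The removed project_table and project_batches helpers had been deprecated in 0.8.0 in favour of ArrowScan, as their deprecation messages state. A minimal replacement sketch; the catalog and table names are placeholders reused from dev/provision.py, and the arguments mirror those of the removed functions:

# Hypothetical usage of the suggested replacement (ArrowScan) -- not taken from this commit.
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan

table = load_catalog("rest").load_table("default.test_positional_mor_deletes")
tasks = list(table.scan().plan_files())

# Same inputs the removed functions took: table metadata, FileIO, projected schema, row filter.
scan = ArrowScan(table.metadata, table.io, table.schema(), AlwaysTrue())

arrow_table = scan.to_table(tasks)        # replaces project_table(tasks, ...)
batches = scan.to_record_batches(tasks)   # replaces project_batches(tasks, ...)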
