diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index afe8779868..b70fd779e9 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -17,39 +17,41 @@ from __future__ import annotations import math -from abc import ABC, abstractmethod +from abc import ABC +from abc import abstractmethod from enum import Enum from types import TracebackType -from typing import ( - Any, - Callable, - Dict, - Iterator, - List, - Literal, - Optional, - Type, -) - -from pyiceberg.avro.file import AvroFile, AvroOutputFile +from typing import Any, Generator +from typing import Callable +from typing import Dict +from typing import Iterator +from typing import List +from typing import Literal +from typing import Optional +from typing import Type + +from pyiceberg.avro.file import AvroFile +from pyiceberg.avro.file import AvroOutputFile from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ValidationError -from pyiceberg.io import FileIO, InputFile, OutputFile +from pyiceberg.io import FileIO +from pyiceberg.io import InputFile +from pyiceberg.io import OutputFile from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.typedef import EMPTY_DICT, Record, TableVersion -from pyiceberg.types import ( - BinaryType, - BooleanType, - IntegerType, - ListType, - LongType, - MapType, - NestedField, - PrimitiveType, - StringType, - StructType, -) +from pyiceberg.typedef import EMPTY_DICT +from pyiceberg.typedef import Record +from pyiceberg.typedef import TableVersion +from pyiceberg.types import BinaryType +from pyiceberg.types import BooleanType +from pyiceberg.types import IntegerType +from pyiceberg.types import ListType +from pyiceberg.types import LongType +from pyiceberg.types import MapType +from pyiceberg.types import NestedField +from pyiceberg.types import PrimitiveType +from pyiceberg.types import StringType +from pyiceberg.types import StructType UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 
1024 * 1024 @@ -99,7 +101,9 @@ def __repr__(self) -> str: DATA_FILE_TYPE: Dict[int, StructType] = { 1: StructType( - NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), + NestedField( + field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" + ), NestedField( field_id=101, name="file_format", @@ -114,9 +118,15 @@ def __repr__(self) -> str: required=True, doc="Partition data tuple, schema based on the partition spec", ), - NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"), NestedField( - field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes" + field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file" + ), + NestedField( + field_id=104, + name="file_size_in_bytes", + field_type=LongType(), + required=True, + doc="Total file size in bytes", ), NestedField( field_id=105, @@ -169,7 +179,11 @@ def __repr__(self) -> str: doc="Map of column id to upper bound", ), NestedField( - field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob" + field_id=131, + name="key_metadata", + field_type=BinaryType(), + required=False, + doc="Encryption key metadata blob", ), NestedField( field_id=132, @@ -189,7 +203,9 @@ def __repr__(self) -> str: doc="File format name: avro, orc, or parquet", initial_default=DataFileContent.DATA, ), - NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), + NestedField( + field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" + ), NestedField( field_id=101, name="file_format", @@ -204,9 +220,15 @@ def __repr__(self) -> str: required=True, doc="Partition data tuple, schema based on the partition 
def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
    """Return the data-file struct for ``format_version`` with a concrete partition struct.

    The struct registered in ``DATA_FILE_TYPE`` for ``format_version`` is copied
    field by field; every field whose id is 102 is swapped for a required
    ``partition`` field typed after ``partition_type``.

    Args:
        partition_type: Struct whose fields describe the partition tuple.
        format_version: Key used to look up the base struct in ``DATA_FILE_TYPE``.

    Returns:
        A new ``StructType``; all fields other than id 102 are reused as-is.
    """
    # Only id, name, type and required-ness are carried over from the partition fields.
    partition_fields = []
    for source in partition_type.fields:
        partition_fields.append(
            NestedField(
                field_id=source.field_id,
                name=source.name,
                field_type=source.field_type,
                required=source.required,
            )
        )
    data_file_partition_type = StructType(*partition_fields)

    result_fields = []
    for candidate in DATA_FILE_TYPE[format_version].fields:
        if candidate.field_id != 102:
            result_fields.append(candidate)
            continue
        result_fields.append(
            NestedField(
                field_id=102,
                name="partition",
                field_type=data_file_partition_type,
                required=True,
                doc="Partition data tuple, schema based on the partition spec",
            )
        )
    return StructType(*result_fields)


def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
    """Return the manifest-entry schema for ``format_version`` with field 2 typed as ``data_file``.

    Args:
        format_version: Key used to look up the base schema in ``MANIFEST_ENTRY_SCHEMAS``.
        data_file: Struct to use as the type of the required ``data_file`` field.

    Returns:
        A new ``Schema``; all fields other than id 2 are reused as-is.
    """
    schema_fields = []
    for candidate in MANIFEST_ENTRY_SCHEMAS[format_version].fields:
        if candidate.field_id == 2:
            schema_fields.append(NestedField(2, "data_file", data_file, required=True))
        else:
            schema_fields.append(candidate)
    return Schema(*schema_fields)
element_required=True), required=False + ), NestedField(519, "key_metadata", BinaryType(), required=False), ), 2: Schema( @@ -499,12 +539,16 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition NestedField(512, "added_rows_count", LongType(), required=True), NestedField(513, "existing_rows_count", LongType(), required=True), NestedField(514, "deleted_rows_count", LongType(), required=True), - NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False), + NestedField( + 507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False + ), NestedField(519, "key_metadata", BinaryType(), required=False), ), } -MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()} +MANIFEST_LIST_FILE_STRUCTS = { + format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items() +} POSITIONAL_DELETE_SCHEMA = Schema( @@ -623,12 +667,16 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani # in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the data sequence number should be inherited iff the entry status is ADDED - if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): + if entry.data_sequence_number is None and ( + manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED + ): entry.data_sequence_number = manifest.sequence_number # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED - if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): + if entry.file_sequence_number is None and ( + 
class RollingManifestWriter:
    """Spread manifest entries over several manifest files, rolling over on size or row count.

    Writers are pulled from ``supplier`` one at a time. Before an entry is written,
    the current writer is finished and replaced by the next one from ``supplier``
    when it already holds ``target_number_of_rows`` rows or its output file has
    reached ``target_file_size_in_bytes``. Finished manifests are collected and
    returned by :meth:`to_manifest_files`.

    Can be used as a context manager; leaving the ``with`` block finishes the
    current writer and marks this writer as closed.
    """

    closed: bool
    _supplier: Generator[ManifestWriter, None, None]
    _manifest_files: list[ManifestFile]
    _target_file_size_in_bytes: int
    _target_number_of_rows: int
    _current_writer: Optional[ManifestWriter]
    _current_file_rows: int

    def __init__(
        self,
        supplier: Generator[ManifestWriter, None, None],
        target_file_size_in_bytes: int,
        target_number_of_rows: int,
    ) -> None:
        """Store the writer supplier and the two rollover thresholds.

        Args:
            supplier: Iterator yielding a fresh, not yet entered writer on every ``next()``.
            target_file_size_in_bytes: Roll over once the current output file is at least this long.
            target_number_of_rows: Roll over once the current writer holds at least this many rows.
        """
        # A single public flag, set here so it exists before the first __exit__ and
        # is the same attribute that add_entry checks.
        self.closed = False
        self._manifest_files = []
        self._supplier = supplier
        self._target_file_size_in_bytes = target_file_size_in_bytes
        self._target_number_of_rows = target_number_of_rows
        self._current_writer = None
        self._current_file_rows = 0

    def __enter__(self) -> RollingManifestWriter:
        """Open the first underlying writer and return ``self``."""
        # _get_current_writer() already enters the writer it creates; entering it
        # again here would open it twice.
        self._get_current_writer()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Finish the current writer and mark this rolling writer as closed.

        The exception details are forwarded to the underlying writer. Its manifest
        is collected only when the block exited without an exception.
        """
        self.closed = True
        writer = self._current_writer
        if writer is None:
            return
        # Detach first so a later to_manifest_files() does not exit the writer again.
        self._current_writer = None
        self._current_file_rows = 0
        writer.__exit__(exc_type, exc_value, traceback)
        if exc_type is None:
            self._manifest_files.append(writer.to_manifest_file())

    def _get_current_writer(self) -> ManifestWriter:
        """Return the writer for the next entry, rolling over or creating one if needed.

        Raises:
            StopIteration: If the supplier has no more writers to hand out.
        """
        if self._should_roll_to_new_file():
            self._close_current_writer()
        # Also reached right after a rollover, so a writer is always returned.
        if self._current_writer is None:
            self._current_writer = next(self._supplier)
            self._current_writer.__enter__()
        return self._current_writer

    def _should_roll_to_new_file(self) -> bool:
        """Return True when the current writer has reached either threshold."""
        if self._current_writer is None:
            return False
        return (
            self._current_file_rows >= self._target_number_of_rows
            or len(self._current_writer._output_file) >= self._target_file_size_in_bytes
        )

    def _close_current_writer(self) -> None:
        """Finish the current writer, if any, and collect its manifest file."""
        writer = self._current_writer
        if writer is None:
            return
        writer.__exit__(None, None, None)
        self._manifest_files.append(writer.to_manifest_file())
        self._current_writer = None
        self._current_file_rows = 0

    def to_manifest_files(self) -> list[ManifestFile]:
        """Finish any open writer, close this rolling writer and return all collected manifests."""
        self._close_current_writer()
        self.closed = True
        return self._manifest_files

    def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter:
        """Write ``entry`` to the current manifest, rolling to a new one first if needed.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            RuntimeError: If this writer has already been closed.
        """
        if self.closed:
            raise RuntimeError("Cannot add entry to closed manifest writer")
        self._get_current_writer().add_entry(entry)
        # NOTE(review): one row is counted per manifest entry; confirm this is the
        # intended unit for target_number_of_rows (vs. the data file's record_count).
        self._current_file_rows += 1
        return self
ManifestFile) -> ManifestFile: @@ -909,7 +1023,9 @@ class ManifestListWriterV2(ManifestListWriter): _commit_snapshot_id: int _sequence_number: int - def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int): + def __init__( + self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int + ): super().__init__( format_version=2, output_file=output_file,