From 25ec4748592dbc80a51e805ef65a958fdfe2506c Mon Sep 17 00:00:00 2001
From: Felix Scherz
Date: Sun, 21 Apr 2024 19:40:31 +0200
Subject: [PATCH] test: dummy test without rollover

---
Review notes (ignored by git-am, not part of the commit message):
- Dropped the tmp_avro_file assignment that was overwritten on the next line
  and the outer `output` handle that was never used.
- Parenthesised the data_sequence_number expectation; without parentheses the
  statement reduced to `assert 3` for format version 2 and could not fail.
- Renamed the supplier's locals so they no longer shadow the path that is
  verified with fastavro afterwards.
- The `index` lines below still carry the blob ids of the original revision.

 pyiceberg/manifest.py        |   2 +
 tests/utils/test_manifest.py | 188 +++++++++++++++++++++++++++++++++++
 2 files changed, 190 insertions(+)

diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index b70fd779e9..0e4ea97af0 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -858,6 +858,7 @@ def _get_current_writer(self) -> ManifestWriter:
             return self._current_writer
         if self._should_roll_to_new_file():
             self._close_current_writer()
+            return self._get_current_writer()
         return self._current_writer
 
     def _should_roll_to_new_file(self) -> bool:
@@ -885,6 +886,7 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter:
         if self._closed:
             raise RuntimeError("Cannot add entry to closed manifest writer")
         self._get_current_writer().add_entry(entry)
+        self._current_file_rows += entry.data_file.record_count
         return self
 
 
diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py
index 8bb03cd80e..756f9e0510 100644
--- a/tests/utils/test_manifest.py
+++ b/tests/utils/test_manifest.py
@@ -31,6 +31,7 @@
     ManifestEntryStatus,
     ManifestFile,
     PartitionFieldSummary,
+    RollingManifestWriter,
     read_manifest_list,
     write_manifest,
     write_manifest_list,
@@ -476,6 +477,193 @@ def test_write_manifest(
         assert data_file.sort_order_id == 0
 
 
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_rolling_manifest_writer(
+    generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion
+) -> None:
+    """Round-trip manifest entries through a RollingManifestWriter.
+
+    Per the commit subject this covers the path without rollover; only the first
+    manifest returned by the writer, and the first entry in it, are verified.
+    """
+    io = load_file_io()
+    snapshot = Snapshot(
+        snapshot_id=25,
+        parent_snapshot_id=19,
+        timestamp_ms=1602638573590,
+        manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
+        summary=Summary(Operation.APPEND),
+        schema_id=3,
+    )
+    demo_manifest_file = snapshot.manifests(io)[0]
+    manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
+    test_schema = Schema(
+        NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
+    )
+    test_spec = PartitionSpec(
+        PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"),
+        PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"),
+        spec_id=demo_manifest_file.partition_spec_id,
+    )
+    with TemporaryDirectory() as tmpdir:
+        # supplier() numbers its files from 1, so this is the first manifest it writes.
+        tmp_avro_file = tmpdir + "/test_write_manifest-1.avro"
+
+        def supplier():
+            i = 0
+            while True:
+                i += 1
+                rolled_path = tmpdir + f"/test_write_manifest-{i}.avro"
+                yield write_manifest(
+                    format_version=format_version,
+                    spec=test_spec,
+                    schema=test_schema,
+                    output_file=io.new_output(rolled_path),
+                    snapshot_id=8744736658442914487,
+                )
+
+        with RollingManifestWriter(
+            supplier=supplier(),
+            target_file_size_in_bytes=388872 + 1,
+            target_number_of_rows=20000,
+        ) as writer:
+            for entry in manifest_entries:
+                writer.add_entry(entry)
+        new_manifest = writer.to_manifest_files()[0]
+        with pytest.raises(RuntimeError):
+            # It is already closed
+            writer.add_entry(manifest_entries[0])
+
+        expected_metadata = {
+            "schema": test_schema.model_dump_json(),
+            "partition-spec": test_spec.model_dump_json(),
+            "partition-spec-id": str(test_spec.spec_id),
+            "format-version": str(format_version),
+        }
+        _verify_metadata_with_fastavro(
+            tmp_avro_file,
+            expected_metadata,
+        )
+        new_manifest_entries = new_manifest.fetch_manifest_entry(io)
+
+        manifest_entry = new_manifest_entries[0]
+
+        assert manifest_entry.status == ManifestEntryStatus.ADDED
+        assert manifest_entry.snapshot_id == 8744736658442914487
+        assert manifest_entry.data_sequence_number == (-1 if format_version == 1 else 3)
+        assert isinstance(manifest_entry.data_file, DataFile)
+
+        data_file = manifest_entry.data_file
+
+        assert data_file.content is DataFileContent.DATA
+        assert (
+            data_file.file_path
+            == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
+        )
+        assert data_file.file_format == FileFormat.PARQUET
+        assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925)
+        assert data_file.record_count == 19513
+        assert data_file.file_size_in_bytes == 388872
+        assert data_file.column_sizes == {
+            1: 53,
+            2: 98153,
+            3: 98693,
+            4: 53,
+            5: 53,
+            6: 53,
+            7: 17425,
+            8: 18528,
+            9: 53,
+            10: 44788,
+            11: 35571,
+            12: 53,
+            13: 1243,
+            14: 2355,
+            15: 12750,
+            16: 4029,
+            17: 110,
+            18: 47194,
+            19: 2948,
+        }
+        assert data_file.value_counts == {
+            1: 19513,
+            2: 19513,
+            3: 19513,
+            4: 19513,
+            5: 19513,
+            6: 19513,
+            7: 19513,
+            8: 19513,
+            9: 19513,
+            10: 19513,
+            11: 19513,
+            12: 19513,
+            13: 19513,
+            14: 19513,
+            15: 19513,
+            16: 19513,
+            17: 19513,
+            18: 19513,
+            19: 19513,
+        }
+        assert data_file.null_value_counts == {
+            1: 19513,
+            2: 0,
+            3: 0,
+            4: 19513,
+            5: 19513,
+            6: 19513,
+            7: 0,
+            8: 0,
+            9: 19513,
+            10: 0,
+            11: 0,
+            12: 19513,
+            13: 0,
+            14: 0,
+            15: 0,
+            16: 0,
+            17: 0,
+            18: 0,
+            19: 0,
+        }
+        assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
+        assert data_file.lower_bounds == {
+            2: b"2020-04-01 00:00",
+            3: b"2020-04-01 00:12",
+            7: b"\x03\x00\x00\x00",
+            8: b"\x01\x00\x00\x00",
+            10: b"\xf6(\\\x8f\xc2\x05S\xc0",
+            11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
+            15: b")\\\x8f\xc2\xf5(\x08\xc0",
+            16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
+            19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
+        }
+        assert data_file.upper_bounds == {
+            2: b"2020-04-30 23:5:",
+            3: b"2020-05-01 00:41",
+            7: b"\t\x01\x00\x00",
+            8: b"\t\x01\x00\x00",
+            10: b"\xcd\xcc\xcc\xcc\xcc,_@",
+            11: b"\x1f\x85\xebQ\\\xe2\xfe@",
+            13: b"\x00\x00\x00\x00\x00\x00\x12@",
+            14: b"\x00\x00\x00\x00\x00\x00\xe0?",
+            15: b"q=\n\xd7\xa3\xf01@",
+            16: b"\x00\x00\x00\x00\x00`B@",
+            17: b"333333\xd3?",
+            18: b"\x00\x00\x00\x00\x00\x18b@",
+            19: b"\x00\x00\x00\x00\x00\x00\x04@",
+        }
+        assert data_file.key_metadata is None
+        assert data_file.split_offsets == [4]
+        assert data_file.equality_ids is None
+        assert data_file.sort_order_id == 0
+
+
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_write_manifest_list(
     generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion