Skip to content

Commit

Permalink
test: dummy test without rollover
Browse files Browse the repository at this point in the history
  • Loading branch information
felixscherz committed May 4, 2024
1 parent 63466a5 commit d5204ea
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ def _get_current_writer(self) -> ManifestWriter:
return self._current_writer
if self._should_roll_to_new_file():
self._close_current_writer()
return self._get_current_writer()
return self._current_writer

def _should_roll_to_new_file(self) -> bool:
Expand Down Expand Up @@ -885,6 +886,7 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter:
if self._closed:
raise RuntimeError("Cannot add entry to closed manifest writer")
self._get_current_writer().add_entry(entry)
self._current_file_rows += entry.data_file.record_count
return self


Expand Down
179 changes: 179 additions & 0 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
ManifestEntryStatus,
ManifestFile,
PartitionFieldSummary,
RollingManifestWriter,
read_manifest_list,
write_manifest,
write_manifest_list,
Expand Down Expand Up @@ -476,6 +477,184 @@ def test_write_manifest(
assert data_file.sort_order_id == 0


@pytest.mark.parametrize("format_version", [1, 2])
def test_rolling_manifest_writer(
generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion
) -> None:
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
summary=Summary(Operation.APPEND),
schema_id=3,
)
demo_manifest_file = snapshot.manifests(io)[0]
manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
test_schema = Schema(
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
)
test_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"),
PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"),
spec_id=demo_manifest_file.partition_spec_id,
)
with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/test_write_manifest.avro"
tmp_avro_file = tmpdir + "/test_write_manifest-1.avro"
output = io.new_output(tmp_avro_file)
def supplier():
i = 0
while True:
i += 1
tmp_avro_file = tmpdir + f"/test_write_manifest-{i}.avro"
output = io.new_output(tmp_avro_file)
yield write_manifest(
format_version=format_version,
spec=test_spec,
schema=test_schema,
output_file=output,
snapshot_id=8744736658442914487,
)
with RollingManifestWriter(supplier=supplier(), target_file_size_in_bytes=388872 + 1, target_number_of_rows=20000) as writer:
for entry in manifest_entries:
writer.add_entry(entry)
new_manifest = writer.to_manifest_files()[0]
with pytest.raises(RuntimeError):
# It is already closed
writer.add_entry(manifest_entries[0])

expected_metadata = {
"schema": test_schema.model_dump_json(),
"partition-spec": test_spec.model_dump_json(),
"partition-spec-id": str(test_spec.spec_id),
"format-version": str(format_version),
}
_verify_metadata_with_fastavro(
tmp_avro_file,
expected_metadata,
)
new_manifest_entries = new_manifest.fetch_manifest_entry(io)

manifest_entry = new_manifest_entries[0]

assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
assert manifest_entry.data_sequence_number == -1 if format_version == 1 else 3
assert isinstance(manifest_entry.data_file, DataFile)

data_file = manifest_entry.data_file

assert data_file.content is DataFileContent.DATA
assert (
data_file.file_path
== "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
)
assert data_file.file_format == FileFormat.PARQUET
assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925)
assert data_file.record_count == 19513
assert data_file.file_size_in_bytes == 388872
assert data_file.column_sizes == {
1: 53,
2: 98153,
3: 98693,
4: 53,
5: 53,
6: 53,
7: 17425,
8: 18528,
9: 53,
10: 44788,
11: 35571,
12: 53,
13: 1243,
14: 2355,
15: 12750,
16: 4029,
17: 110,
18: 47194,
19: 2948,
}
assert data_file.value_counts == {
1: 19513,
2: 19513,
3: 19513,
4: 19513,
5: 19513,
6: 19513,
7: 19513,
8: 19513,
9: 19513,
10: 19513,
11: 19513,
12: 19513,
13: 19513,
14: 19513,
15: 19513,
16: 19513,
17: 19513,
18: 19513,
19: 19513,
}
assert data_file.null_value_counts == {
1: 19513,
2: 0,
3: 0,
4: 19513,
5: 19513,
6: 19513,
7: 0,
8: 0,
9: 19513,
10: 0,
11: 0,
12: 19513,
13: 0,
14: 0,
15: 0,
16: 0,
17: 0,
18: 0,
19: 0,
}
assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
assert data_file.lower_bounds == {
2: b"2020-04-01 00:00",
3: b"2020-04-01 00:12",
7: b"\x03\x00\x00\x00",
8: b"\x01\x00\x00\x00",
10: b"\xf6(\\\x8f\xc2\x05S\xc0",
11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
15: b")\\\x8f\xc2\xf5(\x08\xc0",
16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
}
assert data_file.upper_bounds == {
2: b"2020-04-30 23:5:",
3: b"2020-05-01 00:41",
7: b"\t\x01\x00\x00",
8: b"\t\x01\x00\x00",
10: b"\xcd\xcc\xcc\xcc\xcc,_@",
11: b"\x1f\x85\xebQ\\\xe2\xfe@",
13: b"\x00\x00\x00\x00\x00\x00\x12@",
14: b"\x00\x00\x00\x00\x00\x00\xe0?",
15: b"q=\n\xd7\xa3\xf01@",
16: b"\x00\x00\x00\x00\x00`B@",
17: b"333333\xd3?",
18: b"\x00\x00\x00\x00\x00\x18b@",
19: b"\x00\x00\x00\x00\x00\x00\x04@",
}
assert data_file.key_metadata is None
assert data_file.split_offsets == [4]
assert data_file.equality_ids is None
assert data_file.sort_order_id == 0


@pytest.mark.parametrize("format_version", [1, 2])
def test_write_manifest_list(
generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion
Expand Down

0 comments on commit d5204ea

Please sign in to comment.