Skip to content

Commit

Permalink
Fix the Avro tests
Browse files Browse the repository at this point in the history
When writing V1, the sequence numbers should be None.
For V2, they are written and then read back as their original values.
  • Loading branch information
Fokko committed Mar 27, 2024
1 parent f1c2ef2 commit 609a342
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
4 changes: 3 additions & 1 deletion pyiceberg/utils/schema_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,9 @@ def field(self, field: NestedField, field_result: AvroType) -> AvroType:
"type": field_result if field.required else ["null", field_result],
}

if field.optional:
if field.write_default is not None:
result["default"] = field.write_default # type: ignore
elif field.optional:
result["default"] = None

if field.doc is not None:
Expand Down
39 changes: 33 additions & 6 deletions tests/avro/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import inspect
from copy import copy
from datetime import date, datetime, time
from enum import Enum
from tempfile import TemporaryDirectory
Expand Down Expand Up @@ -111,7 +112,7 @@ def todict(obj: Any) -> Any:
return obj.value
elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not isinstance(obj, bytes):
return [todict(v) for v in obj]
elif hasattr(obj, "__dict__"):
elif isinstance(obj, Record):
return {key: todict(value) for key, value in inspect.getmembers(obj) if not callable(value) and not key.startswith("_")}
else:
return obj
Expand Down Expand Up @@ -258,8 +259,6 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in
sort_order_id=4,
spec_id=3,
)
if format_version == 1:
data_file.block_size_in_bytes = DEFAULT_BLOCK_SIZE

entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
Expand All @@ -277,16 +276,44 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in
with open(tmp_avro_file, "wb") as out:
writer(out, schema, [todict(entry)])

# Read as V2
with avro.AvroFile[ManifestEntry](
PyArrowFileIO().new_input(tmp_avro_file),
MANIFEST_ENTRY_SCHEMAS[format_version],
{-1: ManifestEntry, 2: DataFile},
input_file=PyArrowFileIO().new_input(tmp_avro_file),
read_schema=MANIFEST_ENTRY_SCHEMAS[2],
read_types={-1: ManifestEntry, 2: DataFile},
) as avro_reader:
it = iter(avro_reader)
avro_entry = next(it)

assert entry == avro_entry

# Read as the original version
with avro.AvroFile[ManifestEntry](
input_file=PyArrowFileIO().new_input(tmp_avro_file),
read_schema=MANIFEST_ENTRY_SCHEMAS[format_version],
read_types={-1: ManifestEntry, 2: DataFile},
) as avro_reader:
it = iter(avro_reader)
avro_entry = next(it)

if format_version == 1:
v1_datafile = copy(data_file)
# Not part of V1
v1_datafile.equality_ids = None

assert avro_entry == ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
# Not part of v1
data_sequence_number=None,
file_sequence_number=None,
data_file=v1_datafile,
)
elif format_version == 2:
assert entry == avro_entry
else:
raise ValueError(f"Unsupported version: {format_version}")


@pytest.mark.parametrize("is_required", [True, False])
def test_all_primitive_types(is_required: bool) -> None:
Expand Down

0 comments on commit 609a342

Please sign in to comment.