From b30020714900c3bd217fedcc919a99524cd1a4bf Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Wed, 27 Nov 2024 11:20:37 -0500 Subject: [PATCH 1/3] fix: convert object value to linked hash map for Iceberg record --- .../parquet/AirbyteValueToIcebergRecord.kt | 20 +++-- .../AirbyteValueToIcebergRecordTest.kt | 79 ++++++++++++++++--- 2 files changed, 82 insertions(+), 17 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt index b5c6a12d8b0e..dee32874ecf6 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt @@ -28,21 +28,25 @@ class AirbyteValueToIcebergRecord { if (type.isStructType) { type.asStructType().asSchema() } else { - throw IllegalArgumentException("ObjectValue should be mapped to ObjectType") + throw IllegalArgumentException("ObjectValue should be mapped to StructType") + } + return recordSchema + .columns() + .filter { column -> airbyteValue.values.containsKey(column.name()) } + .associate { column -> + column.name() to + convert( + airbyteValue.values[column.name()]!!, + column.type(), + ) } - val associate = recordSchema.columns().associate { it.name() to it.type() } - val record = GenericRecord.create(recordSchema) - airbyteValue.values.forEach { (name, value) -> - associate[name]?.let { field -> record.setField(name, convert(value, field)) } - } - return record } is ArrayValue -> { val elementType = if (type.isListType) { type.asListType().elementType() } else { - throw IllegalArgumentException("ArrayValue should be mapped to ArrayType") + throw IllegalArgumentException("ArrayValue should be mapped to ListType") } val array: MutableList = mutableListOf() diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt index 54ce80dc8f98..a9e4b225ccd9 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt @@ -4,16 +4,28 @@ package io.airbyte.cdk.load.data.icerberg.parquet -import io.airbyte.cdk.load.data.* +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.BooleanValue +import io.airbyte.cdk.load.data.DateValue +import io.airbyte.cdk.load.data.IntValue +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeValue +import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.UnknownValue import io.airbyte.cdk.load.data.iceberg.parquet.AirbyteValueToIcebergRecord import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord import io.airbyte.protocol.models.Jsons import java.math.BigDecimal import org.apache.iceberg.Schema -import org.apache.iceberg.data.GenericRecord import org.apache.iceberg.types.Types import org.apache.iceberg.types.Types.NestedField -import org.junit.jupiter.api.Assertions.* +import org.apache.iceberg.types.Types.StructType +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows @@ -31,9 +43,9 @@ class AirbyteValueToIcebergRecordTest { val objectValue = ObjectValue(linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))) - val result = converter.convert(objectValue, schema.asStruct()) as GenericRecord - assertEquals(42L, result.getField("id")) - assertEquals("John Doe", result.getField("name")) + val result = converter.convert(objectValue, schema.asStruct()) as LinkedHashMap<*, *> + assertEquals(42L, result["id"]) + assertEquals("John Doe", result["name"]) } @Test @@ -134,14 +146,60 @@ class AirbyteValueToIcebergRecordTest { val schema = Schema( NestedField.required(1, "id", Types.LongType.get()), - NestedField.optional(2, "name", Types.StringType.get()) + NestedField.optional(2, "name", Types.StringType.get()), + NestedField.required( + 3, + "meta", + Types.StructType.of( + NestedField.required(4, "sync_id", Types.IntegerType.get()), + NestedField.required( + 5, + "changes", + StructType.of( + NestedField.required(6, "change", Types.StringType.get()), + NestedField.required(7, "reason", Types.StringType.get()), + ) + ) + ) + ) ) val objectValue = - ObjectValue(linkedMapOf("id" to IntegerValue(123L), "name" to StringValue("John Doe"))) + ObjectValue( + linkedMapOf( + "id" to IntegerValue(123L), + "name" to StringValue("John Doe"), + "meta" to + ObjectValue( + linkedMapOf( + "sync_id" to IntegerValue(123L), + "changes" to + ObjectValue( + linkedMapOf( + "change" to StringValue("insert"), + "reason" to StringValue("reason"), + ) + ) + ) + ) + ) + ) val result = objectValue.toIcebergRecord(schema) assertEquals(123L, result.getField("id")) assertEquals("John Doe", result.getField("name")) + assertEquals(123L, (result.getField("meta") as Map<*, *>)["sync_id"] as Long) + assertEquals( + 2, + ((result.getField("meta") as Map<*, *>)["changes"] as LinkedHashMap<*, *>).size + ) + assertEquals( + "insert", + ((result.getField("meta") as Map<*, *>)["changes"] as LinkedHashMap<*, *>)["change"] + ) + assertEquals( + "reason", + ((result.getField("meta") as Map<*, *>)["changes"] as LinkedHashMap<*, *>)["reason"] + ) } @Test @@ -149,7 +207,10 @@ class AirbyteValueToIcebergRecordTest { val schema = Schema(NestedField.required(1, "id", Types.LongType.get())) val objectValue = ObjectValue( - linkedMapOf("id" to IntegerValue(123L), "name" to StringValue("Should be ignored")) + linkedMapOf( + "id" to IntegerValue(123L), + "name" to StringValue("Should be ignored"), + ) ) val result = objectValue.toIcebergRecord(schema) From 5e13f570b6a20429daaed066e336673c13121966 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Wed, 27 Nov 2024 11:57:59 -0500 Subject: [PATCH 2/3] Remove invalid import --- .../data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt index 4ab8f1134080..15a59bdf71d1 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt @@ -7,7 +7,6 @@ package io.airbyte.cdk.load.data.icerberg.parquet import io.airbyte.cdk.load.data.ArrayValue import io.airbyte.cdk.load.data.BooleanValue import io.airbyte.cdk.load.data.DateValue -import io.airbyte.cdk.load.data.IntValue import io.airbyte.cdk.load.data.IntegerValue import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.NumberValue From c4927231460171f4d5b61713d41e7a01a00cad41 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Wed, 27 Nov 2024 12:29:56 -0500 Subject: [PATCH 3/3] Avoid potential NPE --- .../load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt index 7c5cf42a9e61..45e9b4fb414f 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt @@ -35,7 +35,7 @@ class AirbyteValueToIcebergRecord { .associate { column -> column.name() to convert( - airbyteValue.values[column.name()]!!, + airbyteValue.values.getOrDefault(column.name(), NullValue), column.type(), ) }