diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt index dcd03e59b259..36c13a7db30e 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.data import com.fasterxml.jackson.databind.JsonNode import java.math.BigDecimal +import java.math.BigInteger import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime @@ -23,6 +24,7 @@ sealed interface AirbyteValue { is Boolean -> BooleanValue(value) is Int -> IntegerValue(value.toLong()) is Long -> IntegerValue(value) + is BigInteger -> IntegerValue(value) is Double -> NumberValue(BigDecimal.valueOf(value)) is BigDecimal -> NumberValue(value) is LocalDate -> DateValue(value.toString()) @@ -60,7 +62,8 @@ value class BooleanValue(val value: Boolean) : AirbyteValue, Comparable { +value class IntegerValue(val value: BigInteger) : AirbyteValue, Comparable<IntegerValue> { + constructor(value: Long): this(BigInteger.valueOf(value)) override fun compareTo(other: IntegerValue): Int = value.compareTo(other.value) } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt index 683a82aaa8d0..85685a272ba4 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.load.data.* import io.airbyte.cdk.util.Jsons import java.math.BigDecimal +import java.math.BigInteger /** * Converts from json to airbyte value, performing the minimum validation necessary 
to marshal to a @@ -90,10 +91,10 @@ class JsonToAirbyteValue { private fun toInteger(json: JsonNode): IntegerValue { val longVal = when { - json.isBoolean -> if (json.asBoolean()) 1L else 0L - json.isIntegralNumber -> json.asLong() - json.isFloatingPointNumber -> json.asDouble().toLong() - json.isTextual -> json.asText().toLong() + json.isBoolean -> if (json.asBoolean()) BigInteger.ONE else BigInteger.ZERO + json.isIntegralNumber -> json.bigIntegerValue() + json.isFloatingPointNumber -> json.bigIntegerValue() + json.isTextual -> json.asText().toBigInteger() else -> throw IllegalArgumentException("Could not convert $json to Integer") } return IntegerValue(longVal) @@ -103,8 +104,8 @@ class JsonToAirbyteValue { val numVal = when { json.isBoolean -> BigDecimal(if (json.asBoolean()) 1.0 else 0.0) - json.isIntegralNumber -> json.asLong().toBigDecimal() - json.isFloatingPointNumber -> json.asDouble().toBigDecimal() + json.isIntegralNumber -> json.decimalValue() + json.isFloatingPointNumber -> json.decimalValue() json.isTextual -> json.asText().toBigDecimal() else -> throw IllegalArgumentException("Could not convert $json to Number") } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt index 1a010a8dbd9b..9eed76f7eb0a 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt @@ -28,16 +28,16 @@ class AirbyteValueWithMetaToOutputRecord { Instant.ofEpochMilli( (value.values[DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT] as IntegerValue) - .value + .value.toLong() ), loadedAt = null, data = value.values[DestinationRecord.Meta.COLUMN_NAME_DATA] as ObjectValue, generationId = 
(value.values[DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID] as IntegerValue) - .value, + .value.toLong(), airbyteMeta = OutputRecord.Meta( - syncId = (meta.values["sync_id"] as IntegerValue).value, + syncId = (meta.values["sync_id"] as IntegerValue).value.toLong(), changes = (meta.values["changes"] as ArrayValue) .values diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 8f01fa5041be..f605e471dbd7 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -52,6 +52,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import io.airbyte.protocol.models.v0.AirbyteStateMessage import java.math.BigDecimal +import java.math.BigInteger import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime @@ -83,6 +84,8 @@ data class StronglyTyped( val topLevelFloatLosesPrecision: Boolean = true, /** Whether floats nested inside objects/arrays are represented as float64. */ val nestedFloatLosesPrecision: Boolean = true, + /** Whether the destination supports integers larger than int64 */ + val integerCanBeLarge: Boolean = true, ) : AllTypesBehavior data object Untyped : AllTypesBehavior @@ -1561,15 +1564,17 @@ abstract class BasicFunctionalityIntegrationTest( ), // A record with all fields unset makeRecord("""{"id": 3}"""), - // A record that verifies floating-point behavior. - // 67.174118 cannot be represented as a standard float64 - // (it turns into 67.17411800000001). + // A record that verifies numeric behavior. + // 99999999999999999999999999999999 is out of range for int64. 
+ // 50000.0000000000000001 can't be represented as a standard float64, + // and gets rounded off. makeRecord( """ { "id": 4, - "struct": {"foo": 67.174118}, - "number": 67.174118 + "struct": {"foo": 50000.0000000000000001}, + "number": 50000.0000000000000001, + "integer": 99999999999999999999999999999999 } """.trimIndent(), ), @@ -1595,22 +1600,28 @@ abstract class BasicFunctionalityIntegrationTest( val nestedFloat: BigDecimal val topLevelFloat: BigDecimal + val bigInt: BigInteger? val badValuesData: Map val badValuesChanges: MutableList when (allTypesBehavior) { is StronglyTyped -> { nestedFloat = if (allTypesBehavior.nestedFloatLosesPrecision) { - BigDecimal("67.17411800000001") + BigDecimal("50000.0") } else { - BigDecimal("67.174118") + BigDecimal("50000.0000000000000001") } topLevelFloat = if (allTypesBehavior.topLevelFloatLosesPrecision) { - BigDecimal("67.17411800000001") + BigDecimal("50000.0") } else { - BigDecimal("67.174118") + BigDecimal("50000.0000000000000001") } + bigInt = if (allTypesBehavior.integerCanBeLarge) { + BigInteger("99999999999999999999999999999999") + } else { + null + } badValuesData = mapOf( "id" to 5, @@ -1650,8 +1661,9 @@ abstract class BasicFunctionalityIntegrationTest( .toMutableList() } Untyped -> { - nestedFloat = BigDecimal("67.174118") - topLevelFloat = BigDecimal("67.174118") + nestedFloat = BigDecimal("50000.0000000000000001") + topLevelFloat = BigDecimal("50000.0000000000000001") + bigInt = BigInteger("99999999999999999999999999999999") badValuesData = mapOf( "id" to 5, @@ -1727,6 +1739,7 @@ abstract class BasicFunctionalityIntegrationTest( "id" to 4, "struct" to mapOf("foo" to nestedFloat), "number" to topLevelFloat, + "integer" to bigInt, ), airbyteMeta = OutputRecord.Meta(syncId = 42), ), diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt 
b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index fb8f9885ce08..3955cc67af89 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -14,7 +14,7 @@ import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test val avroAllTypesBehavior = - StronglyTyped(topLevelFloatLosesPrecision = false, nestedFloatLosesPrecision = false) + StronglyTyped(topLevelFloatLosesPrecision = true, nestedFloatLosesPrecision = true) abstract class S3V2WriteTest( path: String,