Skip to content

Commit

Permalink
Merge branch 'master' into matteogp/builder-contribute/source-navan
Browse files Browse the repository at this point in the history
  • Loading branch information
matteogp authored Nov 27, 2024
2 parents dbba7ee + 03adb62 commit 8a98d53
Show file tree
Hide file tree
Showing 145 changed files with 1,965 additions and 637 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.NoopNameMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.Untyped
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

class MockBasicFunctionalityIntegrationTest :
Expand Down Expand Up @@ -91,4 +92,6 @@ class MockBasicFunctionalityIntegrationTest :
override fun testBasicTypes() {
super.testBasicTypes()
}

@Test @Disabled override fun testBasicWriteFile() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,12 @@ object MockDestinationDataDumper : DestinationDataDumper {
MockStreamLoader.getFilename(stream.descriptor.namespace, stream.descriptor.name)
)
}

override fun dumpFile(
spec: ConfigurationSpecification,
stream: DestinationStream
): List<String> {
// Not needed since the test is disabled for file transfer
throw NotImplementedError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.load.command

import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
Expand All @@ -14,6 +15,8 @@ import jakarta.inject.Singleton
* for usability.
*/
data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()) {
private val log = KotlinLogging.logger {}

private val byDescriptor: Map<DestinationStream.Descriptor, DestinationStream> =
streams.associateBy { it.descriptor }

Expand All @@ -23,6 +26,7 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
"Catalog must have at least one stream: check that files are in the correct location."
)
}
log.info { "Destination catalog initialized: $streams" }
}

fun getStream(name: String, namespace: String?): DestinationStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ class AirbyteTypeToAirbyteTypeWithMeta(private val flatten: Boolean) {
FieldType(IntegerType, nullable = false)
)
if (flatten) {
(schema as ObjectType).properties.forEach { (name, field) -> properties[name] = field }
if (schema is ObjectType) {
schema.properties.forEach { (name, field) -> properties[name] = field }
} else if (schema is ObjectTypeWithEmptySchema) {
// Do nothing: no fields to add
} else {
throw IllegalStateException(
"Cannot flatten without an object schema (schema type: $schema)"
)
}
} else {
properties[DestinationRecord.Meta.COLUMN_NAME_DATA] =
FieldType(schema, nullable = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ value class IntegerValue(val value: BigInteger) : AirbyteValue, Comparable<Integ
override fun compareTo(other: IntegerValue): Int = value.compareTo(other.value)
}

@JvmInline
value class IntValue(val value: Int) : AirbyteValue, Comparable<IntValue> {
override fun compareTo(other: IntValue): Int = value.compareTo(other.value)
}

@JvmInline
value class NumberValue(val value: BigDecimal) : AirbyteValue, Comparable<NumberValue> {
override fun compareTo(other: NumberValue): Int = value.compareTo(other.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,10 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
try {
when (schema) {
is ObjectType -> mapObject(value as ObjectValue, schema, context)
is ObjectTypeWithoutSchema ->
mapObjectWithoutSchema(value as ObjectValue, schema, context)
is ObjectTypeWithEmptySchema ->
mapObjectWithEmptySchema(value as ObjectValue, schema, context)
is ObjectTypeWithoutSchema -> mapObjectWithoutSchema(value, schema, context)
is ObjectTypeWithEmptySchema -> mapObjectWithEmptySchema(value, schema, context)
is ArrayType -> mapArray(value as ArrayValue, schema, context)
is ArrayTypeWithoutSchema ->
mapArrayWithoutSchema(value as ArrayValue, schema, context)
is ArrayTypeWithoutSchema -> mapArrayWithoutSchema(value, schema, context)
is UnionType -> mapUnion(value, schema, context)
is BooleanType -> mapBoolean(value as BooleanValue, context)
is NumberType -> mapNumber(value as NumberValue, context)
Expand Down Expand Up @@ -118,13 +115,13 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
}

open fun mapObjectWithoutSchema(
value: ObjectValue,
value: AirbyteValue,
schema: ObjectTypeWithoutSchema,
context: Context
): Pair<AirbyteValue, Context> = value to context

open fun mapObjectWithEmptySchema(
value: ObjectValue,
value: AirbyteValue,
schema: ObjectTypeWithEmptySchema,
context: Context
): Pair<AirbyteValue, Context> = value to context
Expand All @@ -150,7 +147,7 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
}

open fun mapArrayWithoutSchema(
value: ArrayValue,
value: AirbyteValue,
schema: ArrayTypeWithoutSchema,
context: Context
): Pair<AirbyteValue, Context> = value to context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ class DestinationRecordToAirbyteValueWithMeta(
Meta.COLUMN_NAME_AB_GENERATION_ID to IntegerValue(stream.generationId),
)
if (flatten) {
properties.putAll((data as ObjectValue).values)
// Special case: if the top-level schema had no columns, do nothing.
if (stream.schema !is ObjectTypeWithEmptySchema) {
properties.putAll((data as ObjectValue).values)
}
} else {
properties[Meta.COLUMN_NAME_DATA] = data
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,49 @@ package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.data.json.toJson
import io.airbyte.cdk.load.util.serializeToString

class SchemalessTypesToJsonString : AirbyteSchemaIdentityMapper {
override fun mapObjectWithoutSchema(schema: ObjectTypeWithoutSchema): AirbyteType = StringType
override fun mapObjectWithEmptySchema(schema: ObjectTypeWithEmptySchema): AirbyteType =
StringType
override fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema): AirbyteType = StringType
override fun mapUnknown(schema: UnknownType): AirbyteType = StringType
}

class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() {
override fun mapObjectWithoutSchema(
value: ObjectValue,
value: AirbyteValue,
schema: ObjectTypeWithoutSchema,
context: Context
): Pair<AirbyteValue, Context> =
value.toJson().serializeToString().let(::StringValue) to context
override fun mapObjectWithEmptySchema(
value: ObjectValue,
value: AirbyteValue,
schema: ObjectTypeWithEmptySchema,
context: Context
): Pair<AirbyteValue, Context> =
value.toJson().serializeToString().let(::StringValue) to context
override fun mapArrayWithoutSchema(
value: ArrayValue,
value: AirbyteValue,
schema: ArrayTypeWithoutSchema,
context: Context
): Pair<AirbyteValue, Context> =
value.toJson().serializeToString().let(::StringValue) to context
override fun mapUnknown(value: UnknownValue, context: Context): Pair<AirbyteValue, Context> =
value.toJson().serializeToString().let(::StringValue) to context

override fun mapUnion(
value: AirbyteValue,
schema: UnionType,
context: Context
): Pair<AirbyteValue, Context> {
if (ObjectTypeWithEmptySchema in schema.options && value is ObjectValue) {
return mapObjectWithEmptySchema(value, ObjectTypeWithEmptySchema, context)
}

if (ObjectTypeWithoutSchema in schema.options && value is ObjectValue) {
return mapObjectWithoutSchema(value, ObjectTypeWithoutSchema, context)
}

if (ArrayTypeWithoutSchema in schema.options && value is ArrayValue) {
return mapArrayWithoutSchema(value, ArrayTypeWithoutSchema, context)
}

if (schema.options.any { it is UnknownType } && value is UnknownValue) {
return mapUnknown(value, context)
}

return super.mapUnion(value, schema, context)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class TimeStringToInteger : AirbyteValueIdentityMapper() {
override fun mapDate(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> {
value as DateValue
val epochDay = LocalDate.parse(value.value, DATE_TIME_FORMATTER).toEpochDay()
return IntValue(epochDay.toInt()) to context
return IntegerValue(epochDay) to context
}

private fun toMicrosOfDayWithTimezone(timeString: String): Long {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import java.text.Normalizer
import java.util.regex.Pattern

class Transformations {
companion object {
private const val S3_SAFE_CHARACTERS = "\\p{Alnum}/!_.*')("
private const val S3_SPECIAL_CHARACTERS = "&$@=;:+,?-"
private val S3_CHARACTER_PATTERN =
"[^${S3_SAFE_CHARACTERS}${Pattern.quote(S3_SPECIAL_CHARACTERS)}]"
const val NON_ALPHANUMERIC_AND_UNDERSCORE_PATTERN: String = "[^\\p{Alnum}_]"

fun toS3SafeCharacters(input: String): String {
return Normalizer.normalize(input, Normalizer.Form.NFKD)
.replace(
"\\p{M}".toRegex(),
"",
) // P{M} matches a code point that is not a combining mark (unicode)
.replace(S3_CHARACTER_PATTERN.toRegex(), "_")
}

fun toAlphanumericAndUnderscore(s: String): String {
return Normalizer.normalize(s, Normalizer.Form.NFKD)
.replace(
"\\p{M}".toRegex(),
""
) // P{M} matches a code point that is not a combining mark (unicode)
.replace("\\s+".toRegex(), "_")
.replace(NON_ALPHANUMERIC_AND_UNDERSCORE_PATTERN.toRegex(), "_")
}

fun toAvroSafeNamespace(namespace: String): String {
val tokens =
namespace.split("\\.".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray()
return tokens
.map { name: String -> toAlphanumericAndUnderscore(name) }
.joinToString(separator = ".")
}

fun toAvroSafeName(name: String) = toAlphanumericAndUnderscore(name)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper {
is TimestampTypeWithoutTimezone -> "timestamp_without_timezone"
is TimeTypeWithTimezone -> "time_with_timezone"
is TimeTypeWithoutTimezone -> "time_without_timezone"
is ArrayType,
is ArrayTypeWithoutSchema -> "array"
is ObjectType,
is ArrayType -> "array"
is ObjectType -> "object"
is ArrayTypeWithoutSchema,
is ObjectTypeWithoutSchema,
is ObjectTypeWithEmptySchema -> "object"
is ObjectTypeWithEmptySchema -> "string"
is UnionType -> "union"
is UnknownType -> "unknown"
}
Expand Down Expand Up @@ -65,20 +65,20 @@ class UnionValueToDisjointRecord : AirbyteValueIdentityMapper() {

private fun matches(schema: AirbyteType, value: AirbyteValue): Boolean {
return when (schema) {
is ArrayType,
is ArrayTypeWithoutSchema -> value is ArrayValue
is StringType -> value is StringValue
is BooleanType -> value is BooleanValue
is DateType -> value is DateValue
is IntegerType -> value is IntegerValue
is NumberType -> value is NumberValue
is ObjectType,
is ArrayType -> value is ArrayValue
is ObjectType -> value is ObjectValue
is ArrayTypeWithoutSchema,
is ObjectTypeWithoutSchema,
is ObjectTypeWithEmptySchema -> value is ObjectValue
is StringType -> value is StringValue
is ObjectTypeWithEmptySchema -> value is StringValue
is DateType,
is TimeTypeWithTimezone,
is TimeTypeWithoutTimezone,
is TimestampTypeWithTimezone,
is TimestampTypeWithoutTimezone -> value is TimeValue
is TimestampTypeWithoutTimezone -> value is IntegerValue
is UnionType -> schema.options.any { matches(it, value) }
is UnknownType -> false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class AirbyteValueToJson {
is BooleanValue -> JsonNodeFactory.instance.booleanNode(value.value)
is DateValue -> JsonNodeFactory.instance.textNode(value.value)
is IntegerValue -> JsonNodeFactory.instance.numberNode(value.value)
is IntValue -> JsonNodeFactory.instance.numberNode(value.value)
is NullValue -> JsonNodeFactory.instance.nullNode()
is NumberValue -> JsonNodeFactory.instance.numberNode(value.value)
is ObjectValue -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ class JsonSchemaToAirbyteType {
} else {
UnknownType(schema)
}
} else if (schema.isObject && schema.has("\$ref")) {
// TODO: Determine whether we even still need to support this
return when (schema.get("\$ref").asText()) {
"WellKnownTypes.json#/definitions/Integer" -> IntegerType
"WellKnownTypes.json#/definitions/Number" -> NumberType
"WellKnownTypes.json#/definitions/String" -> StringType
"WellKnownTypes.json#/definitions/Boolean" -> BooleanType
"WellKnownTypes.json#/definitions/Date" -> DateType
"WellKnownTypes.json#/definitions/TimestampWithTimezone" ->
TimestampTypeWithTimezone
"WellKnownTypes.json#/definitions/TimestampWithoutTimezone" ->
TimestampTypeWithoutTimezone
"WellKnownTypes.json#/definitions/BinaryData" -> StringType
"WellKnownTypes.json#/definitions/TimeWithTimezone" -> TimeTypeWithTimezone
"WellKnownTypes.json#/definitions/TimeWithoutTimezone" -> TimeTypeWithoutTimezone
else -> UnknownType(schema)
}
} else if (schema.isObject) {
// {"oneOf": [...], ...} or {"anyOf": [...], ...} or {"allOf": [...], ...}
val options = schema.get("oneOf") ?: schema.get("anyOf") ?: schema.get("allOf")
Expand Down Expand Up @@ -138,6 +155,9 @@ class JsonSchemaToAirbyteType {
convertInner(it)
}
}
if (unionOptions.isEmpty()) {
return UnknownType(parentSchema)
}
return UnionType.of(unionOptions)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ data class GlobalCheckpoint(
override val destinationStats: Stats? = null,
val checkpoints: List<Checkpoint> = emptyList(),
override val additionalProperties: Map<String, Any>,
val originalTypeField: AirbyteStateMessage.AirbyteStateType? =
AirbyteStateMessage.AirbyteStateType.GLOBAL,
) : CheckpointMessage {
/** Convenience constructor, primarily intended for use in tests. */
constructor(
Expand All @@ -333,7 +335,7 @@ data class GlobalCheckpoint(
override fun asProtocolMessage(): AirbyteMessage {
val stateMessage =
AirbyteStateMessage()
.withType(AirbyteStateMessage.AirbyteStateType.GLOBAL)
.withType(originalTypeField)
.withGlobal(
AirbyteGlobalState()
.withSharedState(state)
Expand Down Expand Up @@ -433,7 +435,10 @@ class DestinationMessageFactory(
namespace = status.streamDescriptor.namespace,
name = status.streamDescriptor.name,
)
if (message.trace.type == AirbyteTraceMessage.Type.STREAM_STATUS) {
if (
message.trace.type == null ||
message.trace.type == AirbyteTraceMessage.Type.STREAM_STATUS
) {
when (status.status) {
AirbyteStreamStatus.COMPLETE ->
if (fileTransferEnabled) {
Expand All @@ -444,7 +449,7 @@ class DestinationMessageFactory(
} else {
DestinationRecordStreamComplete(
stream.descriptor,
message.trace.emittedAt.toLong()
message.trace.emittedAt?.toLong() ?: 0L
)
}
AirbyteStreamStatus.INCOMPLETE ->
Expand Down Expand Up @@ -476,6 +481,7 @@ class DestinationMessageFactory(
},
additionalProperties = message.state.additionalProperties,
)
null,
AirbyteStateMessage.AirbyteStateType.GLOBAL ->
GlobalCheckpoint(
sourceStats =
Expand All @@ -488,6 +494,7 @@ class DestinationMessageFactory(
fromAirbyteStreamState(it)
},
additionalProperties = message.state.additionalProperties,
originalTypeField = message.state.type,
)
else -> // TODO: Do we still need to handle LEGACY?
Undefined
Expand Down
Loading

0 comments on commit 8a98d53

Please sign in to comment.