diff --git a/build.gradle.kts b/build.gradle.kts index ef0c8aa..46ccce5 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -16,7 +16,7 @@ plugins { } // Package version -version = "0.7.8-alpha" +version = "0.7.8" repositories { // Use Maven Central for resolving dependencies. diff --git a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt index fb2ae70..203bbcb 100644 --- a/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt +++ b/src/main/kotlin/com/cultureamp/kafka/connect/plugins/transforms/RedShiftComplexDataTypeTransformer.kt @@ -71,13 +71,13 @@ class RedShiftComplexDataTypeTransformer> : Transformation< } } - private fun convertFieldSchema(orig: Schema, optional: Boolean, defaultFromParent: Any?, complexType: Boolean = false): Schema { + private fun convertFieldSchema(orig: Schema, optional: Boolean, defaultFromParent: Any?): Schema { // Note that we don't use the schema translation cache here. It might save us a bit of effort, but we really // only care about caching top-level schema translations. val builder = SchemaUtil.copySchemaBasics(orig) if (optional) builder.optional() - if (defaultFromParent != null && !complexType) + if (defaultFromParent != null) builder.defaultValue(defaultFromParent) return builder.build() } @@ -103,8 +103,9 @@ class RedShiftComplexDataTypeTransformer> : Transformation< Schema.Type.BOOLEAN -> newSchema.field(fieldName, convertFieldSchema(field.schema(), optional, fieldDefaultValue)) Schema.Type.STRING -> newSchema.field(fieldName, convertFieldSchema(field.schema(), optional, fieldDefaultValue)) Schema.Type.BYTES -> newSchema.field(fieldName, convertFieldSchema(field.schema(), optional, fieldDefaultValue)) - Schema.Type.ARRAY -> newSchema.field(fieldName, convertFieldSchema(SchemaBuilder.string().build(), optional, fieldDefaultValue, true)) - Schema.Type.MAP -> newSchema.field(fieldName, convertFieldSchema(SchemaBuilder.string().build(), optional, fieldDefaultValue, true)) + // ARRAY and MAP are converted to string, hence the default values are not set for these types + Schema.Type.ARRAY -> newSchema.field(fieldName, convertFieldSchema(SchemaBuilder.string().build(), optional, null)) + Schema.Type.MAP -> newSchema.field(fieldName, convertFieldSchema(SchemaBuilder.string().build(), optional, null)) Schema.Type.STRUCT -> buildUpdatedSchema(field.schema(), fieldName, newSchema, optional) else -> throw DataException( "Flatten transformation does not support " + field.schema().type() +