diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java index 0024f10cae..89541a114d 100644 --- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java +++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java @@ -43,6 +43,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; @@ -366,6 +368,16 @@ public static PipelineResult runJsonPipeline( String bootstrapServers, Map kafkaConfig) { + // Register the coder for pipeline + FailsafeElementCoder, String> coder = + FailsafeElementCoder.of( + KvCoder.of( + NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())), + NullableCoder.of(StringUtf8Coder.of())); + + CoderRegistry coderRegistry = pipeline.getCoderRegistry(); + coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder); + PCollectionTuple convertedTableRows; convertedTableRows = pipeline