From 94182257234ee6733aea987449ec4bb5038ad021 Mon Sep 17 00:00:00 2001 From: Dippatel98 Date: Thu, 6 Jun 2024 18:44:47 +0000 Subject: [PATCH] Add TLS authentication support for Kafka Templates --- .../options/KafkaToBigQueryFlexOptions.java | 18 ------------- .../v2/templates/KafkaToBigQueryFlex.java | 9 +------ .../teleport/v2/templates/KafkaToGcsFlex.java | 27 +------------------ 3 files changed, 2 insertions(+), 52 deletions(-) diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/options/KafkaToBigQueryFlexOptions.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/options/KafkaToBigQueryFlexOptions.java index 5612d56da2..52066c10d2 100644 --- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/options/KafkaToBigQueryFlexOptions.java +++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/options/KafkaToBigQueryFlexOptions.java @@ -18,7 +18,6 @@ import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.v2.kafka.options.KafkaReadOptions; import com.google.cloud.teleport.v2.kafka.options.SchemaRegistryOptions; -import com.google.cloud.teleport.v2.kafka.values.KafkaAuthenticationMethod; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.Default; @@ -43,23 +42,6 @@ public interface KafkaToBigQueryFlexOptions void setReadBootstrapServerAndTopic(String value); - @TemplateParameter.Enum( - name = "kafkaReadAuthenticationMode", - order = 2, - groupName = "Source", - enumOptions = { - @TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_PLAIN), - @TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE), - }, - description = "Kafka Read Authentication Mode", - helpText = - "The mode of authentication to use with the Kafka cluster. " - + "Use NONE for no authentication" - + " or SASL_PLAIN for SASL/PLAIN username and password. " - + " Apache Kafka for BigQuery only supports the SASL_PLAIN authentication mode.") - @Default.String(KafkaAuthenticationMethod.SASL_PLAIN) - String getKafkaReadAuthenticationMode(); - @TemplateParameter.Boolean( order = 3, groupName = "Source", diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java index 89541a114d..982ed82680 100644 --- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java +++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java @@ -106,14 +106,7 @@ "The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.", "The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format." }, - skipOptions = { - "useStorageWriteApi", - "kafkaReadKeystoreLocation", - "kafkaReadTruststoreLocation", - "kafkaReadTruststorePasswordSecretId", - "kafkaReadKeystorePasswordSecretId", - "kafkaReadKeyPasswordSecretId" - }) + skipOptions = {"useStorageWriteApi"}) public class KafkaToBigQueryFlex { /* Logger for class. */ diff --git a/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcsFlex.java b/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcsFlex.java index 3d5da96153..b255e8e485 100644 --- a/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcsFlex.java +++ b/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcsFlex.java @@ -23,7 +23,6 @@ import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform; import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig; import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils; -import com.google.cloud.teleport.v2.kafka.values.KafkaAuthenticationMethod; import com.google.cloud.teleport.v2.transforms.WriteTransform; import java.util.HashMap; import java.util.List; @@ -48,14 +47,7 @@ optionsClass = KafkaToGcsFlex.KafkaToGcsOptions.class, flexContainerName = "kafka-to-gcs-flex", contactInformation = "https://cloud.google.com/support", - requirements = {"The output Google Cloud Storage directory must exist."}, - skipOptions = { - "kafkaReadKeystoreLocation", - "kafkaReadTruststoreLocation", - "kafkaReadTruststorePasswordSecretId", - "kafkaReadKeystorePasswordSecretId", - "kafkaReadKeyPasswordSecretId" - }) + requirements = {"The output Google Cloud Storage directory must exist."}) public class KafkaToGcsFlex { public interface KafkaToGcsOptions extends PipelineOptions, DataflowPipelineOptions, KafkaReadOptions, SchemaRegistryOptions { @@ -72,23 +64,6 @@ public interface KafkaToGcsOptions void setReadBootstrapServerAndTopic(String value); - @TemplateParameter.Enum( - name = "kafkaReadAuthenticationMode", - order = 19, - groupName = "Source", - enumOptions = { - @TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_PLAIN), - @TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE), - }, - description = "Kafka Read Authentication Mode", - helpText = - "The mode of authentication to use with the Kafka cluster. " - + "Use NONE for no authentication and " - + "SASL_PLAIN for SASL/PLAIN username and password. " - + " Apache Kafka for BigQuery only supports the SASL_PLAIN authentication mode.") - @Default.String(KafkaAuthenticationMethod.SASL_PLAIN) - String getKafkaReadAuthenticationMode(); - @TemplateParameter.Duration( order = 20, optional = true,