diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/GcpLoginCallbackHandler.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/GcpLoginCallbackHandler.java
new file mode 100644
index 0000000000..9092c96f35
--- /dev/null
+++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/GcpLoginCallbackHandler.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.kafka.auth;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.ComputeEngineCredentials;
+import com.google.auth.oauth2.ExternalAccountCredentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ImpersonatedCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.kafka.common.security.oauthbearer.internals.secured.BasicOAuthBearerToken;
+
+/**
+ * A callback handler that provides a Google OAuth token to a Kafka client.
+ *
+ * This callback handler is used by the Kafka client to authenticate to a Google's Kafka server
+ * using OAuth.
+ */
+public class GcpLoginCallbackHandler implements AuthenticateCallbackHandler {
+ public static final String JWT_SUBJECT_CLAIM = "sub";
+ public static final String JWT_ISSUED_AT_CLAIM = "iat";
+ public static final String JWT_SCOPE_CLAIM = "scope";
+ public static final String JWT_EXP_CLAIM = "exp";
+
+ /** A stub Google credentials class that exposes the account name. Used only for testing. */
+ public abstract static class StubGoogleCredentials extends GoogleCredentials {
+ abstract String getAccount();
+ }
+
+ public static final String GOOGLE_CLOUD_PLATFORM_SCOPE =
+ "https://www.googleapis.com/auth/cloud-platform";
+ private static final String HEADER =
+ new Gson().toJson(ImmutableMap.of("typ", "JWT", "alg", "GOOG_OAUTH2_TOKEN"));
+
+ private boolean configured = false;
+ private final GoogleCredentials credentials;
+
+ public GcpLoginCallbackHandler() {
+ try {
+ this.credentials =
+ GoogleCredentials.getApplicationDefault().createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to create Google credentials", e);
+ }
+ }
+
+ @VisibleForTesting
+ GcpLoginCallbackHandler(GoogleCredentials credentials) {
+ this.credentials = credentials;
+ }
+
+ @Override
+ public void configure(
+ Map configs, String saslMechanism, List jaasConfigEntries) {
+ if (!Objects.equals(saslMechanism, OAuthBearerLoginModule.OAUTHBEARER_MECHANISM)) {
+ throw new IllegalArgumentException(
+ String.format("Unexpected SASL mechanism: %s", saslMechanism));
+ }
+ configured = true;
+ }
+
+ private boolean isConfigured() {
+ return configured;
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException, IOException {
+ if (!isConfigured()) {
+ throw new IllegalStateException("Callback handler not configured");
+ }
+
+ for (Callback callback : callbacks) {
+ if (callback instanceof OAuthBearerTokenCallback) {
+ handleTokenCallback((OAuthBearerTokenCallback) callback);
+ } else {
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+ }
+
+ private void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOException {
+ String subject = "";
+ // The following credentials are the ones that support the getAccount() or similar method to
+ // obtain the principal name. Namely, the ones that can be obtained with two-legged
+ // authentication, which do not involve user authentication, such as service account
+ // credentials.
+ if (credentials instanceof ComputeEngineCredentials) {
+ subject = ((ComputeEngineCredentials) credentials).getAccount();
+ } else if (credentials instanceof ServiceAccountCredentials) {
+ subject = ((ServiceAccountCredentials) credentials).getClientEmail();
+ } else if (credentials instanceof ExternalAccountCredentials) {
+ subject = ((ExternalAccountCredentials) credentials).getServiceAccountEmail();
+ } else if (credentials instanceof ImpersonatedCredentials) {
+ subject = ((ImpersonatedCredentials) credentials).getAccount();
+ } else if (credentials instanceof StubGoogleCredentials) {
+ subject = ((StubGoogleCredentials) credentials).getAccount();
+ } else {
+ throw new IOException("Unknown credentials type: " + credentials.getClass().getName());
+ }
+ credentials.refreshIfExpired();
+ var googleAccessToken = credentials.getAccessToken();
+ String kafkaToken = getKafkaAccessToken(googleAccessToken, subject);
+
+ var now = Instant.now();
+ OAuthBearerToken token =
+ new BasicOAuthBearerToken(
+ kafkaToken,
+ ImmutableSet.of("kafka"),
+ googleAccessToken.getExpirationTime().toInstant().toEpochMilli(),
+ subject,
+ now.toEpochMilli());
+ callback.token(token);
+ }
+
+ private static String b64Encode(String data) {
+ return Base64.getUrlEncoder().withoutPadding().encodeToString(data.getBytes(UTF_8));
+ }
+
+ private static String getJwt(AccessToken token, String subject) {
+ return new Gson()
+ .toJson(
+ ImmutableMap.of(
+ JWT_EXP_CLAIM,
+ token.getExpirationTime().toInstant().getEpochSecond(),
+ JWT_ISSUED_AT_CLAIM,
+ Instant.now().getEpochSecond(),
+ JWT_SCOPE_CLAIM,
+ "kafka",
+ JWT_SUBJECT_CLAIM,
+ subject));
+ }
+
+ private static String getKafkaAccessToken(AccessToken token, String subject) {
+ return String.join(
+ ".",
+ b64Encode(HEADER),
+ b64Encode(getJwt(token, subject)),
+ b64Encode(token.getTokenValue()));
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/package-info.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/package-info.java
new file mode 100644
index 0000000000..b4d3f1ec94
--- /dev/null
+++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.teleport.v2.kafka.auth;
diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/options/KafkaReadOptions.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/options/KafkaReadOptions.java
index 4e07ad6315..47d3550d9c 100644
--- a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/options/KafkaReadOptions.java
+++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/options/KafkaReadOptions.java
@@ -35,8 +35,9 @@ final class Offset {
order = 1,
name = "readBootstrapServerAndTopic",
groupName = "Source",
- description = "Source Kafka Topic",
- helpText = "Kafka Topic to read the input from.")
+ description = "Source Kafka Bootstrap server and topic",
+ helpText = "Kafka Bootstrap server and topic to read the input from.",
+ example = "localhost:9092;topic1,topic2")
String getReadBootstrapServerAndTopic();
void setReadBootstrapServerAndTopic(String value);
@@ -93,17 +94,26 @@ final class Offset {
name = "kafkaReadAuthenticationMode",
groupName = "Source",
enumOptions = {
+ @TemplateParameter.TemplateEnumOption(
+ KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_PLAIN),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.TLS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE),
},
description = "Kafka Source Authentication Mode",
helpText =
- "The mode of authentication to use with the Kafka cluster. "
- + "Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, "
- + "and TLS for certificate-based authentication. "
- + "Apache Kafka for BigQuery only supports the SASL_PLAIN authentication mode.")
- @Default.String(KafkaAuthenticationMethod.SASL_PLAIN)
+ ("The mode of authentication to use with the Kafka cluster. "
+ + "Use "
+ + KafkaAuthenticationMethod.NONE
+ + " for no authentication, "
+ + KafkaAuthenticationMethod.SASL_PLAIN
+ + " for SASL/PLAIN username and password, "
+ + KafkaAuthenticationMethod.TLS
+ + "for certificate-based authentication. "
+ + KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
+ + " should be used only for Google Cloud Apache Kafka for BigQuery cluster since "
+ + "This allow you to authenticate with Google Cloud Apache Kafka for BigQuery using application default credentials"))
+ @Default.String(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)
String getKafkaReadAuthenticationMode();
void setKafkaReadAuthenticationMode(String value);
diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java
index 77d5ccab8a..dd29c514ff 100644
--- a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java
+++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java
@@ -29,15 +29,11 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class KafkaRecordToGenericRecordFailsafeElementFn
extends DoFn<
KafkaRecord, FailsafeElement, GenericRecord>>
implements Serializable {
- private static final Logger LOG =
- LoggerFactory.getLogger(KafkaRecordToGenericRecordFailsafeElementFn.class);
private transient KafkaAvroDeserializer kafkaDeserializer;
private transient BinaryAvroDeserializer binaryDeserializer;
@@ -45,7 +41,7 @@ public class KafkaRecordToGenericRecordFailsafeElementFn
// Flexible options for schema and encoding configuration
private Schema schema;
- private String topicName = "fake_topic";
+ private final String topicName = "fake_topic";
private String schemaRegistryConnectionUrl;
private Map schemaRegistrySslConfig;
private String messageFormat; // "AVRO_BINARY_ENCODING" or "AVRO_CONFLUENT_WIRE_FORMAT"
@@ -103,9 +99,9 @@ public void processElement(ProcessContext context) {
kafkaDeserializer.deserialize(
element.getTopic(), element.getHeaders(), element.getKV().getValue());
}
+ context.output(FailsafeElement.of(element, result));
} catch (Exception e) {
- LOG.error("Failed during deserialization: " + e.toString());
+ new RuntimeException("Failed during deserialization: " + e.toString());
}
- context.output(FailsafeElement.of(element, result));
}
}
diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java
index d72d8bd542..fe8516243a 100644
--- a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java
+++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java
@@ -133,6 +133,15 @@ private static Map from(
+ " password=\'"
+ SecretManagerUtils.getSecret(passwordSecretId)
+ "\';");
+ } else if (authMode.equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)) {
+ properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+ properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+ properties.put(
+ SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+ "com.google.cloud.teleport.v2.kafka.auth.GcpLoginCallbackHandler");
+ properties.put(
+ SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
} else {
throw new RuntimeException("Authentication method not supported: " + authMode);
}
diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/values/KafkaAuthenticationMethod.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/values/KafkaAuthenticationMethod.java
index 8b715a0093..281d7f8507 100644
--- a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/values/KafkaAuthenticationMethod.java
+++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/values/KafkaAuthenticationMethod.java
@@ -22,4 +22,5 @@ public class KafkaAuthenticationMethod {
public static final String NONE = "NONE";
public static final String TLS = "TLS";
public static final String SASL_PLAIN = "SASL_PLAIN";
+ public static final String APPLICATION_DEFAULT_CREDENTIALS = "APPLICATION_DEFAULT_CREDENTIALS";
}