From b6e04deb5f7a202591a2ec14c42104e06bef1623 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 18 Jun 2024 11:42:37 -0400 Subject: [PATCH 1/3] Fix return --- .../KafkaRecordToGenericRecordFailsafeElementFn.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java index 77d5ccab8a..d813f42a9b 100644 --- a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java +++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java @@ -29,15 +29,11 @@ import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.io.kafka.KafkaRecord; import org.apache.beam.sdk.transforms.DoFn; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class KafkaRecordToGenericRecordFailsafeElementFn extends DoFn< KafkaRecord, FailsafeElement, GenericRecord>> implements Serializable { - private static final Logger LOG = - LoggerFactory.getLogger(KafkaRecordToGenericRecordFailsafeElementFn.class); private transient KafkaAvroDeserializer kafkaDeserializer; private transient BinaryAvroDeserializer binaryDeserializer; @@ -45,7 +41,7 @@ public class KafkaRecordToGenericRecordFailsafeElementFn // Flexible options for schema and encoding configuration private Schema schema; - private String topicName = "fake_topic"; + private final String topicName = "fake_topic"; private String schemaRegistryConnectionUrl; private Map schemaRegistrySslConfig; private String messageFormat; // "AVRO_BINARY_ENCODING" or "AVRO_CONFLUENT_WIRE_FORMAT" @@ -102,10 +98,10 @@ public void processElement(ProcessContext context) { (GenericRecord) kafkaDeserializer.deserialize( element.getTopic(), element.getHeaders(), element.getKV().getValue()); + context.output(FailsafeElement.of(element, result)); } } catch (Exception e) { - LOG.error("Failed during deserialization: " + e.toString()); + new RuntimeException("Failed during deserialization: " + e.toString()); } - context.output(FailsafeElement.of(element, result)); } } From 0351ab720fae4381112c836e6c24b2fea88f98b5 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 18 Jun 2024 12:20:14 -0400 Subject: [PATCH 2/3] Add OAuthBearer support --- v2/kafka-common/pom.xml | 2 +- .../kafka/auth/GcpLoginCallbackHandler.java | 175 ++++++++++++++++++ .../teleport/v2/kafka/auth/package-info.java | 17 ++ .../v2/kafka/options/KafkaReadOptions.java | 24 ++- .../teleport/v2/kafka/utils/KafkaConfig.java | 9 + .../values/KafkaAuthenticationMethod.java | 1 + 6 files changed, 220 insertions(+), 8 deletions(-) create mode 100644 v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/GcpLoginCallbackHandler.java create mode 100644 v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/package-info.java diff --git a/v2/kafka-common/pom.xml b/v2/kafka-common/pom.xml index 52371774fa..8eca70cb6c 100644 --- a/v2/kafka-common/pom.xml +++ b/v2/kafka-common/pom.xml @@ -25,7 +25,7 @@ 4.0.0 - 2.6.3 + 3.7.0 diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/GcpLoginCallbackHandler.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/GcpLoginCallbackHandler.java new file mode 100644 index 0000000000..9092c96f35 --- /dev/null +++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/GcpLoginCallbackHandler.java @@ -0,0 +1,175 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.kafka.auth; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.ComputeEngineCredentials; +import com.google.auth.oauth2.ExternalAccountCredentials; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ImpersonatedCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.gson.Gson; +import java.io.IOException; +import java.time.Instant; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.secured.BasicOAuthBearerToken; + +/** + * A callback handler that provides a Google OAuth token to a Kafka client. + * + *

This callback handler is used by the Kafka client to authenticate to a Google's Kafka server + * using OAuth. + */ +public class GcpLoginCallbackHandler implements AuthenticateCallbackHandler { + public static final String JWT_SUBJECT_CLAIM = "sub"; + public static final String JWT_ISSUED_AT_CLAIM = "iat"; + public static final String JWT_SCOPE_CLAIM = "scope"; + public static final String JWT_EXP_CLAIM = "exp"; + + /** A stub Google credentials class that exposes the account name. Used only for testing. */ + public abstract static class StubGoogleCredentials extends GoogleCredentials { + abstract String getAccount(); + } + + public static final String GOOGLE_CLOUD_PLATFORM_SCOPE = + "https://www.googleapis.com/auth/cloud-platform"; + private static final String HEADER = + new Gson().toJson(ImmutableMap.of("typ", "JWT", "alg", "GOOG_OAUTH2_TOKEN")); + + private boolean configured = false; + private final GoogleCredentials credentials; + + public GcpLoginCallbackHandler() { + try { + this.credentials = + GoogleCredentials.getApplicationDefault().createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE); + } catch (IOException e) { + throw new IllegalStateException("Failed to create Google credentials", e); + } + } + + @VisibleForTesting + GcpLoginCallbackHandler(GoogleCredentials credentials) { + this.credentials = credentials; + } + + @Override + public void configure( + Map configs, String saslMechanism, List jaasConfigEntries) { + if (!Objects.equals(saslMechanism, OAuthBearerLoginModule.OAUTHBEARER_MECHANISM)) { + throw new IllegalArgumentException( + String.format("Unexpected SASL mechanism: %s", saslMechanism)); + } + configured = true; + } + + private boolean isConfigured() { + return configured; + } + + @Override + public void handle(Callback[] callbacks) throws UnsupportedCallbackException, IOException { + if (!isConfigured()) { + throw new IllegalStateException("Callback handler not configured"); + } + + for (Callback callback : callbacks) { + if (callback instanceof OAuthBearerTokenCallback) { + handleTokenCallback((OAuthBearerTokenCallback) callback); + } else { + throw new UnsupportedCallbackException(callback); + } + } + } + + private void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOException { + String subject = ""; + // The following credentials are the ones that support the getAccount() or similar method to + // obtain the principal name. Namely, the ones that can be obtained with two-legged + // authentication, which do not involve user authentication, such as service account + // credentials. + if (credentials instanceof ComputeEngineCredentials) { + subject = ((ComputeEngineCredentials) credentials).getAccount(); + } else if (credentials instanceof ServiceAccountCredentials) { + subject = ((ServiceAccountCredentials) credentials).getClientEmail(); + } else if (credentials instanceof ExternalAccountCredentials) { + subject = ((ExternalAccountCredentials) credentials).getServiceAccountEmail(); + } else if (credentials instanceof ImpersonatedCredentials) { + subject = ((ImpersonatedCredentials) credentials).getAccount(); + } else if (credentials instanceof StubGoogleCredentials) { + subject = ((StubGoogleCredentials) credentials).getAccount(); + } else { + throw new IOException("Unknown credentials type: " + credentials.getClass().getName()); + } + credentials.refreshIfExpired(); + var googleAccessToken = credentials.getAccessToken(); + String kafkaToken = getKafkaAccessToken(googleAccessToken, subject); + + var now = Instant.now(); + OAuthBearerToken token = + new BasicOAuthBearerToken( + kafkaToken, + ImmutableSet.of("kafka"), + googleAccessToken.getExpirationTime().toInstant().toEpochMilli(), + subject, + now.toEpochMilli()); + callback.token(token); + } + + private static String b64Encode(String data) { + return Base64.getUrlEncoder().withoutPadding().encodeToString(data.getBytes(UTF_8)); + } + + private static String getJwt(AccessToken token, String subject) { + return new Gson() + .toJson( + ImmutableMap.of( + JWT_EXP_CLAIM, + token.getExpirationTime().toInstant().getEpochSecond(), + JWT_ISSUED_AT_CLAIM, + Instant.now().getEpochSecond(), + JWT_SCOPE_CLAIM, + "kafka", + JWT_SUBJECT_CLAIM, + subject)); + } + + private static String getKafkaAccessToken(AccessToken token, String subject) { + return String.join( + ".", + b64Encode(HEADER), + b64Encode(getJwt(token, subject)), + b64Encode(token.getTokenValue())); + } + + @Override + public void close() {} +} diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/package-info.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/package-info.java new file mode 100644 index 0000000000..b4d3f1ec94 --- /dev/null +++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/auth/package-info.java @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.teleport.v2.kafka.auth; diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/options/KafkaReadOptions.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/options/KafkaReadOptions.java index 4e07ad6315..47d3550d9c 100644 --- a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/options/KafkaReadOptions.java +++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/options/KafkaReadOptions.java @@ -35,8 +35,9 @@ final class Offset { order = 1, name = "readBootstrapServerAndTopic", groupName = "Source", - description = "Source Kafka Topic", - helpText = "Kafka Topic to read the input from.") + description = "Source Kafka Bootstrap server and topic", + helpText = "Kafka Bootstrap server and topic to read the input from.", + example = "localhost:9092;topic1,topic2") String getReadBootstrapServerAndTopic(); void setReadBootstrapServerAndTopic(String value); @@ -93,17 +94,26 @@ final class Offset { name = "kafkaReadAuthenticationMode", groupName = "Source", enumOptions = { + @TemplateParameter.TemplateEnumOption( + KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS), @TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_PLAIN), @TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.TLS), @TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE), }, description = "Kafka Source Authentication Mode", helpText = - "The mode of authentication to use with the Kafka cluster. " - + "Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, " - + "and TLS for certificate-based authentication. " - + "Apache Kafka for BigQuery only supports the SASL_PLAIN authentication mode.") - @Default.String(KafkaAuthenticationMethod.SASL_PLAIN) + ("The mode of authentication to use with the Kafka cluster. " + + "Use " + + KafkaAuthenticationMethod.NONE + + " for no authentication, " + + KafkaAuthenticationMethod.SASL_PLAIN + + " for SASL/PLAIN username and password, " + + KafkaAuthenticationMethod.TLS + + "for certificate-based authentication. " + + KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS + + " should be used only for Google Cloud Apache Kafka for BigQuery cluster since " + + "This allow you to authenticate with Google Cloud Apache Kafka for BigQuery using application default credentials")) + @Default.String(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS) String getKafkaReadAuthenticationMode(); void setKafkaReadAuthenticationMode(String value); diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java index d72d8bd542..fe8516243a 100644 --- a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java +++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java @@ -133,6 +133,15 @@ private static Map from( + " password=\'" + SecretManagerUtils.getSecret(passwordSecretId) + "\';"); + } else if (authMode.equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)) { + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); + properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); + properties.put( + SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, + "com.google.cloud.teleport.v2.kafka.auth.GcpLoginCallbackHandler"); + properties.put( + SaslConfigs.SASL_JAAS_CONFIG, + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"); } else { throw new RuntimeException("Authentication method not supported: " + authMode); } diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/values/KafkaAuthenticationMethod.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/values/KafkaAuthenticationMethod.java index 8b715a0093..281d7f8507 100644 --- a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/values/KafkaAuthenticationMethod.java +++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/values/KafkaAuthenticationMethod.java @@ -22,4 +22,5 @@ public class KafkaAuthenticationMethod { public static final String NONE = "NONE"; public static final String TLS = "TLS"; public static final String SASL_PLAIN = "SASL_PLAIN"; + public static final String APPLICATION_DEFAULT_CREDENTIALS = "APPLICATION_DEFAULT_CREDENTIALS"; } From a8e99f038eaa09bf124f89d3bd1eb2c1b0309a57 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 18 Jun 2024 13:08:13 -0400 Subject: [PATCH 3/3] Fix return --- .../transforms/KafkaRecordToGenericRecordFailsafeElementFn.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java index d813f42a9b..dd29c514ff 100644 --- a/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java +++ b/v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/transforms/KafkaRecordToGenericRecordFailsafeElementFn.java @@ -98,8 +98,8 @@ public void processElement(ProcessContext context) { (GenericRecord) kafkaDeserializer.deserialize( element.getTopic(), element.getHeaders(), element.getKV().getValue()); - context.output(FailsafeElement.of(element, result)); } + context.output(FailsafeElement.of(element, result)); } catch (Exception e) { new RuntimeException("Failed during deserialization: " + e.toString()); }