Skip to content

Commit

Permalink
Merge pull request #1649 from AnandInguva:oauth
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 644485735
  • Loading branch information
cloud-teleport committed Jun 18, 2024
2 parents b44c940 + cc6ce1a commit 3f76534
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 15 deletions.
2 changes: 1 addition & 1 deletion v2/kafka-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<kafka-clients.version>2.6.3</kafka-clients.version>
<kafka-clients.version>3.7.0</kafka-clients.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.kafka.auth;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.auth.oauth2.ExternalAccountCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ImpersonatedCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import java.io.IOException;
import java.time.Instant;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import org.apache.kafka.common.security.oauthbearer.internals.secured.BasicOAuthBearerToken;

/**
* A callback handler that provides a Google OAuth token to a Kafka client.
*
* <p>This callback handler is used by the Kafka client to authenticate to a Google's Kafka server
* using OAuth.
*/
public class GcpLoginCallbackHandler implements AuthenticateCallbackHandler {
public static final String JWT_SUBJECT_CLAIM = "sub";
public static final String JWT_ISSUED_AT_CLAIM = "iat";
public static final String JWT_SCOPE_CLAIM = "scope";
public static final String JWT_EXP_CLAIM = "exp";

/** A stub Google credentials class that exposes the account name. Used only for testing. */
public abstract static class StubGoogleCredentials extends GoogleCredentials {
abstract String getAccount();
}

public static final String GOOGLE_CLOUD_PLATFORM_SCOPE =
"https://www.googleapis.com/auth/cloud-platform";
private static final String HEADER =
new Gson().toJson(ImmutableMap.of("typ", "JWT", "alg", "GOOG_OAUTH2_TOKEN"));

private boolean configured = false;
private final GoogleCredentials credentials;

public GcpLoginCallbackHandler() {
try {
this.credentials =
GoogleCredentials.getApplicationDefault().createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
} catch (IOException e) {
throw new IllegalStateException("Failed to create Google credentials", e);
}
}

@VisibleForTesting
GcpLoginCallbackHandler(GoogleCredentials credentials) {
this.credentials = credentials;
}

@Override
public void configure(
Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
if (!Objects.equals(saslMechanism, OAuthBearerLoginModule.OAUTHBEARER_MECHANISM)) {
throw new IllegalArgumentException(
String.format("Unexpected SASL mechanism: %s", saslMechanism));
}
configured = true;
}

private boolean isConfigured() {
return configured;
}

@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException, IOException {
if (!isConfigured()) {
throw new IllegalStateException("Callback handler not configured");
}

for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerTokenCallback) {
handleTokenCallback((OAuthBearerTokenCallback) callback);
} else {
throw new UnsupportedCallbackException(callback);
}
}
}

private void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOException {
String subject = "";
// The following credentials are the ones that support the getAccount() or similar method to
// obtain the principal name. Namely, the ones that can be obtained with two-legged
// authentication, which do not involve user authentication, such as service account
// credentials.
if (credentials instanceof ComputeEngineCredentials) {
subject = ((ComputeEngineCredentials) credentials).getAccount();
} else if (credentials instanceof ServiceAccountCredentials) {
subject = ((ServiceAccountCredentials) credentials).getClientEmail();
} else if (credentials instanceof ExternalAccountCredentials) {
subject = ((ExternalAccountCredentials) credentials).getServiceAccountEmail();
} else if (credentials instanceof ImpersonatedCredentials) {
subject = ((ImpersonatedCredentials) credentials).getAccount();
} else if (credentials instanceof StubGoogleCredentials) {
subject = ((StubGoogleCredentials) credentials).getAccount();
} else {
throw new IOException("Unknown credentials type: " + credentials.getClass().getName());
}
credentials.refreshIfExpired();
var googleAccessToken = credentials.getAccessToken();
String kafkaToken = getKafkaAccessToken(googleAccessToken, subject);

var now = Instant.now();
OAuthBearerToken token =
new BasicOAuthBearerToken(
kafkaToken,
ImmutableSet.of("kafka"),
googleAccessToken.getExpirationTime().toInstant().toEpochMilli(),
subject,
now.toEpochMilli());
callback.token(token);
}

private static String b64Encode(String data) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(data.getBytes(UTF_8));
}

private static String getJwt(AccessToken token, String subject) {
return new Gson()
.toJson(
ImmutableMap.of(
JWT_EXP_CLAIM,
token.getExpirationTime().toInstant().getEpochSecond(),
JWT_ISSUED_AT_CLAIM,
Instant.now().getEpochSecond(),
JWT_SCOPE_CLAIM,
"kafka",
JWT_SUBJECT_CLAIM,
subject));
}

private static String getKafkaAccessToken(AccessToken token, String subject) {
return String.join(
".",
b64Encode(HEADER),
b64Encode(getJwt(token, subject)),
b64Encode(token.getTokenValue()));
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.teleport.v2.kafka.auth;
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ final class Offset {
order = 1,
name = "readBootstrapServerAndTopic",
groupName = "Source",
description = "Source Kafka Topic",
helpText = "Kafka Topic to read the input from.")
description = "Source Kafka Bootstrap server and topic",
helpText = "Kafka Bootstrap server and topic to read the input from.",
example = "localhost:9092;topic1,topic2")
String getReadBootstrapServerAndTopic();

void setReadBootstrapServerAndTopic(String value);
Expand Down Expand Up @@ -93,17 +94,26 @@ final class Offset {
name = "kafkaReadAuthenticationMode",
groupName = "Source",
enumOptions = {
@TemplateParameter.TemplateEnumOption(
KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_PLAIN),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.TLS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE),
},
description = "Kafka Source Authentication Mode",
helpText =
"The mode of authentication to use with the Kafka cluster. "
+ "Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, "
+ "and TLS for certificate-based authentication. "
+ "Apache Kafka for BigQuery only supports the SASL_PLAIN authentication mode.")
@Default.String(KafkaAuthenticationMethod.SASL_PLAIN)
("The mode of authentication to use with the Kafka cluster. "
+ "Use "
+ KafkaAuthenticationMethod.NONE
+ " for no authentication, "
+ KafkaAuthenticationMethod.SASL_PLAIN
+ " for SASL/PLAIN username and password, "
+ KafkaAuthenticationMethod.TLS
+ "for certificate-based authentication. "
+ KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
+ " should be used only for Google Cloud Apache Kafka for BigQuery cluster since "
+ "This allow you to authenticate with Google Cloud Apache Kafka for BigQuery using application default credentials"))
@Default.String(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)
String getKafkaReadAuthenticationMode();

void setKafkaReadAuthenticationMode(String value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,19 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaRecordToGenericRecordFailsafeElementFn
extends DoFn<
KafkaRecord<byte[], byte[]>, FailsafeElement<KafkaRecord<byte[], byte[]>, GenericRecord>>
implements Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaRecordToGenericRecordFailsafeElementFn.class);

private transient KafkaAvroDeserializer kafkaDeserializer;
private transient BinaryAvroDeserializer binaryDeserializer;
private transient SchemaRegistryClient schemaRegistryClient;

// Flexible options for schema and encoding configuration
private Schema schema;
private String topicName = "fake_topic";
private final String topicName = "fake_topic";
private String schemaRegistryConnectionUrl;
private Map<String, Object> schemaRegistrySslConfig;
private String messageFormat; // "AVRO_BINARY_ENCODING" or "AVRO_CONFLUENT_WIRE_FORMAT"
Expand Down Expand Up @@ -103,9 +99,9 @@ public void processElement(ProcessContext context) {
kafkaDeserializer.deserialize(
element.getTopic(), element.getHeaders(), element.getKV().getValue());
}
context.output(FailsafeElement.of(element, result));
} catch (Exception e) {
LOG.error("Failed during deserialization: " + e.toString());
new RuntimeException("Failed during deserialization: " + e.toString());
}
context.output(FailsafeElement.of(element, result));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ private static Map<String, Object> from(
+ " password=\'"
+ SecretManagerUtils.getSecret(passwordSecretId)
+ "\';");
} else if (authMode.equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
properties.put(
SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
"com.google.cloud.teleport.v2.kafka.auth.GcpLoginCallbackHandler");
properties.put(
SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
} else {
throw new RuntimeException("Authentication method not supported: " + authMode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public class KafkaAuthenticationMethod {
public static final String NONE = "NONE";
public static final String TLS = "TLS";
public static final String SASL_PLAIN = "SASL_PLAIN";
public static final String APPLICATION_DEFAULT_CREDENTIALS = "APPLICATION_DEFAULT_CREDENTIALS";
}

0 comments on commit 3f76534

Please sign in to comment.