Skip to content

Commit

Permalink
HADOOP-18516: [ABFS][Authentication] Support Fixed SAS Token for ABFS…
Browse files Browse the repository at this point in the history
… Authentication (#6552)


Contributed by Anuj Modi
  • Loading branch information
anujmodi2021 authored May 30, 2024
1 parent d00b3ac commit d8b485a
Show file tree
Hide file tree
Showing 14 changed files with 611 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.Field;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.util.Preconditions;

Expand Down Expand Up @@ -1025,33 +1026,63 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
}
}

/**
* Returns the SASTokenProvider implementation to be used to generate SAS token.<br>
* Users can choose between a custom implementation of {@link SASTokenProvider}
* or an in house implementation {@link FixedSASTokenProvider}.<br>
* For Custom implementation "fs.azure.sas.token.provider.type" needs to be provided.<br>
* For Fixed SAS Token use "fs.azure.sas.fixed.token" needs to be provided.<br>
* In case both are provided, Preference will be given to Custom implementation.<br>
* Avoid using a custom tokenProvider implementation just to read the configured
* fixed token, as this could create confusion. Also,implementing the SASTokenProvider
* requires relying on the raw configurations. It is more stable to depend on
* the AbfsConfiguration with which a filesystem is initialized, and eliminate
* chances of dynamic modifications and spurious situations.<br>
* @return sasTokenProvider object based on configurations provided
* @throws AzureBlobFileSystemException
*/
public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
if (authType != AuthType.SAS) {
throw new SASTokenProviderException(String.format(
"Invalid auth type: %s is being used, expecting SAS", authType));
"Invalid auth type: %s is being used, expecting SAS.", authType));
}

try {
String configKey = FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
Class<? extends SASTokenProvider> sasTokenProviderClass =
getTokenProviderClass(authType, configKey, null,
SASTokenProvider.class);

Preconditions.checkArgument(sasTokenProviderClass != null,
String.format("The configuration value for \"%s\" is invalid.", configKey));

SASTokenProvider sasTokenProvider = ReflectionUtils
.newInstance(sasTokenProviderClass, rawConfig);
Preconditions.checkArgument(sasTokenProvider != null,
String.format("Failed to initialize %s", sasTokenProviderClass));

LOG.trace("Initializing {}", sasTokenProviderClass.getName());
sasTokenProvider.initialize(rawConfig, accountName);
LOG.trace("{} init complete", sasTokenProviderClass.getName());
return sasTokenProvider;
Class<? extends SASTokenProvider> customSasTokenProviderImplementation =
getTokenProviderClass(authType, FS_AZURE_SAS_TOKEN_PROVIDER_TYPE,
null, SASTokenProvider.class);
String configuredFixedToken = this.getTrimmedPasswordString(FS_AZURE_SAS_FIXED_TOKEN, EMPTY_STRING);

if (customSasTokenProviderImplementation == null && configuredFixedToken.isEmpty()) {
throw new SASTokenProviderException(String.format(
"At least one of the \"%s\" and \"%s\" must be set.",
FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, FS_AZURE_SAS_FIXED_TOKEN));
}

// Prefer Custom SASTokenProvider Implementation if configured.
if (customSasTokenProviderImplementation != null) {
LOG.trace("Using Custom SASTokenProvider implementation because it is given precedence when it is set.");
SASTokenProvider sasTokenProvider = ReflectionUtils.newInstance(
customSasTokenProviderImplementation, rawConfig);
if (sasTokenProvider == null) {
throw new SASTokenProviderException(String.format(
"Failed to initialize %s", customSasTokenProviderImplementation));
}
LOG.trace("Initializing {}", customSasTokenProviderImplementation.getName());
sasTokenProvider.initialize(rawConfig, accountName);
LOG.trace("{} init complete", customSasTokenProviderImplementation.getName());
return sasTokenProvider;
} else {
LOG.trace("Using FixedSASTokenProvider implementation");
FixedSASTokenProvider fixedSASTokenProvider = new FixedSASTokenProvider(configuredFixedToken);
return fixedSASTokenProvider;
}
} catch (SASTokenProviderException e) {
throw e;
} catch (Exception e) {
throw new TokenAccessProviderException("Unable to load SAS token provider class: " + e, e);
throw new SASTokenProviderException(
"Unable to load SAS token provider class: " + e, e);
}
}

Expand All @@ -1064,14 +1095,14 @@ public EncryptionContextProvider createEncryptionContextProvider() {
Class<? extends EncryptionContextProvider> encryptionContextClass =
getAccountSpecificClass(configKey, null,
EncryptionContextProvider.class);
Preconditions.checkArgument(encryptionContextClass != null, String.format(
Preconditions.checkArgument(encryptionContextClass != null,
"The configuration value for %s is invalid, or config key is not account-specific",
configKey));
configKey);

EncryptionContextProvider encryptionContextProvider =
ReflectionUtils.newInstance(encryptionContextClass, rawConfig);
Preconditions.checkArgument(encryptionContextProvider != null,
String.format("Failed to initialize %s", encryptionContextClass));
"Failed to initialize %s", encryptionContextClass);

LOG.trace("{} init complete", encryptionContextClass.getName());
return encryptionContextProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1302,10 +1302,9 @@ public void access(final Path path, final FsAction mode) throws IOException {

/**
* Incrementing exists() calls from superclass for statistic collection.
*
* @param f source path.
* @return true if the path exists.
* @throws IOException
* @throws IOException if some issue in checking path.
*/
@Override
public boolean exists(Path f) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1729,7 +1729,7 @@ private void initializeClient(URI uri, String fileSystemName,
creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
abfsConfiguration.getStorageAccountKey());
} else if (authType == AuthType.SAS) {
LOG.trace("Fetching SAS token provider");
LOG.trace("Fetching SAS Token Provider");
sasTokenProvider = abfsConfiguration.getSASTokenProvider();
} else {
LOG.trace("Fetching token provider");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,10 @@ public static String accountProperty(String property, String account) {
public static final String FS_AZURE_ENABLE_DELEGATION_TOKEN = "fs.azure.enable.delegation.token";
public static final String FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE = "fs.azure.delegation.token.provider.type";

/** Key for SAS token provider **/
/** Key for fixed SAS token: {@value}. **/
public static final String FS_AZURE_SAS_FIXED_TOKEN = "fs.azure.sas.fixed.token";

/** Key for SAS token provider: {@value}. **/
public static final String FS_AZURE_SAS_TOKEN_PROVIDER_TYPE = "fs.azure.sas.token.provider.type";

/** For performance, AbfsInputStream/AbfsOutputStream re-use SAS tokens until the expiry is within this number of seconds. **/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,7 @@ public AbfsRestOperation flush(final String path, final long position,
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));

// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
Expand Down Expand Up @@ -1160,6 +1161,7 @@ public AbfsRestOperation read(final String path,
}

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();

// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
Expand Down Expand Up @@ -1471,16 +1473,17 @@ private String appendSASTokenToQuery(String path,
sasToken = cachedSasToken;
LOG.trace("Using cached SAS token.");
}

// if SAS Token contains a prefix of ?, it should be removed
if (sasToken.charAt(0) == '?') {
sasToken = sasToken.substring(1);
}

queryBuilder.setSASToken(sasToken);
LOG.trace("SAS token fetch complete for {} on {}", operation, path);
} catch (Exception ex) {
throw new SASTokenProviderException(String.format("Failed to acquire a SAS token for %s on %s due to %s",
operation,
path,
throw new SASTokenProviderException(String.format(
"Failed to acquire a SAS token for %s on %s due to %s", operation, path,
ex.toString()));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;

/**
* In house implementation of {@link SASTokenProvider} to use a fixed SAS token with ABFS.
* Use this to avoid implementing a Custom Token Provider just to return fixed SAS.
* Fixed SAS Token to be provided using the config "fs.azure.sas.fixed.token".
*/
public class FixedSASTokenProvider implements SASTokenProvider {
private String fixedSASToken;

public FixedSASTokenProvider(final String fixedSASToken) throws SASTokenProviderException {
this.fixedSASToken = fixedSASToken;
if (fixedSASToken == null || fixedSASToken.isEmpty()) {
throw new SASTokenProviderException(
String.format("Configured Fixed SAS Token is Invalid: %s", fixedSASToken));
}
}

@Override
public void initialize(final Configuration configuration,
final String accountName)
throws IOException {
}

/**
* Returns the fixed SAS Token configured.
* @param account the name of the storage account.
* @param fileSystem the name of the fileSystem.
* @param path the file or directory path.
* @param operation the operation to be performed on the path.
* @return Fixed SAS Token
* @throws IOException never
*/
@Override
public String getSASToken(final String account,
final String fileSystem,
final String path,
final String operation) throws IOException {
return fixedSASToken;
}
}
Loading

0 comments on commit d8b485a

Please sign in to comment.