HADOOP-19207: [ABFS][FNSOverBlob] Response Handling of Blob Endpoint APIs and Metadata APIs (#7210)

Contributed by Anuj Modi
anujmodi2021 authored Dec 26, 2024
1 parent d5b836b commit 7e67358
Showing 32 changed files with 2,351 additions and 421 deletions.
@@ -416,6 +416,14 @@ public class AbfsConfiguration{
private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

/**
* Constructor for AbfsConfiguration for specified service type.
* @param rawConfig used to initialize the configuration.
* @param accountName the name of the azure storage account.
* @param fsConfiguredServiceType service type configured for the file system.
* @throws IllegalAccessException if the field is not accessible.
* @throws IOException if an I/O error occurs.
*/
public AbfsConfiguration(final Configuration rawConfig,
String accountName,
AbfsServiceType fsConfiguredServiceType)
@@ -445,6 +453,13 @@ public AbfsConfiguration(final Configuration rawConfig,
}
}

/**
* Constructor for AbfsConfiguration for default service type i.e. DFS.
* @param rawConfig used to initialize the configuration.
* @param accountName the name of the azure storage account.
* @throws IllegalAccessException if the field is not accessible.
* @throws IOException if an I/O error occurs.
*/
public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, IOException {
this(rawConfig, accountName, AbfsServiceType.DFS);
@@ -470,7 +485,7 @@ public Trilean getIsNamespaceEnabledAccount() {
* @return the service type.
*/
public AbfsServiceType getFsConfiguredServiceType() {
return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType);
return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType);
}

/**
@@ -479,7 +494,7 @@ public AbfsServiceType getFsConfiguredServiceType() {
* @return the service type.
*/
public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() {
return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null);
return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null);
}

/**
@@ -488,7 +503,7 @@ public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() {
* @return the service type.
*/
public AbfsServiceType getIngressServiceType() {
return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType());
return getCaseInsensitiveEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType());
}

/**
@@ -515,7 +530,7 @@ public void validateConfiguredServiceType(boolean isHNSEnabled)
}
if (isHNSEnabled && getConfiguredServiceTypeForFNSAccounts() == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(
FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Cannot be BLOB for HNS Account");
FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Service Type Cannot be BLOB for HNS Account");
} else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
"Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account");
@@ -712,6 +727,28 @@ public <T extends Enum<T>> T getEnum(String name, T defaultValue) {
rawConfig.getEnum(name, defaultValue));
}

/**
* Returns the account-specific enum value if it exists, else
* looks for an account-agnostic value, matching the configured string
* against the enum constants in a case-insensitive manner.
* @param name Account-agnostic configuration key
* @param defaultValue Value returned if none is configured
* @param <T> Enum type
* @return the configured enum value if one exists, else the default value
*/
public <T extends Enum<T>> T getCaseInsensitiveEnum(String name, T defaultValue) {
String configValue = getString(name, null);
if (configValue != null) {
for (T enumConstant : defaultValue.getDeclaringClass().getEnumConstants()) { // case-insensitive match against the enum constants
if (enumConstant.name().equalsIgnoreCase(configValue)) {
return enumConstant;
}
}
// No match found
throw new IllegalArgumentException("No enum constant " + defaultValue.getDeclaringClass().getCanonicalName() + "." + configValue);
}
return defaultValue;
}
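For illustration only (not part of the commit), the behavioral difference this helper introduces: Configuration.getEnum() delegates to Enum.valueOf(), which is case-sensitive, so a lower-case value like "blob" would previously be rejected, while getCaseInsensitiveEnum() resolves it. The package locations, the literal config key string, and the account host below are assumptions made for the sketch.

// CaseInsensitiveServiceTypeDemo.java -- standalone sketch; assumes hadoop-azure on the classpath.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;

public class CaseInsensitiveServiceTypeDemo {
  public static void main(String[] args) throws Exception {
    Configuration rawConfig = new Configuration();
    // Assumed key string for FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE; lower-case value on purpose.
    rawConfig.set("fs.azure.fns.account.service.type", "blob");

    AbfsConfiguration abfsConfig = new AbfsConfiguration(
        rawConfig, "demoaccount.dfs.core.windows.net", AbfsServiceType.DFS);

    // Enum.valueOf(AbfsServiceType.class, "blob") would throw IllegalArgumentException;
    // the case-insensitive lookup maps it to AbfsServiceType.BLOB instead.
    System.out.println(abfsConfig.getFsConfiguredServiceType()); // BLOB
  }
}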

/**
* Returns the account-agnostic enum value if it exists, else
* return default.
@@ -122,6 +122,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FSOperationType.CREATE_FILESYSTEM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
@@ -215,16 +216,16 @@ public void initialize(URI uri, Configuration configuration)
tracingHeaderFormat = abfsConfiguration.getTracingHeaderFormat();
this.setWorkingDirectory(this.getHomeDirectory());

TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
TracingContext initFSTracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.INIT, tracingHeaderFormat, listener);

/*
* Validate that the service type configured in the URI is valid for the account type in use.
* An HNS account cannot have a Blob endpoint URI.
*/
try {
abfsConfiguration.validateConfiguredServiceType(
tryGetIsNamespaceEnabled(new TracingContext(tracingContext)));
tryGetIsNamespaceEnabled(initFSTracingContext));
} catch (InvalidConfigurationValueException ex) {
LOG.debug("File system configured with Invalid Service Type", ex);
throw ex;
@@ -233,34 +234,39 @@ public void initialize(URI uri, Configuration configuration)
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

/*
* A non-hierarchical-namespace account cannot have a customer-provided key (CPK).
* Fail filesystem initialization if these configs are provided. CPK is of
* two types: GLOBAL_KEY and ENCRYPTION_CONTEXT.
*/
try {
if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK(
abfsConfiguration)) && !tryGetIsNamespaceEnabled(new TracingContext(
initFSTracingContext))) {
throw new PathIOException(uri.getPath(),
CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE);
}
} catch (InvalidConfigurationValueException ex) {
LOG.debug("Non-Hierarchical Namespace Accounts Cannot Have CPK Enabled", ex);
throw ex;
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Failed to determine account type for service type validation", ex);
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

// Create the file system if it does not exist.
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
TracingContext createFSTracingContext = new TracingContext(initFSTracingContext);
createFSTracingContext.setOperation(CREATE_FILESYSTEM);
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), createFSTracingContext) == null) {
try {
this.createFileSystem(tracingContext);
this.createFileSystem(createFSTracingContext);
} catch (AzureBlobFileSystemException ex) {
checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS);
}
}
}

/*
* Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
* Fail initialization of filesystem if the configs are provided. CPK is of
* two types: GLOBAL_KEY, and ENCRYPTION_CONTEXT.
*/
if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK(
abfsConfiguration))
&& !getIsNamespaceEnabled(new TracingContext(tracingContext))) {
/*
* Close the filesystem gracefully before throwing exception. Graceful close
* will ensure that all resources are released properly.
*/
close();
throw new PathIOException(uri.getPath(),
CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE);
}

LOG.trace("Initiate check for delegation token manager");
if (UserGroupInformation.isSecurityEnabled()) {
this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
@@ -700,7 +706,7 @@ private void incrementStatistic(AbfsStatistic statistic) {
private void trailingPeriodCheck(Path path) throws IllegalArgumentException {
while (!path.isRoot()) {
String pathToString = path.toString();
if (pathToString.length() != 0) {
if (!pathToString.isEmpty()) {
if (pathToString.charAt(pathToString.length() - 1) == '.') {
throw new IllegalArgumentException(
"ABFS does not allow files or directories to end with a dot.");
@@ -150,6 +150,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
@@ -343,11 +344,13 @@ public void close() throws IOException {
}

byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
return value.getBytes(XMS_PROPERTIES_ENCODING);
// DFS Client works with ISO_8859_1 encoding, Blob Works with UTF-8.
return getClient().encodeAttribute(value);
}

String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
return new String(value, XMS_PROPERTIES_ENCODING);
// DFS Client works with ISO_8859_1 encoding, Blob Works with UTF-8.
return getClient().decodeAttribute(value);
}
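The delegation above exists because the two endpoints expect different charsets for metadata values, as the comments note. A minimal standalone sketch of that difference using plain JDK calls rather than the ABFS client API (the value shown is hypothetical):

// XmsPropertiesEncodingDemo.java -- standalone sketch.
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class XmsPropertiesEncodingDemo {
  public static void main(String[] args) {
    String value = "caf\u00e9"; // a non-ASCII metadata value
    byte[] dfsBytes = value.getBytes(StandardCharsets.ISO_8859_1); // DFS client convention
    byte[] blobBytes = value.getBytes(StandardCharsets.UTF_8);     // Blob client convention
    System.out.println(Arrays.equals(dfsBytes, blobBytes));        // false
  }
}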

private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
@@ -485,9 +488,8 @@ public Hashtable<String, String> getFilesystemProperties(
.getFilesystemProperties(tracingContext);
perfInfo.registerResult(op.getResult());

final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);

parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
// Handle the difference in response header formats between the DFS and Blob clients.
parsedXmsProperties = getClient().getXMSProperties(op.getResult());
perfInfo.registerSuccess(true);

return parsedXmsProperties;
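The parsing moved into the client implementations because the two endpoints return user properties in different shapes: the DFS endpoint packs them into a single comma-separated x-ms-properties header with base64-encoded values (as the removed parseCommaSeparatedXmsProperties call suggests), while the Blob endpoint returns one x-ms-meta-* header per property. A rough standalone sketch of the DFS-style parsing only, written as a hypothetical helper rather than the ABFS implementation:

// XmsPropertiesShapeDemo.java -- standalone sketch of the DFS-style header format.
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Hashtable;

public class XmsPropertiesShapeDemo {
  // Parses "key1=<base64>,key2=<base64>" into a property table.
  static Hashtable<String, String> parseDfsStyle(String xMsProperties) {
    Hashtable<String, String> props = new Hashtable<>();
    if (xMsProperties == null || xMsProperties.isEmpty()) {
      return props;
    }
    for (String pair : xMsProperties.split(",")) {
      int eq = pair.indexOf('=');
      byte[] raw = Base64.getDecoder().decode(pair.substring(eq + 1));
      props.put(pair.substring(0, eq), new String(raw, StandardCharsets.ISO_8859_1));
    }
    return props;
  }

  public static void main(String[] args) {
    String header = "key1=" + Base64.getEncoder()
        .encodeToString("value1".getBytes(StandardCharsets.ISO_8859_1));
    System.out.println(parseDfsStyle(header)); // {key1=value1}
  }
}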
@@ -533,10 +535,8 @@ public Hashtable<String, String> getPathStatus(final Path path,
perfInfo.registerResult(op.getResult());
contextEncryptionAdapter.destroy();

final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);

parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);

// Handle the difference in response header formats between the DFS and Blob clients.
parsedXmsProperties = getClient().getXMSProperties(op.getResult());
perfInfo.registerSuccess(true);

return parsedXmsProperties;
@@ -899,10 +899,8 @@ public AbfsInputStream openFileForRead(Path path,
} else {
AbfsHttpOperation op = getClient().getPathStatus(relativePath, false,
tracingContext, null).getResult();
resourceType = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
resourceType = getClient().checkIsDir(op) ? DIRECTORY : FILE;
contentLength = extractContentLength(op);
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
/*
* For file created with ENCRYPTION_CONTEXT, client shall receive
@@ -983,17 +981,15 @@ public OutputStream openFileForWrite(final Path path,
.getPathStatus(relativePath, false, tracingContext, null);
perfInfo.registerResult(op.getResult());

final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));

if (parseIsDirectory(resourceType)) {
if (getClient().checkIsDir(op.getResult())) {
throw new AbfsRestOperationException(
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
"openFileForRead must be used with files and not directories",
"openFileForWrite must be used with files and not directories",
null);
}

final long contentLength = extractContentLength(op.getResult());
final long offset = overwrite ? 0 : contentLength;

perfInfo.registerSuccess(true);
@@ -1180,8 +1176,8 @@ public FileStatus getFileStatus(final Path path,
contentLength = 0;
resourceIsDir = true;
} else {
contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
resourceIsDir = parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE));
contentLength = extractContentLength(result);
resourceIsDir = getClient().checkIsDir(result);
}

final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
@@ -1256,10 +1252,16 @@ public String listStatus(final Path path, final String startFrom,
startFrom);

final String relativePath = getRelativePath(path);
AbfsClient listingClient = getClient();

if (continuation == null || continuation.isEmpty()) {
// generate continuation token if a valid startFrom is provided.
if (startFrom != null && !startFrom.isEmpty()) {
/*
* The Blob endpoint does not support startFrom yet, so fall back to the DFS client.
* startFrom remains null for all HDFS APIs; this parameter is only for internal use.
*/
listingClient = getClient(AbfsServiceType.DFS);
continuation = getIsNamespaceEnabled(tracingContext)
? generateContinuationTokenForXns(startFrom)
: generateContinuationTokenForNonXns(relativePath, startFrom);
@@ -1268,11 +1270,11 @@

do {
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
AbfsRestOperation op = getClient().listPath(relativePath, false,
AbfsRestOperation op = listingClient.listPath(relativePath, false,
abfsConfiguration.getListMaxResults(), continuation,
tracingContext);
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
continuation = listingClient.getContinuationFromResponse(op.getResult());
ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
if (retrievedSchema == null) {
throw new AbfsRestOperationException(
@@ -1465,7 +1467,7 @@ public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec,
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, useUpn, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final String eTag = extractEtagHeader(op.getResult());

final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));

@@ -1508,7 +1510,7 @@ public void removeAclEntries(final Path path, final List<AclEntry> aclSpec,
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, isUpnFormat, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final String eTag = extractEtagHeader(op.getResult());

final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));

@@ -1546,7 +1548,7 @@ public void removeDefaultAcl(final Path path, TracingContext tracingContext)
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final String eTag = extractEtagHeader(op.getResult());
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
final Map<String, String> defaultAclEntries = new HashMap<>();

@@ -1590,7 +1592,7 @@ public void removeAcl(final Path path, TracingContext tracingContext)
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final String eTag = extractEtagHeader(op.getResult());

final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
final Map<String, String> newAclEntries = new HashMap<>();
@@ -1636,7 +1638,7 @@ public void setAcl(final Path path, final List<AclEntry> aclSpec,
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, isUpnFormat, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
final String eTag = extractEtagHeader(op.getResult());

final Map<String, String> getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));

@@ -1859,12 +1861,24 @@ public String getRelativePath(final Path path) {
return relPath;
}

private long parseContentLength(final String contentLength) {
if (contentLength == null) {
return -1;
/**
* Extracts the content length from the HTTP operation's response headers.
*
* @param op The AbfsHttpOperation instance from which to extract the content length.
* This operation contains the HTTP response headers.
* @return The content length as a long value. If the Content-Length header is
* not present or is empty, returns 0.
*/
private long extractContentLength(AbfsHttpOperation op) {
long contentLength;
String contentLengthHeader = op.getResponseHeader(
HttpHeaderConfigurations.CONTENT_LENGTH);
if (!contentLengthHeader.equals(EMPTY_STRING)) {
contentLength = Long.parseLong(contentLengthHeader);
} else {
contentLength = 0;
}

return Long.parseLong(contentLength);
return contentLength;
}
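A standalone sketch of the new mapping (hypothetical helper, not the ABFS code): the removed parseContentLength() mapped a missing header to -1, whereas the replacement treats an empty Content-Length as 0; the sketch adds a null guard as well for clarity.

// ContentLengthDemo.java -- standalone sketch.
public class ContentLengthDemo {
  // Missing or empty header -> 0, otherwise parse the numeric value.
  static long contentLengthOrZero(String header) {
    return (header == null || header.isEmpty()) ? 0L : Long.parseLong(header);
  }

  public static void main(String[] args) {
    System.out.println(contentLengthOrZero(""));   // 0
    System.out.println(contentLengthOrZero("42")); // 42
  }
}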

private boolean parseIsDirectory(final String resourceType) {