Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19233: ABFS: [FnsOverBlob] Implementing Rename and Delete APIs over Blob Endpoint #7265

Open
wants to merge 11 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,30 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;

@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS, DefaultValue = DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS)
private long blobCopyProgressPollWaitMillis;

@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION, DefaultValue = DEFAULT_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION)
private long blobAtomicRenameLeaseRefreshDuration;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
private int producerQueueMaxSize;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_CONSUMER_MAX_LAG, DefaultValue = DEFAULT_FS_AZURE_CONSUMER_MAX_LAG)
private int listingMaxConsumptionLag;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD)
private int blobRenameDirConsumptionParallelism;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD)
private int blobDeleteDirConsumptionParallelism;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
private int maxApacheHttpClientIoExceptionsRetries;
Expand Down Expand Up @@ -1508,4 +1532,28 @@ public boolean getIsChecksumValidationEnabled() {
public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) {
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}

public long getBlobCopyProgressPollWaitMillis() {
return blobCopyProgressPollWaitMillis;
}

public long getAtomicRenameLeaseRefreshDuration() {
return blobAtomicRenameLeaseRefreshDuration;
}

public int getProducerQueueMaxSize() {
return producerQueueMaxSize;
}

public int getListingMaxConsumptionLag() {
return listingMaxConsumptionLag;
}

public int getBlobRenameDirConsumptionParallelism() {
return blobRenameDirConsumptionParallelism;
}

public int getBlobDeleteDirConsumptionParallelism() {
return blobDeleteDirConsumptionParallelism;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,8 @@ public boolean rename(final Path src, final Path dst) throws IOException {
src,
ex,
AzureServiceErrorCode.PATH_ALREADY_EXISTS,
AzureServiceErrorCode.BLOB_ALREADY_EXISTS,
AzureServiceErrorCode.BLOB_PATH_NOT_FOUND,
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
Expand Down Expand Up @@ -649,7 +651,10 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
abfsStore.delete(qualifiedPath, recursive, tracingContext);
return true;
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND);
checkException(f,
ex,
AzureServiceErrorCode.PATH_NOT_FOUND,
AzureServiceErrorCode.BLOB_PATH_NOT_FOUND);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;

Expand Down Expand Up @@ -188,7 +189,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private final Map<AbfsLease, Object> leaseRefs;

private final AbfsConfiguration abfsConfiguration;
private final Set<String> azureAtomicRenameDirSet;
private Set<String> azureInfiniteLeaseDirSet;
private volatile Trilean isNamespaceEnabled;
private final AuthType authType;
Expand Down Expand Up @@ -256,8 +256,6 @@ public AzureBlobFileSystemStore(
}
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);

this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
updateInfiniteLeaseDirs();
this.authType = abfsConfiguration.getAuthType(accountName);
boolean usingOauth = (authType == AuthType.OAuth);
Expand Down Expand Up @@ -745,8 +743,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
String eTag = extractEtagHeader(op.getResult());

try {
// overwrite only if eTag matches with the file properties fetched befpre
Expand Down Expand Up @@ -825,6 +822,16 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.build();
}

/**
* Creates a directory.
*
* @param path Path of the directory to create.
* @param permission Permission of the directory.
* @param umask Umask of the directory.
* @param tracingContext tracing context
*
* @throws AzureBlobFileSystemException server error.
*/
public void createDirectory(final Path path, final FsPermission permission,
final FsPermission umask, TracingContext tracingContext)
throws IOException {
Expand Down Expand Up @@ -1063,11 +1070,6 @@ public boolean rename(final Path source,
long countAggregate = 0;
boolean shouldContinue;

if (isAtomicRenameKey(source.getName())) {
LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
+" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
}

LOG.debug("renameAsync filesystem: {} source: {} destination: {}",
getClient().getFileSystem(),
source,
Expand All @@ -1089,8 +1091,17 @@ public boolean rename(final Path source,
isNamespaceEnabled);

AbfsRestOperation op = abfsClientRenameResult.getOp();
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
/*
* Blob endpoint does not have a rename API. The AbfsBlobClient would
* perform the copy and delete operation for renaming a path.
* As it would not be one operation, hence, the client would not return
* AbfsRestOperation object.
*/
if (op != null) {
perfInfo.registerResult(op.getResult());
continuation = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
}
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue = continuation != null && !continuation.isEmpty();
Expand Down Expand Up @@ -1124,8 +1135,16 @@ public void delete(final Path path, final boolean recursive,
try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) {
AbfsRestOperation op = getClient().deletePath(relativePath, recursive,
continuation, tracingContext, getIsNamespaceEnabled(tracingContext));
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
/*
* Blob endpoint does not have a directory delete API. The AbfsBlobClient would
* perform multiple operation to delete a path, hence, the client would not return
* AbfsRestOperation object.
*/
if (op != null) {
perfInfo.registerResult(op.getResult());
continuation = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
}
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue = continuation != null && !continuation.isEmpty();
Expand Down Expand Up @@ -1722,10 +1741,6 @@ public void access(final Path path, final FsAction mode,
}
}

public boolean isAtomicRenameKey(String key) {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}

public boolean isInfiniteLeaseKey(String key) {
if (azureInfiniteLeaseDirSet.isEmpty()) {
return false;
Expand Down Expand Up @@ -1924,7 +1939,7 @@ private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsPro
return properties;
}

private boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
public static boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
for (String dir : dirSet) {
if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) {
return true;
Expand Down Expand Up @@ -2203,7 +2218,8 @@ private AbfsLease maybeCreateLease(String relativePath, TracingContext tracingCo
if (!enableInfiniteLease) {
return null;
}
AbfsLease lease = new AbfsLease(getClient(), relativePath, tracingContext);
AbfsLease lease = new AbfsLease(getClient(), relativePath, true,
INFINITE_LEASE_DURATION, null, tracingContext);
leaseRefs.put(lease, null);
return lease;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,5 +290,13 @@ public static ApiVersion getCurrentVersion() {
public static final String JDK_FALLBACK = "JDK_fallback";
public static final String KEEP_ALIVE_CACHE_CLOSED = "KeepAliveCache is closed";

public static final String COPY_STATUS_SUCCESS = "success";
public static final String COPY_STATUS_PENDING = "pending";
public static final String COPY_STATUS_ABORTED = "aborted";
public static final String COPY_STATUS_FAILED = "failed";

public static final String ATOMIC_DIR_RENAME_RECOVERY_ON_GET_PATH_EXCEPTION =
"Path had to be recovered from atomic rename operation.";

private AbfsHttpConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,5 +359,27 @@ public static String accountProperty(String property, String account) {
public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE = "fs.azure.apache.http.client.max.cache.connection.size";
/**Maximum idle time for a ApacheHttpClient-connection: {@value}*/
public static final String FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL = "fs.azure.apache.http.client.idle.connection.ttl";

/**
* Blob copy API is an async API, this configuration defines polling duration
* for checking copy status {@value}
*/
public static final String FS_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS = "fs.azure.blob.copy.progress.wait.millis";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep the comments formatting constant

/**Blob rename lease refresh duration {@value}*/
public static final String FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION
= "fs.azure.blob.atomic.rename.lease.refresh.duration";
/**Maximum number of blob information enqueued in memory for rename or delete orchestration {@value}*/
public static final String FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = "fs.azure.blob.dir.list.producer.queue.max.size";
/**
* Maximum consumer lag (count of blob information which is yet to be taken for operation)
* in blob listing which can be tolerated before making producer to wait for
* consumer lag to become tolerable: {@value}.
*/
public static final String FS_AZURE_CONSUMER_MAX_LAG = "fs.azure.blob.dir.list.consumer.max.lag";
/**Maximum number of thread per blob-rename orchestration {@value}*/
public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread";
/**Maximum number of thread per blob-delete orchestration {@value}*/
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";

private ConfigurationKeys() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,19 @@ public final class FileSystemConfigurations {

public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS = 5;

public static final long DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS = 1_000L;

public static final long DEFAULT_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION = 60_000L;

public static final int DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = 2 * DEFAULT_AZURE_LIST_MAX_RESULTS;

public static final int DEFAULT_FS_AZURE_CONSUMER_MAX_LAG = DEFAULT_AZURE_LIST_MAX_RESULTS;

public static final int DEFAULT_FS_AZURE_BLOB_RENAME_THREAD = 5;

public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = 5;

public static final int BLOCK_ID_LENGTH = 60;

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,9 @@ public final class HttpHeaderConfigurations {
*/
public static final String X_MS_BLOB_CONTENT_MD5 = "x-ms-blob-content-md5";

public static final String X_MS_COPY_ID = "x-ms-copy-id";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadocs for new constants

public static final String X_MS_COPY_STATUS_DESCRIPTION = "x-ms-copy-status-description";
public static final String X_MS_COPY_STATUS = "x-ms-copy-status";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public enum AzureServiceErrorCode {
INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null),
RENAME_DESTINATION_PARENT_PATH_NOT_FOUND("RenameDestinationParentPathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null),
DIRECTORY_NOT_EMPTY_DELETE("DirectoryNotEmpty", HttpURLConnection.HTTP_CONFLICT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming could be changed

"The recursive query parameter value must be true to delete a non-empty directory"),
INGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
"Ingress is over the account limit."),
EGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
Expand All @@ -54,10 +56,13 @@ public enum AzureServiceErrorCode {
OTHER_SERVER_THROTTLING("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
"The server is currently unable to receive requests. Please retry your request."),
INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null),
INVALID_RENAME_DESTINATION("InvalidRenameDestinationPath", HttpURLConnection.HTTP_BAD_REQUEST, null),
AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null),
ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null),
MD5_MISMATCH("Md5Mismatch", HttpURLConnection.HTTP_BAD_REQUEST,
"The MD5 value specified in the request did not match with the MD5 value calculated by the server."),
COPY_BLOB_FAILED("CopyBlobFailed", HttpURLConnection.HTTP_INTERNAL_ERROR, null),
COPY_BLOB_ABORTED("CopyBlobAborted", HttpURLConnection.HTTP_INTERNAL_ERROR, null),
UNKNOWN(null, -1, null);

private final String errorCode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.enums;

/**
* Enum for BlobCopyProgress.
*/
public enum BlobCopyProgress {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc for class and enums

/**
* Copy is successful.
*/
SUCCESS,
/**
* Copy is failed.
*/
FAILURE,
/**
* Copy is aborted.
*/
ABORTED,
/**
* Copy is pending.
*/
PENDING;
}
Loading
Loading