Skip to content

Commit

Permalink
Rename Delete on Blob endpoint. (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
saxenapranav authored Jun 26, 2024
1 parent 8937ae5 commit d078fb1
Show file tree
Hide file tree
Showing 37 changed files with 3,771 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,26 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;

/** Wait between blob copy progress polls; milliseconds, going by the key and field names. */
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS,
DefaultValue = DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS)
private long blobCopyProgressPollWaitMillis;

/** Lease refresh duration for blob atomic rename; the unit is not visible here (TODO confirm). */
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION,
DefaultValue = DEFAULT_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION)
private long blobAtomicRenameLeaseRefreshDuration;

/** Maximum size of the producer queue, read from FS_AZURE_PRODUCER_QUEUE_MAX_SIZE. */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
private int producerQueueMaxSize;

/** Parallelism for blob directory rename, read from FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD. */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD)
private int blobRenameDirConsumptionParallelism;

/** Parallelism for blob directory delete, read from FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD. */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD)
private int blobDeleteDirConsumptionParallelism;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

Expand Down Expand Up @@ -1413,4 +1433,24 @@ public boolean getIsChecksumValidationEnabled() {
public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) {
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}

/**
 * Gets the configured wait between blob copy progress polls.
 *
 * @return the value of {@code blobCopyProgressPollWaitMillis}.
 */
public long getBlobCopyProgressPollWaitMillis() {
  return this.blobCopyProgressPollWaitMillis;
}

/**
 * Gets the configured lease refresh duration used for blob atomic rename.
 *
 * @return the value of {@code blobAtomicRenameLeaseRefreshDuration}.
 */
public long getAtomicRenameLeaseRefreshDuration() {
  return this.blobAtomicRenameLeaseRefreshDuration;
}

/**
 * Gets the configured maximum size of the producer queue.
 *
 * @return the value of {@code producerQueueMaxSize}.
 */
public int getProducerQueueMaxSize() {
  return this.producerQueueMaxSize;
}

/**
 * Gets the configured parallelism for blob directory rename.
 *
 * @return the value of {@code blobRenameDirConsumptionParallelism}.
 */
public int getBlobRenameDirConsumptionParallelism() {
  return this.blobRenameDirConsumptionParallelism;
}

/**
 * Gets the configured parallelism for blob directory delete.
 *
 * @return the value of {@code blobDeleteDirConsumptionParallelism}.
 */
public int getBlobDeleteDirConsumptionParallelism() {
  return this.blobDeleteDirConsumptionParallelism;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -515,14 +515,18 @@ public boolean rename(final Path src, final Path dst) throws IOException {

qualifiedDstPath = makeQualified(adjustedDst);

abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
getAbfsStore().rename(qualifiedSrcPath, qualifiedDstPath, tracingContext,
null
);
return true;
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Rename operation failed. ", ex);
checkException(
src,
ex,
AzureServiceErrorCode.PATH_ALREADY_EXISTS,
AzureServiceErrorCode.BLOB_ALREADY_EXISTS,
AzureServiceErrorCode.BLOB_PATH_NOT_FOUND,
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
Expand Down Expand Up @@ -595,8 +599,9 @@ public Pair<Boolean, Duration> commitSingleFileByRename(
final Duration waitTime = rateLimiting.acquire(1);

try {
final boolean recovered = abfsStore.rename(qualifiedSrcPath,
qualifiedDstPath, tracingContext, sourceEtag);
final boolean recovered = getAbfsStore().rename(qualifiedSrcPath,
qualifiedDstPath, tracingContext, sourceEtag
);
return Pair.of(recovered, waitTime);
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Rename operation failed. ", ex);
Expand Down Expand Up @@ -627,10 +632,11 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.DELETE, tracingHeaderFormat,
listener);
abfsStore.delete(qualifiedPath, recursive, tracingContext);
getAbfsStore().delete(qualifiedPath, recursive, tracingContext);
return true;
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND);
checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND,
AzureServiceErrorCode.BLOB_PATH_NOT_FOUND);
return false;
}

Expand All @@ -647,7 +653,8 @@ public FileStatus[] listStatus(final Path f) throws IOException {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat,
listener);
FileStatus[] result = abfsStore.listStatus(qualifiedPath, tracingContext);
FileStatus[] result = getAbfsStore().listStatus(qualifiedPath,
tracingContext);
return result;
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex);
Expand Down Expand Up @@ -777,7 +784,9 @@ private FileStatus getFileStatus(final Path path,
Path qualifiedPath = makeQualified(path);

try {
return abfsStore.getFileStatus(qualifiedPath, tracingContext);
FileStatus fileStatus = getAbfsStore().getFileStatus(qualifiedPath,
tracingContext);
return fileStatus;
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;

Expand All @@ -175,7 +176,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private final Map<AbfsLease, Object> leaseRefs;

private final AbfsConfiguration abfsConfiguration;
private final Set<String> azureAtomicRenameDirSet;
private Set<String> azureInfiniteLeaseDirSet;
private volatile Trilean isNamespaceEnabled;
private final AuthType authType;
Expand Down Expand Up @@ -243,8 +243,6 @@ public AzureBlobFileSystemStore(
}
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);

this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
updateInfiniteLeaseDirs();
this.authType = abfsConfiguration.getAuthType(accountName);
boolean usingOauth = (authType == AuthType.OAuth);
Expand Down Expand Up @@ -719,8 +717,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
String eTag = extractEtagHeader(op.getResult());

try {
// overwrite only if eTag matches with the file properties fetched before
Expand Down Expand Up @@ -803,6 +800,16 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.build();
}

/**
* Creates a directory.
*
* @param path Path of the directory to create.
* @param permission Permission of the directory.
* @param umask Umask of the directory.
* @param tracingContext tracing context
*
* @throws AzureBlobFileSystemException server error.
*/
public void createDirectory(final Path path, final FsPermission permission,
final FsPermission umask, TracingContext tracingContext)
throws IOException {
Expand All @@ -815,7 +822,6 @@ public void createDirectory(final Path path, final FsPermission permission,
permission,
umask,
isNamespaceEnabled);

boolean overwrite =
!isNamespaceEnabled || abfsConfiguration.isEnabledMkdirOverwrite();
Permissions permissions = new Permissions(isNamespaceEnabled,
Expand Down Expand Up @@ -1042,16 +1048,11 @@ public boolean rename(final Path source,
final Path destination,
final TracingContext tracingContext,
final String sourceEtag) throws
IOException {
IOException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
long countAggregate = 0;
boolean shouldContinue;

if (isAtomicRenameKey(source.getName())) {
LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
+" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
}

LOG.debug("renameAsync filesystem: {} source: {} destination: {}",
getClient().getFileSystem(),
source,
Expand All @@ -1070,11 +1071,21 @@ public boolean rename(final Path source,
final AbfsClientRenameResult abfsClientRenameResult =
getClient().renamePath(sourceRelativePath, destinationRelativePath,
continuation, tracingContext, sourceEtag, false,
isNamespaceEnabled);
isNamespaceEnabled);


AbfsRestOperation op = abfsClientRenameResult.getOp();
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
/*
* The Blob endpoint does not have a rename API. The AbfsBlobClient
* renames a path by performing a copy followed by a delete.
* Because that is not a single operation, the client does not return
* an AbfsRestOperation object, so op can be null here.
*/
if (op != null) {
perfInfo.registerResult(op.getResult());
continuation = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
}
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue = continuation != null && !continuation.isEmpty();
Expand All @@ -1090,7 +1101,7 @@ public boolean rename(final Path source,
}

public void delete(final Path path, final boolean recursive,
TracingContext tracingContext) throws AzureBlobFileSystemException {
TracingContext tracingContext) throws AzureBlobFileSystemException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
long countAggregate = 0;
boolean shouldContinue = true;
Expand All @@ -1108,8 +1119,16 @@ public void delete(final Path path, final boolean recursive,
try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) {
AbfsRestOperation op = getClient().deletePath(relativePath, recursive,
continuation, tracingContext, getIsNamespaceEnabled(tracingContext));
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
/*
* The Blob endpoint does not have a directory delete API. The AbfsBlobClient
* performs multiple operations to delete a path, so the client does not return
* an AbfsRestOperation object and op can be null here.
*/
if (op != null) {
perfInfo.registerResult(op.getResult());
continuation = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
}
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue = continuation != null && !continuation.isEmpty();
Expand Down Expand Up @@ -1176,6 +1195,8 @@ public FileStatus getFileStatus(final Path path,

perfInfo.registerSuccess(true);

getClient().takeGetPathStatusAtomicRenameKeyAction(path, tracingContext);

return new VersionedFileStatus(
transformedOwner,
transformedGroup,
Expand All @@ -1196,6 +1217,7 @@ public FileStatus getFileStatus(final Path path,
/**
* @param path The list path.
* @param tracingContext Tracks identifiers for request header
*
* @return the entries in the path.
* */
@Override
Expand All @@ -1212,6 +1234,7 @@ public FileStatus[] listStatus(final Path path, TracingContext tracingContext) t
* all entries after this non-existent entry in lexical order:
* listStatus(Path("/folder"), "cfile") will return "/folder/hfile" and "/folder/ifile".
* @param tracingContext Tracks identifiers for request header
*
* @return the entries in the path start from "startFrom" in lexical order.
* */
@InterfaceStability.Unstable
Expand Down Expand Up @@ -1290,20 +1313,26 @@ public String listStatus(final Path path, final String startFrom,
Path entryPath = new Path(File.separator + entry.name());
entryPath = entryPath.makeQualified(this.uri, entryPath);

fileStatuses.add(
new VersionedFileStatus(
owner,
group,
fsPermission,
hasAcl,
contentLength,
isDirectory,
1,
blockSize,
lastModifiedMillis,
entryPath,
entry.eTag(),
encryptionContext));
final boolean actionTakenOnRenamePendingJson
= getClient().takeListPathAtomicRenameKeyAction(entryPath,
(int) contentLength,
tracingContext);
if (!actionTakenOnRenamePendingJson) {
fileStatuses.add(
new VersionedFileStatus(
owner,
group,
fsPermission,
hasAcl,
contentLength,
isDirectory,
1,
blockSize,
lastModifiedMillis,
entryPath,
entry.eTag(),
encryptionContext));
}
}

perfInfo.registerSuccess(true);
Expand Down Expand Up @@ -1706,10 +1735,6 @@ public void access(final Path path, final FsAction mode,
}
}

public boolean isAtomicRenameKey(String key) {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}

public boolean isInfiniteLeaseKey(String key) {
if (azureInfiniteLeaseDirSet.isEmpty()) {
return false;
Expand Down Expand Up @@ -1865,7 +1890,7 @@ private boolean parseIsDirectory(final String resourceType) {
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
}

private boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
public static boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
for (String dir : dirSet) {
if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) {
return true;
Expand Down Expand Up @@ -2142,7 +2167,8 @@ private AbfsLease maybeCreateLease(String relativePath, TracingContext tracingCo
if (!enableInfiniteLease) {
return null;
}
AbfsLease lease = new AbfsLease(getClient(), relativePath, tracingContext);
AbfsLease lease = new AbfsLease(getClient(), relativePath, true,
INFINITE_LEASE_DURATION, null, tracingContext);
leaseRefs.put(lease, null);
return lease;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;

public static final String COPY_STATUS_SUCCESS = "success";
public static final String COPY_STATUS_PENDING = "pending";
public static final String COPY_STATUS_ABORTED = "aborted";
public static final String COPY_STATUS_FAILED = "failed";
public static final String XML_TAG_NAME = "Name";
public static final String XML_TAG_BLOB = "Blob";
public static final String XML_TAG_PREFIX = "Prefix";
Expand Down Expand Up @@ -249,5 +253,8 @@ public static ApiVersion getCurrentVersion() {
+ "non-hierarchical-namespace account:"
+ CPK_CONFIG_LIST;

public static final String ATOMIC_DIR_RENAME_RECOVERY_ON_GET_PATH_EXCEPTION =
"Path had to be recovered from atomic rename operation.";

// Private, empty constructor: this class cannot be instantiated from outside.
private AbfsHttpConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,5 +346,20 @@ public static String accountProperty(String property, String account) {
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";

/**
* The Blob copy API is asynchronous; this configuration defines the polling
* wait, in milliseconds, between checks of the copy status: {@value}.
*/
public static final String FS_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS = "fs.azure.blob.copy.progress.wait.millis";
/**Blob rename lease refresh duration {@value}*/
public static final String FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION
= "fs.azure.blob.atomic.rename.lease.refresh.duration";
/** Maximum number of blob entries enqueued in memory for rename or delete orchestration: {@value}. */
public static final String FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = "fs.azure.blob.dir.list.producer.queue.max.size";
/** Maximum number of threads per blob-rename orchestration: {@value}. */
public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread";
/** Maximum number of threads per blob-delete orchestration: {@value}. */
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
// Private, empty constructor: this class cannot be instantiated from outside.
private ConfigurationKeys() {}
}
Loading

0 comments on commit d078fb1

Please sign in to comment.