-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rename Delete on Blob endpoint. #117
Conversation
This reverts commit 0005ca9.
…ception from BlobRenaameHandler
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
Outdated
Show resolved
Hide resolved
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
Outdated
Show resolved
Hide resolved
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
Outdated
Show resolved
Hide resolved
throws IOException { | ||
final FsPermission umask, | ||
TracingContext tracingContext) | ||
throws AzureBlobFileSystemException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any specific reason for changing the exception type ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted.
@@ -1121,6 +1152,7 @@ public FileStatus getFileStatus(final Path path, | |||
path, | |||
isNamespaceEnabled); | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
increases git diff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted
* and resume that if it exists. | ||
*/ | ||
if (isAtomicRenameKey(path.toUri().getPath())) { | ||
getClient().takeGetPathStatusAtomicRenameKeyAction(path, tracingContext); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a special handling only for blob client, can this be done in the client class ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, although the action if its atomic key was being done in blobClient. But, now have exported the conditioning to the blobClient as well.
encryptionContext)); | ||
if (isAtomicRenameKey(entryPath.toUri().getPath()) | ||
&& entryPath.toUri().getPath().endsWith(RenameAtomicity.SUFFIX)) { | ||
getClient().takeListPathAtomicRenameKeyAction(entryPath, (int) contentLength, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for this scenario. Can this be handled in blob client class itself ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, although the action if its atomic key was being done in blobClient. But, now have exported the conditioning to the blobClient as well.
@@ -1994,7 +2046,7 @@ public static final class Permissions { | |||
private final String permission; | |||
private final String umask; | |||
|
|||
Permissions(boolean isNamespaceEnabled, FsPermission permission, | |||
public Permissions(boolean isNamespaceEnabled, FsPermission permission, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this made public for some test usage ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have been reverted.
AbfsRestOperation putBlobOp = abfsClient.createPath(path.toUri().getPath(), | ||
true, | ||
true, permissions, false, null, null, tracingContext); | ||
String eTag = putBlobOp.getResult().getResponseHeader( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extractEtagHeader can be used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
||
// PutBlock on the path. | ||
byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH]; | ||
String blockId = new String(Base64.encodeBase64(blockIdByteArray), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add some unique string in the blockId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, would have a random byte array.
} | ||
|
||
@VisibleForTesting | ||
void createRenamePendingJson(Path path, byte[] bytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why explicit calls to create, append and flush, why not use fs.create which returns OuputStream and just call write, hysnc and flush on that ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats what for I added the callbacks. We cannot have cycle of classes, right? I dont think client having a pointer to fileSystem would be good. What you feel?
// Make file contents as a string. Again, quote file names, escaping | ||
// characters as appropriate. | ||
String contents = "{\n" | ||
+ " FormatVersion: \"1.0\",\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FormatVersion, OldFolderName etc should all come from Constants
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will create a new class for serialization and deserialization to json string. That would standardize the json.
* @return JSON string which represents the operation. | ||
*/ | ||
private String makeRenamePendingFileContents(String eTag) { | ||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use constants
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some thought...
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
Outdated
Show resolved
Hide resolved
...ls/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
Outdated
Show resolved
Hide resolved
...-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
Outdated
Show resolved
Hide resolved
...-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
Show resolved
Hide resolved
...ols/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java
Outdated
Show resolved
Hide resolved
@@ -921,8 +1006,8 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive, | |||
*/ | |||
final List<AbfsHttpHeader> requestHeaders = (isPaginatedDelete(recursive, | |||
isNamespaceEnabled) && xMsVersion.compareTo( | |||
ApiVersion.AUG_03_2023) < 0) | |||
? createDefaultHeaders(ApiVersion.AUG_03_2023) | |||
AbfsHttpConstants.ApiVersion.AUG_03_2023) < 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have imported AbfsConstatnt.ApiVersion
We don;t need to qualify it now.
Thoughts on this?
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java
Outdated
Show resolved
Hide resolved
final boolean recursive, | ||
final AbfsBlobClient abfsBlobClient, | ||
final TracingContext tracingContext) { | ||
super(path, abfsBlobClient, abfsBlobClient.getAbfsConfiguration() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are already passing client, maybe we can get required configs from client itself in base class instead of passing them here.
Same for BlobRenameHandler as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be sending different config-keys. But, will simplify this. +1
...ols/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
Outdated
Show resolved
Hide resolved
...ols/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
Show resolved
Hide resolved
#120 * change exception type; other review comments; * comments wip * sync checks review wip * reviews comments at higher level taken * atomic method abstraction to client * Permissions not to have public constructor * added javadocs small fix : sync should return fix + lease can be sync and async. test fix3 fix2 fix1 * javadocs
deserialization needs empty constructor concurrency control to get availableSize no manual json parsing in renameAtomicity
false); | ||
|
||
public ListActionTaker(Path path, | ||
AbfsClient abfsClient, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I missed this earlier, but looks like this should also be AbfsBlobClient
…BlobClient always. No casting required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nits only
public void createDirectory(final Path path, final FsPermission permission, | ||
final FsPermission umask, TracingContext tracingContext) | ||
final FsPermission umask, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: We can avoid this git diff. Line is not that long..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted
@@ -1027,22 +1035,19 @@ public void breakLease(final Path path, final TracingContext tracingContext) thr | |||
* @param tracingContext trace context | |||
* @param sourceEtag etag of source file. may be null or empty | |||
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures. | |||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reduce git diff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted
* @return true if recovery was needed and succeeded. | ||
*/ | ||
public boolean rename(final Path source, | ||
final Path destination, | ||
final TracingContext tracingContext, | ||
final String sourceEtag) throws | ||
IOException { | ||
final String sourceEtag) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reduce git diff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted
final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); | ||
long countAggregate = 0; | ||
boolean shouldContinue; | ||
|
||
if (isAtomicRenameKey(source.getName())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this removed from DFS flow?? Was this not doing anything there??
@@ -1081,7 +1096,7 @@ public boolean rename(final Path source, | |||
} | |||
|
|||
public void delete(final Path path, final boolean recursive, | |||
TracingContext tracingContext) throws AzureBlobFileSystemException { | |||
TracingContext tracingContext) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception change needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted.
@@ -720,7 +715,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa | |||
} | |||
} | |||
|
|||
String eTag = extractEtagHeader(op.getResult()); | |||
String eTag = op.getResult() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be extractEtagHeader only right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken.
@@ -1167,6 +1187,8 @@ public FileStatus getFileStatus(final Path path, | |||
|
|||
perfInfo.registerSuccess(true); | |||
|
|||
getClient().takeGetPathStatusAtomicRenameKeyAction(path, tracingContext); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function does not do any action for dfs, possible to move this also to blob endpoint ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an expectation in blob that fs.getPathStatus and fs.listPath can resume a pending rename if the pending-json is found. True that there is no action required in dfs. As far as store is concerned, the implementation is abstracted out to client.
But, the reason why the invocation of the abstract method cannot be pushed down further in client.getFileStatus and client.listPath is that these functions get called in multiple place other than filesystem API call flows. But, as our expectation for resume to only happen is at filesystem api level, invoking the abstract method in store makes sense.
To push this behavior in the client, the client.getPathStatus and client.listPath would have to be overloaded to take a new arg which tells if the method is getting called from the API call flow or not, which I think would be antipattern, but open to your opinions on it.
entry.eTag(), | ||
encryptionContext)); | ||
final boolean actionTakenOnRenamePendingJson | ||
= getClient().takeListPathAtomicRenameKeyAction(entryPath, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
throw new AbfsRestOperationException( | ||
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), | ||
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), | ||
"Path had to be recovered from atomic rename operation.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exception message should come from constants class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken.
} | ||
} finally { | ||
if (srcAbfsLease != null) { | ||
srcAbfsLease.cancelTimer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
release lease ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. Made a small change here.
so, when the operation is successful -> src blob would be deleted. -> no need to release the lease. But, when failure happens -> we need to release. Good catch!
now, when successful -> cancel timer
when failure -> release lease + cancel timer -> done via free method.
LGTM. Thanks for taking the suggestions. |
Enable rename and delete over Blob endpoint. The endpoint doesnot support rename API and not directory-delete.
There shall be no change until client layer, wherein, in the AbfsBlobClient#rename(), we will create an instance of BlobRenameHandler, and in AbfsBlobClient#delete(), we will create an instance of BlobDeleteHandler. These two new classes would own the orchestration logic for doing rename and delete, also they would own the HDFS contract prechecks. DFS endpoint performs the contract checks but not blob endpoint, hence, the handler would have to do prechecks and the postchecks.
How to directory operations?
In both of them, there is a need of listing that needs to be done by the client and then take actions on them. But, there is a problem with iterative listing and then taking action:
For the above point, the pr introduces a logic of producer and consumer. Where, the producer would keep producing the blob properties on a separate thread and enqueue them in a queue. The consumer would be owning an executorservice, it would keep polling the queue, and keep submitting to the executorService. The pr would take care of the consumer lag should not be more than a configuration.
High level code design:
Whenever the handlers are executed, they call listRecursiveAndTakeAction to handler directory level operations.
Atomic Rename:
This would have to be implemented in BlobRenameHandler. This is required, because the rename over blob for a directory is not an O(1) operation in reference to the client, but it needs an orchestration to copy/delete each blob in the directory path. There is a need of atomic rename which is not switched on for every kind of directory, but there is a config in which the developer can provide what directory they want to enable for atomic rename. The config is
fs.azure.atomic.rename.key
with default value is "/hbase" (on trunk in abfs). This feature is implemented in wasb for hbase case only. The need of why this feature was not implemented in abfs was that the dfs endpoint’s API call would be O(1). Since, we are bringing in blob endpoint integeration, this change has to be implemented.What the feature is required to do:
a. subDirEtag
b. oldFolderName
c. newFolderName
a. When doing listPath on a path gives the renamePending json as a subPath
b. When getPathStatus has to be done on the subDir of an atomcDirectory, client would also do getPathStatus on subDir-renamePending.json
a. Rerun the rename as per the json
b. Once, rename orchestration is done, delete the json.