-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rename Delete on Blob endpoint. #117
Changes from all commits
c036404
ea2bef1
870361b
c2bacc3
0005ca9
c3372af
75edc7b
f17b0b7
eb642bc
798e23e
8282cc4
fe41cca
0ad1214
4d48fe8
0b9b032
4ffef8f
d5acd2f
710ca79
df84b40
2c67654
e030dc7
bab4bb8
0fc7ca8
ceb2dc7
5661eb2
1b3ee3f
97b6b03
ca6feac
f96afce
ce181de
2fc652e
71f8405
a865e34
32add3b
52c1822
9359ae0
e8b48f1
ed6ecdc
7d574a5
896204d
6115a08
328e097
ab31f75
e446e75
a2cc587
0f6db9d
d22b99c
27cd964
5adb6f0
4d995a3
a50dab1
91fb94a
747a51d
20155f3
07dfbdb
8756887
3ec178b
7fda902
21d0e4f
29c3c41
e134292
2e4fb2d
aa9866b
c7099db
d0cb5b9
a961ab1
2792325
182967e
a705138
736b32a
652c15b
5303166
ec869ee
6f26b81
8c5af2e
ebde8d9
61da75d
5f92b37
8cb4c6f
dd6f5dc
5afeaef
e816e96
7b53575
d6de0c4
603caea
52bcd11
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -152,6 +152,7 @@ | |
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE; | ||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE; | ||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS; | ||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION; | ||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; | ||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT; | ||
|
||
|
@@ -175,7 +176,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { | |
private final Map<AbfsLease, Object> leaseRefs; | ||
|
||
private final AbfsConfiguration abfsConfiguration; | ||
private final Set<String> azureAtomicRenameDirSet; | ||
private Set<String> azureInfiniteLeaseDirSet; | ||
private volatile Trilean isNamespaceEnabled; | ||
private final AuthType authType; | ||
|
@@ -243,8 +243,6 @@ public AzureBlobFileSystemStore( | |
} | ||
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup); | ||
|
||
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList( | ||
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA))); | ||
updateInfiniteLeaseDirs(); | ||
this.authType = abfsConfiguration.getAuthType(accountName); | ||
boolean usingOauth = (authType == AuthType.OAuth); | ||
|
@@ -719,8 +717,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa | |
} | ||
} | ||
|
||
String eTag = op.getResult() | ||
.getResponseHeader(HttpHeaderConfigurations.ETAG); | ||
String eTag = extractEtagHeader(op.getResult()); | ||
|
||
try { | ||
// overwrite only if eTag matches with the file properties fetched befpre | ||
|
@@ -803,6 +800,16 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( | |
.build(); | ||
} | ||
|
||
/** | ||
* Creates a directory. | ||
* | ||
* @param path Path of the directory to create. | ||
* @param permission Permission of the directory. | ||
* @param umask Umask of the directory. | ||
* @param tracingContext tracing context | ||
* | ||
* @throws AzureBlobFileSystemException server error. | ||
*/ | ||
public void createDirectory(final Path path, final FsPermission permission, | ||
final FsPermission umask, TracingContext tracingContext) | ||
throws IOException { | ||
|
@@ -815,7 +822,6 @@ public void createDirectory(final Path path, final FsPermission permission, | |
permission, | ||
umask, | ||
isNamespaceEnabled); | ||
|
||
boolean overwrite = | ||
!isNamespaceEnabled || abfsConfiguration.isEnabledMkdirOverwrite(); | ||
Permissions permissions = new Permissions(isNamespaceEnabled, | ||
|
@@ -1042,16 +1048,11 @@ public boolean rename(final Path source, | |
final Path destination, | ||
final TracingContext tracingContext, | ||
final String sourceEtag) throws | ||
IOException { | ||
IOException { | ||
final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); | ||
long countAggregate = 0; | ||
boolean shouldContinue; | ||
|
||
if (isAtomicRenameKey(source.getName())) { | ||
LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename," | ||
+" create and delete operations are atomic if Namespace is enabled for your Azure Storage account."); | ||
} | ||
|
||
LOG.debug("renameAsync filesystem: {} source: {} destination: {}", | ||
getClient().getFileSystem(), | ||
source, | ||
|
@@ -1070,11 +1071,21 @@ public boolean rename(final Path source, | |
final AbfsClientRenameResult abfsClientRenameResult = | ||
getClient().renamePath(sourceRelativePath, destinationRelativePath, | ||
continuation, tracingContext, sourceEtag, false, | ||
isNamespaceEnabled); | ||
isNamespaceEnabled); | ||
|
||
|
||
AbfsRestOperation op = abfsClientRenameResult.getOp(); | ||
perfInfo.registerResult(op.getResult()); | ||
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); | ||
/* | ||
* Blob endpoint does not have a rename API. The AbfsBlobClient would | ||
* perform the copy and delete operation for renaming a path. | ||
* As it would not be one operation, hence, the client would not return | ||
* AbfsRestOperation object. | ||
*/ | ||
if (op != null) { | ||
perfInfo.registerResult(op.getResult()); | ||
continuation = op.getResult() | ||
.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); | ||
} | ||
perfInfo.registerSuccess(true); | ||
countAggregate++; | ||
shouldContinue = continuation != null && !continuation.isEmpty(); | ||
|
@@ -1090,7 +1101,7 @@ public boolean rename(final Path source, | |
} | ||
|
||
public void delete(final Path path, final boolean recursive, | ||
TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); | ||
long countAggregate = 0; | ||
boolean shouldContinue = true; | ||
|
@@ -1108,8 +1119,16 @@ public void delete(final Path path, final boolean recursive, | |
try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) { | ||
AbfsRestOperation op = getClient().deletePath(relativePath, recursive, | ||
continuation, tracingContext, getIsNamespaceEnabled(tracingContext)); | ||
perfInfo.registerResult(op.getResult()); | ||
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); | ||
/* | ||
* Blob endpoint does not have a directory delete API. The AbfsBlobClient would | ||
* perform multiple operation to delete a path, hence, the client would not return | ||
* AbfsRestOperation object. | ||
*/ | ||
if (op != null) { | ||
perfInfo.registerResult(op.getResult()); | ||
continuation = op.getResult() | ||
.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); | ||
} | ||
perfInfo.registerSuccess(true); | ||
countAggregate++; | ||
shouldContinue = continuation != null && !continuation.isEmpty(); | ||
|
@@ -1176,6 +1195,8 @@ public FileStatus getFileStatus(final Path path, | |
|
||
perfInfo.registerSuccess(true); | ||
|
||
getClient().takeGetPathStatusAtomicRenameKeyAction(path, tracingContext); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function does not do any action for dfs, possible to move this also to blob endpoint ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is an expectation in blob that fs.getPathStatus and fs.listPath can resume a pending rename if the pending-json is found. True that there is no action required in dfs. As far as store is concerned, the implementation is abstracted out to client. But, the reason why the invocation of the abstract method cannot be pushed down further in client.getFileStatus and client.listPath is that these functions get called in multiple place other than filesystem API call flows. But, as our expectation for resume to only happen is at filesystem api level, invoking the abstract method in store makes sense. To push this behavior in the client, the client.getPathStatus and client.listPath would have to be overloaded to take a new arg which tells if the method is getting called from the API call flow or not, which I think would be antipattern, but open to your opinions on it. |
||
|
||
return new VersionedFileStatus( | ||
transformedOwner, | ||
transformedGroup, | ||
|
@@ -1196,6 +1217,7 @@ public FileStatus getFileStatus(final Path path, | |
/** | ||
* @param path The list path. | ||
* @param tracingContext Tracks identifiers for request header | ||
* | ||
* @return the entries in the path. | ||
* */ | ||
@Override | ||
|
@@ -1212,6 +1234,7 @@ public FileStatus[] listStatus(final Path path, TracingContext tracingContext) t | |
* all entries after this non-existent entry in lexical order: | ||
* listStatus(Path("/folder"), "cfile") will return "/folder/hfile" and "/folder/ifile". | ||
* @param tracingContext Tracks identifiers for request header | ||
* | ||
* @return the entries in the path start from "startFrom" in lexical order. | ||
* */ | ||
@InterfaceStability.Unstable | ||
|
@@ -1290,20 +1313,26 @@ public String listStatus(final Path path, final String startFrom, | |
Path entryPath = new Path(File.separator + entry.name()); | ||
entryPath = entryPath.makeQualified(this.uri, entryPath); | ||
|
||
fileStatuses.add( | ||
new VersionedFileStatus( | ||
owner, | ||
group, | ||
fsPermission, | ||
hasAcl, | ||
contentLength, | ||
isDirectory, | ||
1, | ||
blockSize, | ||
lastModifiedMillis, | ||
entryPath, | ||
entry.eTag(), | ||
encryptionContext)); | ||
final boolean actionTakenOnRenamePendingJson | ||
= getClient().takeListPathAtomicRenameKeyAction(entryPath, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here |
||
(int) contentLength, | ||
tracingContext); | ||
if (!actionTakenOnRenamePendingJson) { | ||
fileStatuses.add( | ||
new VersionedFileStatus( | ||
owner, | ||
group, | ||
fsPermission, | ||
hasAcl, | ||
contentLength, | ||
isDirectory, | ||
1, | ||
blockSize, | ||
lastModifiedMillis, | ||
entryPath, | ||
entry.eTag(), | ||
encryptionContext)); | ||
} | ||
} | ||
|
||
perfInfo.registerSuccess(true); | ||
|
@@ -1706,10 +1735,6 @@ public void access(final Path path, final FsAction mode, | |
} | ||
} | ||
|
||
public boolean isAtomicRenameKey(String key) { | ||
return isKeyForDirectorySet(key, azureAtomicRenameDirSet); | ||
} | ||
|
||
public boolean isInfiniteLeaseKey(String key) { | ||
if (azureInfiniteLeaseDirSet.isEmpty()) { | ||
return false; | ||
|
@@ -1865,7 +1890,7 @@ private boolean parseIsDirectory(final String resourceType) { | |
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); | ||
} | ||
|
||
private boolean isKeyForDirectorySet(String key, Set<String> dirSet) { | ||
public static boolean isKeyForDirectorySet(String key, Set<String> dirSet) { | ||
for (String dir : dirSet) { | ||
if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) { | ||
return true; | ||
|
@@ -2142,7 +2167,8 @@ private AbfsLease maybeCreateLease(String relativePath, TracingContext tracingCo | |
if (!enableInfiniteLease) { | ||
return null; | ||
} | ||
AbfsLease lease = new AbfsLease(getClient(), relativePath, tracingContext); | ||
AbfsLease lease = new AbfsLease(getClient(), relativePath, true, | ||
INFINITE_LEASE_DURATION, null, tracingContext); | ||
leaseRefs.put(lease, null); | ||
return lease; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this removed from DFS flow?? Was this not doing anything there??