-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Producer condition change + Rename Azcopy tests #127
Changes from 35 commits
993d4f6
b3ee83b
c7f348e
f4ec53a
2ca1b0e
b93a862
3c841e2
2105ca9
12e89de
09f40f8
ed652c8
c09c89b
ba9c62c
e8d5a35
37c70f1
0ee6532
299c86d
12d68ad
138662e
4cf26e5
76177f9
43e5793
bfc9732
10fe6b6
6a93c4c
e0cc653
04bc2aa
c203ce0
00cec1c
f618c73
3401e5c
0158163
9bd4eb5
8422d3e
4158fbf
efc70fa
c0ef826
c37f21d
74de696
6a37197
b2a78d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -362,6 +362,14 @@ public FSDataOutputStream create(final Path f, | |
final short replication, | ||
final long blockSize, | ||
final Progressable progress) throws IOException { | ||
return createInternal(f, permission, overwrite, blockSize, false); | ||
} | ||
|
||
private FSDataOutputStream createInternal(final Path f, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like this refraction we are doing only for Blob Endpoint. this new parameter is not used by DFS Client. Can we have these handling only in ABFSBlobClient? Let's discuss this once offline if its possible. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new field is just to tell BlobClient, if the createPath is for createNonRecursive HDFS API or create HDFS API. The required orchestration for blob on createNonRecursive is done in blobClient only. This field is just to propagate the information to the client about what HDFS API has invoked it. |
||
final FsPermission permission, | ||
final boolean overwrite, | ||
final long blockSize, | ||
final boolean isNonRecursiveCreate) throws IOException { | ||
LOG.debug("AzureBlobFileSystem.create path: {} permission: {} overwrite: {} bufferSize: {}", | ||
f, | ||
permission, | ||
|
@@ -382,9 +390,10 @@ public FSDataOutputStream create(final Path f, | |
try { | ||
TracingContext tracingContext = new TracingContext(clientCorrelationId, | ||
fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener); | ||
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite, | ||
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, | ||
overwrite, | ||
permission == null ? FsPermission.getFileDefault() : permission, | ||
FsPermission.getUMask(getConf()), tracingContext); | ||
FsPermission.getUMask(getConf()), isNonRecursiveCreate, tracingContext); | ||
statIncrement(FILES_CREATED); | ||
return new FSDataOutputStream(outputStream, statistics); | ||
} catch (AzureBlobFileSystemException ex) { | ||
|
@@ -417,7 +426,7 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe | |
+ f.getName() + " because parent folder does not exist."); | ||
} | ||
|
||
return create(f, permission, overwrite, bufferSize, replication, blockSize, progress); | ||
return createInternal(f, permission, overwrite, blockSize, true); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like these parameters were not used. Should we still keep them to reduce unnecessary diffs?? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taken. |
||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -603,7 +603,7 @@ public void deleteFilesystem(TracingContext tracingContext) | |
public OutputStream createFile(final Path path, | ||
final FileSystem.Statistics statistics, final boolean overwrite, | ||
final FsPermission permission, final FsPermission umask, | ||
TracingContext tracingContext) throws IOException { | ||
final boolean isRecursiveCreate, TracingContext tracingContext) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The parameter passed to store has name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems buggy as well... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 with the confusion. It has to be all about nonRecursiveCreate. Can you please explain what seems wrong here please :). |
||
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) { | ||
AbfsClient createClient = getClientHandler().getIngressClient(); | ||
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext); | ||
|
@@ -644,6 +644,7 @@ public OutputStream createFile(final Path path, | |
new Permissions(isNamespaceEnabled, permission, umask), | ||
isAppendBlob, | ||
contextEncryptionAdapter, | ||
isRecursiveCreate, | ||
tracingContext | ||
); | ||
|
||
|
@@ -654,6 +655,7 @@ public OutputStream createFile(final Path path, | |
isAppendBlob, | ||
null, | ||
contextEncryptionAdapter, | ||
isRecursiveCreate, | ||
tracingContext, isNamespaceEnabled); | ||
|
||
} | ||
|
@@ -678,27 +680,32 @@ public OutputStream createFile(final Path path, | |
/** | ||
* Conditional create overwrite flow ensures that create overwrites is done | ||
* only if there is match for eTag of existing file. | ||
* | ||
* @param relativePath | ||
* @param statistics | ||
* @param permissions contains permission and umask | ||
* @param isAppendBlob | ||
* @param isRecursiveCreate | ||
* | ||
* @return | ||
* | ||
* @throws AzureBlobFileSystemException | ||
*/ | ||
private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath, | ||
final FileSystem.Statistics statistics, | ||
final Permissions permissions, | ||
final boolean isAppendBlob, | ||
final ContextEncryptionAdapter contextEncryptionAdapter, | ||
final TracingContext tracingContext) throws IOException { | ||
final boolean isRecursiveCreate, final TracingContext tracingContext) throws IOException { | ||
AbfsRestOperation op; | ||
AbfsClient createClient = getClientHandler().getIngressClient(); | ||
try { | ||
// Trigger a create with overwrite=false first so that eTag fetch can be | ||
// avoided for cases when no pre-existing file is present (major portion | ||
// of create file traffic falls into the case of no pre-existing file). | ||
op = createClient.createPath(relativePath, true, false, permissions, | ||
isAppendBlob, null, contextEncryptionAdapter, tracingContext, getIsNamespaceEnabled(tracingContext)); | ||
isAppendBlob, null, contextEncryptionAdapter, isRecursiveCreate, | ||
tracingContext, getIsNamespaceEnabled(tracingContext)); | ||
|
||
} catch (AbfsRestOperationException e) { | ||
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { | ||
|
@@ -722,7 +729,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa | |
try { | ||
// overwrite only if eTag matches with the file properties fetched befpre | ||
op = createClient.createPath(relativePath, true, true, permissions, | ||
isAppendBlob, eTag, contextEncryptionAdapter, tracingContext, getIsNamespaceEnabled(tracingContext)); | ||
isAppendBlob, eTag, contextEncryptionAdapter, isRecursiveCreate, | ||
tracingContext, getIsNamespaceEnabled(tracingContext)); | ||
} catch (AbfsRestOperationException ex) { | ||
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { | ||
// Is a parallel access case, as file with eTag was just queried | ||
|
@@ -827,7 +835,8 @@ public void createDirectory(final Path path, final FsPermission permission, | |
Permissions permissions = new Permissions(isNamespaceEnabled, | ||
permission, umask); | ||
final AbfsRestOperation op = createClient.createPath(getRelativePath(path), | ||
false, overwrite, permissions, false, null, null, tracingContext, isNamespaceEnabled); | ||
false, overwrite, permissions, false, null, null, false, | ||
tracingContext, isNamespaceEnabled); | ||
perfInfo.registerResult(op.getResult()).registerSuccess(true); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -117,6 +117,7 @@ | |
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII; | ||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE; | ||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ZERO; | ||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SIXTY_SECONDS; | ||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT; | ||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH; | ||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE; | ||
|
@@ -312,9 +313,13 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) | |
/** | ||
* Get Rest Operation for API <a href = https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob></a>. | ||
* Creates a file or directory(marker file) at specified path. | ||
* | ||
* @param path of the directory to be created. | ||
* @param isRecursiveCreate | ||
* @param tracingContext | ||
* | ||
* @return executed rest operation containing response from server. | ||
* | ||
* @throws AzureBlobFileSystemException if rest operation fails. | ||
*/ | ||
@Override | ||
|
@@ -325,11 +330,31 @@ public AbfsRestOperation createPath(final String path, | |
final boolean isAppendBlob, | ||
final String eTag, | ||
final ContextEncryptionAdapter contextEncryptionAdapter, | ||
final boolean isRecursiveCreate, | ||
final TracingContext tracingContext, final boolean isNamespaceEnabled) | ||
throws AzureBlobFileSystemException { | ||
return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag, | ||
contextEncryptionAdapter, tracingContext, isNamespaceEnabled, false); | ||
AbfsLease abfsLease = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I might be missing something... Or was it just missed earlier? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yet it was missed. In case of createNonRecursive on atomic path, we have to take lease on the parent directory. |
||
if (isRecursiveCreate && abfsConfiguration.isLeaseOnCreateNonRecursive()) { | ||
abfsLease = takeAbfsLease(new Path(path).getParent().toUri().getPath(), SIXTY_SECONDS, tracingContext); | ||
} | ||
try { | ||
return createPath(path, isFile, overwrite, permissions, isAppendBlob, | ||
eTag, | ||
contextEncryptionAdapter, tracingContext, isNamespaceEnabled, false); | ||
} finally { | ||
if(abfsLease != null) { | ||
abfsLease.free(); | ||
} | ||
} | ||
} | ||
|
||
@VisibleForTesting | ||
public AbfsLease takeAbfsLease(final String path, | ||
final long timeDuration, | ||
final TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
return new AbfsLease(this, path, false, timeDuration, null, tracingContext); | ||
} | ||
|
||
/** | ||
* Get Rest Operation for API <a href = https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob></a>. | ||
* Creates a file or directory(marker file) at specified path. | ||
|
@@ -1489,7 +1514,7 @@ public boolean isAtomicRenameKey(String key) { | |
* | ||
* @throws AzureBlobFileSystemException server error or the path is renamePending json file and action is taken. | ||
*/ | ||
public void takeGetPathStatusAtomicRenameKeyAction(final Path path, | ||
private void takeGetPathStatusAtomicRenameKeyAction(final Path path, | ||
final TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
if (path == null || path.isRoot() || !isAtomicRenameKey(path.toUri().getPath())) { | ||
return; | ||
|
@@ -1583,7 +1608,7 @@ public boolean takeListPathAtomicRenameKeyAction(final Path path, | |
} | ||
|
||
@VisibleForTesting | ||
RenameAtomicity getRedoRenameAtomicity(final Path path, int fileLen, | ||
public RenameAtomicity getRedoRenameAtomicity(final Path path, int fileLen, | ||
final TracingContext tracingContext) { | ||
RenameAtomicity renameAtomicity = new RenameAtomicity(path, | ||
fileLen, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,7 +40,6 @@ | |
import com.fasterxml.jackson.core.JsonToken; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; | ||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; | ||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; | ||
|
@@ -321,15 +320,19 @@ public AbfsRestOperation listPath(final String relativePath, | |
/** | ||
* Get Rest Operation for API <a href = https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create></a>. | ||
* Create a path (file or directory) in the current filesystem. | ||
* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No changes in this file, additional changes can be removed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. taken. |
||
* @param path to be created inside the filesystem. | ||
* @param isFile to specify if the created path is file or directory. | ||
* @param overwrite to specify if the path should be overwritten if it already exists. | ||
* @param permissions to specify the permissions of the path. | ||
* @param isAppendBlob to specify if the path to be created is an append blob. | ||
* @param eTag to specify conditional headers. | ||
* @param contextEncryptionAdapter to provide encryption context. | ||
* @param isRecursiveCreate | ||
* @param tracingContext | ||
* | ||
* @return executed rest operation containing response from server. | ||
* | ||
* @throws AzureBlobFileSystemException if rest operation fails. | ||
*/ | ||
@Override | ||
|
@@ -340,6 +343,7 @@ public AbfsRestOperation createPath(final String path, | |
final boolean isAppendBlob, | ||
final String eTag, | ||
final ContextEncryptionAdapter contextEncryptionAdapter, | ||
final boolean isRecursiveCreate, | ||
final TracingContext tracingContext, final boolean isNamespaceEnabled) | ||
throws AzureBlobFileSystemException { | ||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: boolean variable. Better to change to
isLeaseOnCreateNonRecursiveEnabled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken.