Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer condition change + Rename Azcopy tests #127

Merged
merged 41 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
993d4f6
Azcopy Helper
Jun 11, 2024
b3ee83b
Test Infra For Implicit Scenarios
Jun 25, 2024
c7f348e
Assert Azcopy creates implicit paths
Jun 25, 2024
f4ec53a
Azcopy helper Refactored
Jul 1, 2024
2ca1b0e
Azcopy helper Test
Jul 1, 2024
b93a862
Singleton AzcopyHelper
Jul 2, 2024
3c841e2
Unused Iports
Jul 2, 2024
2105ca9
Merge branch 'azcopyHelperCode' into sp/azcopyTests
saxenapranav Jul 2, 2024
12e89de
explicit and implicit tests for rename
saxenapranav Jul 2, 2024
09f40f8
Azcopy Helper
Jun 11, 2024
ed652c8
Test Infra For Implicit Scenarios
Jun 25, 2024
c09c89b
Assert Azcopy creates implicit paths
Jun 25, 2024
ba9c62c
Azcopy helper Refactored
Jul 1, 2024
e8d5a35
Azcopy helper Test
Jul 1, 2024
37c70f1
Singleton AzcopyHelper
Jul 2, 2024
0ee6532
Unused Iports
Jul 2, 2024
299c86d
Resolved Comments
Jul 2, 2024
12d68ad
Azcopy Helper Works only on Blob Endpoint
Jul 2, 2024
138662e
test fixed
saxenapranav Jul 3, 2024
4cf26e5
Merge branch 'azcopyHelperCode' into sp/azcopyTests
saxenapranav Jul 3, 2024
76177f9
fixed some test; braught in tests
saxenapranav Jul 4, 2024
43e5793
handler implicit src dir. added new test in ITestExplicitImplicitRename
saxenapranav Jul 8, 2024
bfc9732
Merge branch 'wasbDepCodeReview' into sp/azcopyTests
saxenapranav Jul 8, 2024
10fe6b6
make azcopy commands in parallel
saxenapranav Jul 8, 2024
6a93c4c
create a method that can parallely create dir and file
saxenapranav Jul 8, 2024
e0cc653
parallelised test's az copy create file/dir
saxenapranav Jul 8, 2024
04bc2aa
code improvement wip
saxenapranav Jul 8, 2024
c203ce0
added javadocs; some test refactors
saxenapranav Jul 8, 2024
00cec1c
test run improvement + refactor
saxenapranav Jul 8, 2024
f618c73
assume if blob endpoint in testAtomicityRedoInvalidFile
saxenapranav Jul 9, 2024
3401e5c
add test for asserting getPathStatus and getListStatus doesnt resume …
saxenapranav Jul 9, 2024
0158163
maxConsumptionLag javadocs; from config
saxenapranav Jul 10, 2024
9bd4eb5
test added
saxenapranav Jul 16, 2024
8422d3e
refactors to get createNonRecursive running
saxenapranav Jul 23, 2024
4158fbf
added test
saxenapranav Jul 23, 2024
efc70fa
nit refactors
saxenapranav Jul 25, 2024
c0ef826
correct way of implementation for createNonRecursive. The aim is to h…
saxenapranav Jul 25, 2024
c37f21d
flow of statistic incremenet
saxenapranav Jul 26, 2024
74de696
clean changes
saxenapranav Jul 26, 2024
6a37197
asf
saxenapranav Jul 26, 2024
b2a78d9
nits
saxenapranav Jul 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE;

/**
* Configuration for Azure Blob FileSystem.
Expand Down Expand Up @@ -409,6 +411,10 @@ public class AbfsConfiguration{
FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
private int producerQueueMaxSize;

/**
 * Value bound to the {@code FS_AZURE_CONSUMER_MAX_LAG} configuration key, with
 * {@code DEFAULT_FS_AZURE_CONSUMER_MAX_LAG} declared as its default.
 * Per the key's documentation, this is the maximum consumer lag (count of blob
 * information yet to be taken for operation) tolerated in blob listing before
 * the producer is made to wait.
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_CONSUMER_MAX_LAG, DefaultValue = DEFAULT_FS_AZURE_CONSUMER_MAX_LAG)
private int listingMaxConsumptionLag;

/**
 * Value bound to the {@code FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD} configuration
 * key, with {@code DEFAULT_FS_AZURE_BLOB_RENAME_THREAD} declared as its default.
 * Per the key's documentation, this is the maximum number of threads per
 * blob-rename orchestration.
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD)
private int blobRenameDirConsumptionParallelism;
Expand All @@ -417,6 +423,10 @@ public class AbfsConfiguration{
FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD)
private int blobDeleteDirConsumptionParallelism;

/**
 * Value bound to the {@code FS_AZURE_LEASE_CREATE_NON_RECURSIVE} configuration
 * key, with {@code DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE} declared as its
 * default. Per the key's documentation, it defines whether a lease is taken on
 * the parent of an atomic-directory on the non-recursive create path.
 */
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_LEASE_CREATE_NON_RECURSIVE, DefaultValue = DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE)
private boolean isLeaseOnCreateNonRecursiveEnabled;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

Expand Down Expand Up @@ -1446,11 +1456,19 @@ public int getProducerQueueMaxSize() {
return producerQueueMaxSize;
}

/**
 * Gets the value held for the {@code FS_AZURE_CONSUMER_MAX_LAG} configuration.
 *
 * @return the listing max consumption lag stored in this configuration.
 */
public int getListingMaxConsumptionLag() {
  return this.listingMaxConsumptionLag;
}

/**
 * Gets the value held for the {@code FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD}
 * configuration.
 *
 * @return the blob-rename directory consumption parallelism stored in this
 * configuration.
 */
public int getBlobRenameDirConsumptionParallelism() {
  return this.blobRenameDirConsumptionParallelism;
}

/**
 * Gets the value held for the {@code FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD}
 * configuration.
 *
 * @return the blob-delete directory consumption parallelism stored in this
 * configuration.
 */
public int getBlobDeleteDirConsumptionParallelism() {
  return this.blobDeleteDirConsumptionParallelism;
}

/**
 * Gets the value held for the {@code FS_AZURE_LEASE_CREATE_NON_RECURSIVE}
 * configuration.
 *
 * @return the lease-on-create-non-recursive flag stored in this configuration.
 */
public boolean isLeaseOnCreateNonRecursiveEnabled() {
  return this.isLeaseOnCreateNonRecursiveEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ public FSDataOutputStream create(final Path f,
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener);
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite,
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics,
overwrite,
permission == null ? FsPermission.getFileDefault() : permission,
FsPermission.getUMask(getConf()), tracingContext);
statIncrement(FILES_CREATED);
Expand All @@ -406,18 +407,23 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
ERR_CREATE_ON_ROOT,
null);
}
final Path parent = f.getParent();
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat,
listener);
final FileStatus parentFileStatus = tryGetFileStatus(parent, tracingContext);

if (parentFileStatus == null) {
throw new FileNotFoundException("Cannot create file "
+ f.getName() + " because parent folder does not exist.");
try {
Path qualifiedPath = makeQualified(f);
OutputStream outputStream = getAbfsStore().createPathNonRecursive(
qualifiedPath, statistics,
overwrite,
permission == null ? FsPermission.getFileDefault() : permission,
FsPermission.getUMask(getConf()), tracingContext);
statIncrement(FILES_CREATED);
return new FSDataOutputStream(outputStream, statistics);
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex);
return null;
}

return create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,24 @@ public void deleteFilesystem(TracingContext tracingContext)
}
}

public OutputStream createPathNonRecursive(final Path path,
final FileSystem.Statistics statistics,
final boolean overwrite,
final FsPermission permission,
final FsPermission umask,
TracingContext tracingContext)
throws IOException {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
AbfsRestOperation op = getClient().createNonRecursivePath(
path.toUri().getPath(), true, overwrite,
new Permissions(isNamespaceEnabled, permission,
umask),
false,
null, null, tracingContext, isNamespaceEnabled);
return createAbfsOutputStreamInstance(statistics, tracingContext,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is no check for appendBlob and contextEncryptionAdapter needed for non recursive create ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

corrected.

path.toUri().getPath(), false, null, op);
}

public OutputStream createFile(final Path path,
final FileSystem.Statistics statistics, final boolean overwrite,
final FsPermission permission, final FsPermission umask,
Expand Down Expand Up @@ -658,31 +676,45 @@ public OutputStream createFile(final Path path,

}
perfInfo.registerResult(op.getResult()).registerSuccess(true);

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
String eTag = extractEtagHeader(op.getResult());
return new AbfsOutputStream(
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClientHandler(),
statistics,
relativePath,
0,
eTag,
contextEncryptionAdapter,
tracingContext));
return createAbfsOutputStreamInstance(statistics, tracingContext,
relativePath,
isAppendBlob, contextEncryptionAdapter, op);
}
}

/**
 * Builds the {@link AbfsOutputStream} returned after a create-path operation.
 * <p>
 * The lease is requested through {@code maybeCreateLease} first, then the eTag
 * is extracted from the operation result; both are passed, together with the
 * remaining arguments, to {@code populateAbfsOutputStreamContext}.
 *
 * @param statistics file system statistics forwarded to the stream context.
 * @param tracingContext tracing context used for the lease request and
 * forwarded to the stream context.
 * @param relativePath path used for the lease request and forwarded to the
 * stream context.
 * @param isAppendBlob append-blob flag forwarded to the stream context.
 * @param contextEncryptionAdapter encryption adapter forwarded to the stream
 * context.
 * @param op operation whose result the eTag header is extracted from.
 *
 * @return the output stream built from the populated context.
 *
 * @throws IOException if a call made while assembling the stream fails.
 */
private AbfsOutputStream createAbfsOutputStreamInstance(final FileSystem.Statistics statistics,
    final TracingContext tracingContext,
    final String relativePath,
    final boolean isAppendBlob,
    final ContextEncryptionAdapter contextEncryptionAdapter,
    final AbfsRestOperation op) throws IOException {
  // Keep this order: the lease is requested before the eTag is read.
  final AbfsLease pathLease = maybeCreateLease(relativePath, tracingContext);
  final String createdETag = extractEtagHeader(op.getResult());

  // NOTE(review): the literal 0 is presumably the initial write position of a
  // freshly created file -- confirm against populateAbfsOutputStreamContext.
  return new AbfsOutputStream(populateAbfsOutputStreamContext(
      isAppendBlob,
      pathLease,
      getClientHandler(),
      statistics,
      relativePath,
      0,
      createdETag,
      contextEncryptionAdapter,
      tracingContext));
}

/**
* Conditional create overwrite flow ensures that a create with overwrite is done
* only if there is a match for the eTag of the existing file.
*
* @param relativePath
* @param statistics
* @param permissions contains permission and umask
* @param isAppendBlob
*
* @return
*
* @throws AzureBlobFileSystemException
*/
private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
Expand All @@ -698,7 +730,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
// avoided for cases when no pre-existing file is present (major portion
// of create file traffic falls into the case of no pre-existing file).
op = createClient.createPath(relativePath, true, false, permissions,
isAppendBlob, null, contextEncryptionAdapter, tracingContext, getIsNamespaceEnabled(tracingContext));
isAppendBlob, null, contextEncryptionAdapter,
tracingContext, getIsNamespaceEnabled(tracingContext));

} catch (AbfsRestOperationException e) {
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
Expand All @@ -722,7 +755,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
try {
// overwrite only if eTag matches with the file properties fetched before
op = createClient.createPath(relativePath, true, true, permissions,
isAppendBlob, eTag, contextEncryptionAdapter, tracingContext, getIsNamespaceEnabled(tracingContext));
isAppendBlob, eTag, contextEncryptionAdapter,
tracingContext, getIsNamespaceEnabled(tracingContext));
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
// Is a parallel access case, as file with eTag was just queried
Expand Down Expand Up @@ -827,7 +861,8 @@ public void createDirectory(final Path path, final FsPermission permission,
Permissions permissions = new Permissions(isNamespaceEnabled,
permission, umask);
final AbfsRestOperation op = createClient.createPath(getRelativePath(path),
false, overwrite, permissions, false, null, null, tracingContext, isNamespaceEnabled);
false, overwrite, permissions, false, null, null,
tracingContext, isNamespaceEnabled);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,17 @@ public static String accountProperty(String property, String account) {
= "fs.azure.blob.atomic.rename.lease.refresh.duration";
/**Maximum number of blob information enqueued in memory for rename or delete orchestration {@value}*/
public static final String FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = "fs.azure.blob.dir.list.producer.queue.max.size";
/**
* Maximum consumer lag (count of blob information which is yet to be taken for operation)
* in blob listing which can be tolerated before making producer to wait for
* consumer lag to become tolerable: {@value}.
*/
public static final String FS_AZURE_CONSUMER_MAX_LAG = "fs.azure.blob.dir.list.consumer.max.lag";
/**Maximum number of threads per blob-rename orchestration {@value}*/
public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread";
/**Maximum number of threads per blob-delete orchestration {@value}*/
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
/**Defines whether a lease is to be taken on the parent of an atomic-directory on the non-recursive create path: {@value}.*/
public static final String FS_AZURE_LEASE_CREATE_NON_RECURSIVE = "fs.azure.lease.create.non.recursive";
private ConfigurationKeys() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

private static final int SIXTY_SECONDS = 60_000;
public static final int SIXTY_SECONDS = 60_000;

// Retry parameter defaults.
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3_000; // 3s
Expand Down Expand Up @@ -170,10 +170,12 @@ public final class FileSystemConfigurations {
public static final long DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS = 1_000L;
public static final long
DEFAULT_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION = 60_000L;
public static final int DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = 10000;

public static final int DEFAULT_FS_AZURE_BLOB_RENAME_THREAD = 5;
public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = 5;
public static final int DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = 2 * DEFAULT_AZURE_LIST_MAX_RESULTS;
public static final int DEFAULT_FS_AZURE_CONSUMER_MAX_LAG = DEFAULT_AZURE_LIST_MAX_RESULTS;
public static final int DEFAULT_FS_AZURE_LISTING_ACTION_THREADS = 5;
public static final int DEFAULT_FS_AZURE_BLOB_RENAME_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
public static final boolean DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE = false;
public static final int BLOCK_ID_LENGTH = 60;
private FileSystemConfigurations() {}
}
Loading
Loading