Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer condition change + Rename Azcopy tests #127

Merged
merged 41 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
993d4f6
Azcopy Helper
Jun 11, 2024
b3ee83b
Test Infra For Implicit Scenarios
Jun 25, 2024
c7f348e
Assert Azcopy creates implicit paths
Jun 25, 2024
f4ec53a
Azcopy helper Refactored
Jul 1, 2024
2ca1b0e
Azcopy helper Test
Jul 1, 2024
b93a862
Singleton AzcopyHelper
Jul 2, 2024
3c841e2
Unused Iports
Jul 2, 2024
2105ca9
Merge branch 'azcopyHelperCode' into sp/azcopyTests
saxenapranav Jul 2, 2024
12e89de
explicit and implicit tests for rename
saxenapranav Jul 2, 2024
09f40f8
Azcopy Helper
Jun 11, 2024
ed652c8
Test Infra For Implicit Scenarios
Jun 25, 2024
c09c89b
Assert Azcopy creates implicit paths
Jun 25, 2024
ba9c62c
Azcopy helper Refactored
Jul 1, 2024
e8d5a35
Azcopy helper Test
Jul 1, 2024
37c70f1
Singleton AzcopyHelper
Jul 2, 2024
0ee6532
Unused Iports
Jul 2, 2024
299c86d
Resolved Comments
Jul 2, 2024
12d68ad
Azcopy Helper Works only on Blob Endpoint
Jul 2, 2024
138662e
test fixed
saxenapranav Jul 3, 2024
4cf26e5
Merge branch 'azcopyHelperCode' into sp/azcopyTests
saxenapranav Jul 3, 2024
76177f9
fixed some test; braught in tests
saxenapranav Jul 4, 2024
43e5793
handler implicit src dir. added new test in ITestExplicitImplicitRename
saxenapranav Jul 8, 2024
bfc9732
Merge branch 'wasbDepCodeReview' into sp/azcopyTests
saxenapranav Jul 8, 2024
10fe6b6
make azcopy commands in parallel
saxenapranav Jul 8, 2024
6a93c4c
create a method that can parallely create dir and file
saxenapranav Jul 8, 2024
e0cc653
parallelised test's az copy create file/dir
saxenapranav Jul 8, 2024
04bc2aa
code improvement wip
saxenapranav Jul 8, 2024
c203ce0
added javadocs; some test refactors
saxenapranav Jul 8, 2024
00cec1c
test run improvement + refactor
saxenapranav Jul 8, 2024
f618c73
assume if blob endpoint in testAtomicityRedoInvalidFile
saxenapranav Jul 9, 2024
3401e5c
add test for asserting getPathStatus and getListStatus doesnt resume …
saxenapranav Jul 9, 2024
0158163
maxConsumptionLag javadocs; from config
saxenapranav Jul 10, 2024
9bd4eb5
test added
saxenapranav Jul 16, 2024
8422d3e
refactors to get createNonRecursive running
saxenapranav Jul 23, 2024
4158fbf
added test
saxenapranav Jul 23, 2024
efc70fa
nit refactors
saxenapranav Jul 25, 2024
c0ef826
correct way of implementation for createNonRecursive. The aim is to h…
saxenapranav Jul 25, 2024
c37f21d
flow of statistic incremenet
saxenapranav Jul 26, 2024
74de696
clean changes
saxenapranav Jul 26, 2024
6a37197
asf
saxenapranav Jul 26, 2024
b2a78d9
nits
saxenapranav Jul 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE;

/**
* Configuration for Azure Blob FileSystem.
Expand Down Expand Up @@ -409,6 +411,10 @@ public class AbfsConfiguration{
FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
private int producerQueueMaxSize;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_CONSUMER_MAX_LAG, DefaultValue = DEFAULT_FS_AZURE_CONSUMER_MAX_LAG)
private int listingMaxConsumptionLag;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD)
private int blobRenameDirConsumptionParallelism;
Expand All @@ -417,6 +423,10 @@ public class AbfsConfiguration{
FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD)
private int blobDeleteDirConsumptionParallelism;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_LEASE_CREATE_NON_RECURSIVE, DefaultValue = DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE)
private boolean isLeaseOnCreateNonRecursiveEnabled;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

Expand Down Expand Up @@ -1446,11 +1456,19 @@ public int getProducerQueueMaxSize() {
return producerQueueMaxSize;
}

public int getListingMaxConsumptionLag() {
return listingMaxConsumptionLag;
}

public int getBlobRenameDirConsumptionParallelism() {
return blobRenameDirConsumptionParallelism;
}

public int getBlobDeleteDirConsumptionParallelism() {
return blobDeleteDirConsumptionParallelism;
}

public boolean isLeaseOnCreateNonRecursiveEnabled() {
return isLeaseOnCreateNonRecursiveEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.services.CreateNonRecursiveCheckActionTaker;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -382,7 +383,8 @@ public FSDataOutputStream create(final Path f,
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener);
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite,
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics,
overwrite,
permission == null ? FsPermission.getFileDefault() : permission,
FsPermission.getUMask(getConf()), tracingContext);
statIncrement(FILES_CREATED);
Expand All @@ -406,18 +408,21 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
ERR_CREATE_ON_ROOT,
null);
}
final Path parent = f.getParent();
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat,
listener);
final FileStatus parentFileStatus = tryGetFileStatus(parent, tracingContext);

if (parentFileStatus == null) {
throw new FileNotFoundException("Cannot create file "
+ f.getName() + " because parent folder does not exist.");
try {
Path qualifiedPath = makeQualified(f);
try (CreateNonRecursiveCheckActionTaker actionTaker = getAbfsStore().createNonRecursivePreCheck(
qualifiedPath, tracingContext)) {
return create(f, permission, overwrite, bufferSize, replication,
blockSize, progress);
}
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex);
return null;
}

return create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.services.CreateNonRecursiveCheckActionTaker;
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.fs.PathIOException;
Expand Down Expand Up @@ -600,6 +601,14 @@ public void deleteFilesystem(TracingContext tracingContext)
}
}

/**Checks existence of parent of the given path.*/
public CreateNonRecursiveCheckActionTaker createNonRecursivePreCheck(final Path path,
TracingContext tracingContext)
throws IOException {
return getClient().createNonRecursivePreCheck(path.getParent(),
tracingContext);
}

public OutputStream createFile(final Path path,
final FileSystem.Statistics statistics, final boolean overwrite,
final FsPermission permission, final FsPermission umask,
Expand Down Expand Up @@ -658,7 +667,6 @@ public OutputStream createFile(final Path path,

}
perfInfo.registerResult(op.getResult()).registerSuccess(true);

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
String eTag = extractEtagHeader(op.getResult());
return new AbfsOutputStream(
Expand Down Expand Up @@ -698,7 +706,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
// avoided for cases when no pre-existing file is present (major portion
// of create file traffic falls into the case of no pre-existing file).
op = createClient.createPath(relativePath, true, false, permissions,
isAppendBlob, null, contextEncryptionAdapter, tracingContext, getIsNamespaceEnabled(tracingContext));
isAppendBlob, null, contextEncryptionAdapter,
tracingContext, getIsNamespaceEnabled(tracingContext));

} catch (AbfsRestOperationException e) {
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
Expand All @@ -722,7 +731,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
try {
// overwrite only if eTag matches with the file properties fetched befpre
op = createClient.createPath(relativePath, true, true, permissions,
isAppendBlob, eTag, contextEncryptionAdapter, tracingContext, getIsNamespaceEnabled(tracingContext));
isAppendBlob, eTag, contextEncryptionAdapter,
tracingContext, getIsNamespaceEnabled(tracingContext));
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
// Is a parallel access case, as file with eTag was just queried
Expand Down Expand Up @@ -827,7 +837,8 @@ public void createDirectory(final Path path, final FsPermission permission,
Permissions permissions = new Permissions(isNamespaceEnabled,
permission, umask);
final AbfsRestOperation op = createClient.createPath(getRelativePath(path),
false, overwrite, permissions, false, null, null, tracingContext, isNamespaceEnabled);
false, overwrite, permissions, false, null, null,
tracingContext, isNamespaceEnabled);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,17 @@ public static String accountProperty(String property, String account) {
= "fs.azure.blob.atomic.rename.lease.refresh.duration";
/**Maximum number of blob information enqueued in memory for rename or delete orchestration {@value}*/
public static final String FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = "fs.azure.blob.dir.list.producer.queue.max.size";
/**
* Maximum consumer lag (count of blob information which is yet to be taken for operation)
* in blob listing which can be tolerated before making producer to wait for
* consumer lag to become tolerable: {@value}.
*/
public static final String FS_AZURE_CONSUMER_MAX_LAG = "fs.azure.blob.dir.list.consumer.max.lag";
/**Maximum number of thread per blob-rename orchestration {@value}*/
public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread";
/**Maximum number of thread per blob-delete orchestration {@value}*/
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
/**Define if lease to be taken on parent of atomic-directory on non-recursive create path {@value }.*/
public static final String FS_AZURE_LEASE_CREATE_NON_RECURSIVE = "fs.azure.lease.create.non.recursive";
private ConfigurationKeys() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

private static final int SIXTY_SECONDS = 60_000;
public static final int SIXTY_SECONDS = 60_000;

// Retry parameter defaults.
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3_000; // 3s
Expand Down Expand Up @@ -170,10 +170,12 @@ public final class FileSystemConfigurations {
public static final long DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS = 1_000L;
public static final long
DEFAULT_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION = 60_000L;
public static final int DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = 10000;

public static final int DEFAULT_FS_AZURE_BLOB_RENAME_THREAD = 5;
public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = 5;
public static final int DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = 2 * DEFAULT_AZURE_LIST_MAX_RESULTS;
public static final int DEFAULT_FS_AZURE_CONSUMER_MAX_LAG = DEFAULT_AZURE_LIST_MAX_RESULTS;
public static final int DEFAULT_FS_AZURE_LISTING_ACTION_THREADS = 5;
public static final int DEFAULT_FS_AZURE_BLOB_RENAME_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
public static final boolean DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE = false;
public static final int BLOCK_ID_LENGTH = 60;
private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_BLOB_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_BLOCK;
Expand Down Expand Up @@ -117,6 +118,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ZERO;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SIXTY_SECONDS;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
Expand Down Expand Up @@ -309,6 +311,49 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
return op;
}

/**{@inheritDoc}*/
@Override
public CreateNonRecursiveCheckActionTaker createNonRecursivePreCheck(Path parentPath, TracingContext tracingContext)
throws IOException {
AbfsLease abfsLease = null;
if (abfsConfiguration.isLeaseOnCreateNonRecursiveEnabled()
&& isAtomicRenameKey(parentPath.toUri().getPath())) {
try {
try {
/*
* Get exclusive lease on parent directory if the path is atomic rename key.
* This is to ensure that rename on the parent path doesn't happen during
* non-recursive create operation.
*/
abfsLease = takeAbfsLease(parentPath.toUri().getPath(), SIXTY_SECONDS,
tracingContext);
} catch (AbfsLease.LeaseException ex) {
if (ex.getCause() instanceof AbfsRestOperationException) {
throw (AbfsRestOperationException) ex.getCause();
}
throw ex;
} finally {
abfsCounters.incrementCounter(CALL_GET_FILE_STATUS, 1);
}
/*
* At this moment we have an exclusive lease on the parent directory, and
* it is ensured that no parallel active rename is taking place. We have to
* resume pending rename action if any on the parent directory.
*/
takeGetPathStatusAtomicRenameKeyAction(parentPath, abfsLease,
tracingContext);
return new CreateNonRecursiveCheckActionTaker(this, parentPath,
abfsLease);
} catch (IOException ex) {
if (abfsLease != null) {
abfsLease.free();
}
throw ex;
}
}
return super.createNonRecursivePreCheck(parentPath, tracingContext);
}

/**
* Get Rest Operation for API <a href = https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob></a>.
* Creates a file or directory(marker file) at specified path.
Expand All @@ -327,9 +372,18 @@ public AbfsRestOperation createPath(final String path,
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext, final boolean isNamespaceEnabled)
throws AzureBlobFileSystemException {
return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag,
return createPath(path, isFile, overwrite, permissions, isAppendBlob,
eTag,
contextEncryptionAdapter, tracingContext, isNamespaceEnabled, false);
}

@VisibleForTesting
public AbfsLease takeAbfsLease(final String path,
final long timeDuration,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
return new AbfsLease(this, path, false, timeDuration, null, tracingContext);
}

/**
* Get Rest Operation for API <a href = https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob></a>.
* Creates a file or directory(marker file) at specified path.
Expand Down Expand Up @@ -1146,7 +1200,7 @@ public AbfsRestOperation getPathStatus(final String path,
if (tracingContext != null
&& tracingContext.getOpType() == FSOperationType.GET_FILESTATUS
&& op.getResult() != null && checkIsDir(op.getResult())) {
takeGetPathStatusAtomicRenameKeyAction(new Path(path), tracingContext);
takeGetPathStatusAtomicRenameKeyAction(new Path(path), null, tracingContext);
}
return op;
}
Expand Down Expand Up @@ -1485,11 +1539,13 @@ public boolean isAtomicRenameKey(String key) {
* Action to be taken when atomic-key is present on a getPathStatus path.
*
* @param path path of the pendingJson for the atomic path.
* @param pathLease lease on the path
* @param tracingContext tracing context.
*
* @throws AzureBlobFileSystemException server error or the path is renamePending json file and action is taken.
*/
public void takeGetPathStatusAtomicRenameKeyAction(final Path path,
private void takeGetPathStatusAtomicRenameKeyAction(final Path path,
final AbfsLease pathLease,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
if (path == null || path.isRoot() || !isAtomicRenameKey(path.toUri().getPath())) {
return;
Expand All @@ -1515,7 +1571,7 @@ public void takeGetPathStatusAtomicRenameKeyAction(final Path path,
RenameAtomicity renameAtomicity = getRedoRenameAtomicity(
pendingJsonPath, Integer.parseInt(pendingJsonFileStatus.getResult()
.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)),
tracingContext);
tracingContext, pathLease);
renameAtomicity.redo();
renameSrcHasChanged = false;
} catch (AbfsRestOperationException ex) {
Expand Down Expand Up @@ -1563,7 +1619,7 @@ public boolean takeListPathAtomicRenameKeyAction(final Path path,
}
try {
RenameAtomicity renameAtomicity
= getRedoRenameAtomicity(path, renamePendingJsonLen, tracingContext);
= getRedoRenameAtomicity(path, renamePendingJsonLen, tracingContext, null);
renameAtomicity.redo();
} catch (AbfsRestOperationException ex) {
/*
Expand All @@ -1583,13 +1639,13 @@ public boolean takeListPathAtomicRenameKeyAction(final Path path,
}

@VisibleForTesting
RenameAtomicity getRedoRenameAtomicity(final Path path, int fileLen,
final TracingContext tracingContext) {
RenameAtomicity renameAtomicity = new RenameAtomicity(path,
public RenameAtomicity getRedoRenameAtomicity(final Path renamePendingJsonPath, int fileLen,
final TracingContext tracingContext, final AbfsLease sourcePathLease) {
RenameAtomicity renameAtomicity = new RenameAtomicity(renamePendingJsonPath,
fileLen,
tracingContext,
null,
this);
this, sourcePathLease);
return renameAtomicity;
}

Expand Down
Loading
Loading