Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer condition change + Rename Azcopy tests #127

Merged
merged 41 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
993d4f6
Azcopy Helper
Jun 11, 2024
b3ee83b
Test Infra For Implicit Scenarios
Jun 25, 2024
c7f348e
Assert Azcopy creates implicit paths
Jun 25, 2024
f4ec53a
Azcopy helper Refactored
Jul 1, 2024
2ca1b0e
Azcopy helper Test
Jul 1, 2024
b93a862
Singleton AzcopyHelper
Jul 2, 2024
3c841e2
Unused Iports
Jul 2, 2024
2105ca9
Merge branch 'azcopyHelperCode' into sp/azcopyTests
saxenapranav Jul 2, 2024
12e89de
explicit and implicit tests for rename
saxenapranav Jul 2, 2024
09f40f8
Azcopy Helper
Jun 11, 2024
ed652c8
Test Infra For Implicit Scenarios
Jun 25, 2024
c09c89b
Assert Azcopy creates implicit paths
Jun 25, 2024
ba9c62c
Azcopy helper Refactored
Jul 1, 2024
e8d5a35
Azcopy helper Test
Jul 1, 2024
37c70f1
Singleton AzcopyHelper
Jul 2, 2024
0ee6532
Unused Iports
Jul 2, 2024
299c86d
Resolved Comments
Jul 2, 2024
12d68ad
Azcopy Helper Works only on Blob Endpoint
Jul 2, 2024
138662e
test fixed
saxenapranav Jul 3, 2024
4cf26e5
Merge branch 'azcopyHelperCode' into sp/azcopyTests
saxenapranav Jul 3, 2024
76177f9
fixed some test; braught in tests
saxenapranav Jul 4, 2024
43e5793
handler implicit src dir. added new test in ITestExplicitImplicitRename
saxenapranav Jul 8, 2024
bfc9732
Merge branch 'wasbDepCodeReview' into sp/azcopyTests
saxenapranav Jul 8, 2024
10fe6b6
make azcopy commands in parallel
saxenapranav Jul 8, 2024
6a93c4c
create a method that can parallely create dir and file
saxenapranav Jul 8, 2024
e0cc653
parallelised test's az copy create file/dir
saxenapranav Jul 8, 2024
04bc2aa
code improvement wip
saxenapranav Jul 8, 2024
c203ce0
added javadocs; some test refactors
saxenapranav Jul 8, 2024
00cec1c
test run improvement + refactor
saxenapranav Jul 8, 2024
f618c73
assume if blob endpoint in testAtomicityRedoInvalidFile
saxenapranav Jul 9, 2024
3401e5c
add test for asserting getPathStatus and getListStatus doesnt resume …
saxenapranav Jul 9, 2024
0158163
maxConsumptionLag javadocs; from config
saxenapranav Jul 10, 2024
9bd4eb5
test added
saxenapranav Jul 16, 2024
8422d3e
refactors to get createNonRecursive running
saxenapranav Jul 23, 2024
4158fbf
added test
saxenapranav Jul 23, 2024
efc70fa
nit refactors
saxenapranav Jul 25, 2024
c0ef826
correct way of implementation for createNonRecursive. The aim is to h…
saxenapranav Jul 25, 2024
c37f21d
flow of statistic incremenet
saxenapranav Jul 26, 2024
74de696
clean changes
saxenapranav Jul 26, 2024
6a37197
asf
saxenapranav Jul 26, 2024
b2a78d9
nits
saxenapranav Jul 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ public class AbfsConfiguration{
FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
private int producerQueueMaxSize;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_CONSUMER_MAX_LAG, DefaultValue = DEFAULT_FS_AZURE_CONSUMER_MAX_LAG)
private int listingMaxConsumptionLag;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD)
private int blobRenameDirConsumptionParallelism;
Expand Down Expand Up @@ -1446,6 +1450,10 @@ public int getProducerQueueMaxSize() {
return producerQueueMaxSize;
}

public int getListingMaxConsumptionLag() {
return listingMaxConsumptionLag;
}

public int getBlobRenameDirConsumptionParallelism() {
return blobRenameDirConsumptionParallelism;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ public static String accountProperty(String property, String account) {
= "fs.azure.blob.atomic.rename.lease.refresh.duration";
/**Maximum number of blob information enqueued in memory for rename or delete orchestration {@value}*/
public static final String FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = "fs.azure.blob.dir.list.producer.queue.max.size";
/**
* Maximum consumer lag (count of blob information which is yet to be taken for operation)
* in blob listing which can be tolerated before making producer to wait for
* consumer lag to become tolerable: {@value}.
*/
public static final String FS_AZURE_CONSUMER_MAX_LAG = "fs.azure.blob.dir.list.consumer.max.lag";
/**Maximum number of thread per blob-rename orchestration {@value}*/
public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread";
/**Maximum number of thread per blob-delete orchestration {@value}*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ public final class FileSystemConfigurations {
public static final long DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS = 1_000L;
public static final long
DEFAULT_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION = 60_000L;
public static final int DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = 10000;

public static final int DEFAULT_FS_AZURE_BLOB_RENAME_THREAD = 5;
public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = 5;
public static final int DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = 2 * DEFAULT_AZURE_LIST_MAX_RESULTS;
public static final int DEFAULT_FS_AZURE_CONSUMER_MAX_LAG = DEFAULT_AZURE_LIST_MAX_RESULTS;
public static final int DEFAULT_FS_AZURE_LISTING_ACTION_THREADS = 5;
public static final int DEFAULT_FS_AZURE_BLOB_RENAME_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
public static final int BLOCK_ID_LENGTH = 60;
private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,7 @@ public boolean isAtomicRenameKey(String key) {
*
* @throws AzureBlobFileSystemException server error or the path is renamePending json file and action is taken.
*/
public void takeGetPathStatusAtomicRenameKeyAction(final Path path,
private void takeGetPathStatusAtomicRenameKeyAction(final Path path,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
if (path == null || path.isRoot() || !isAtomicRenameKey(path.toUri().getPath())) {
return;
Expand Down Expand Up @@ -1583,7 +1583,7 @@ public boolean takeListPathAtomicRenameKeyAction(final Path path,
}

@VisibleForTesting
RenameAtomicity getRedoRenameAtomicity(final Path path, int fileLen,
public RenameAtomicity getRedoRenameAtomicity(final Path path, int fileLen,
final TracingContext tracingContext) {
RenameAtomicity renameAtomicity = new RenameAtomicity(path,
fileLen,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ public AbfsClientRenameResult execute() throws AzureBlobFileSystemException {
boolean result = false;
if (preCheck(src, dst, pathInformation)) {
RenameAtomicity renameAtomicity = null;
if (pathInformation.getIsDirectory()
&& pathInformation.getIsImplicit()) {
AbfsRestOperation createMarkerOp = abfsClient.createPath(src.toUri().getPath(), false, false,
null,
false, null, null, tracingContext, false);
pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult()));
}
try {
if (isAtomicRename) {
/*
Expand Down Expand Up @@ -263,7 +270,8 @@ private void validateDestinationIsNotSubDir(final Path src,
private void setSrcPathInformation(final Path src,
final PathInformation pathInformation)
throws AzureBlobFileSystemException {
pathInformation.copy(getPathInformation(src, tracingContext));
pathInformation.
copy(getPathInformation(src, tracingContext));
}

/**
Expand Down Expand Up @@ -519,12 +527,13 @@ private PathInformation getPathInformation(Path path,

return new PathInformation(true,
abfsClient.checkIsDir(op.getResult()),
extractEtagHeader(op.getResult()));
extractEtagHeader(op.getResult()),
op.getResult() instanceof AbfsHttpOperation.AbfsHttpOperationWithFixedResultForGetFileStatus);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didnt get the need for instance check here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getPathStatus on blobClient will give instance of AbfsHttpOperationWithFixedResultForGetFileStatus for an implicit path. Hence used.

} catch (AzureBlobFileSystemException e) {
if (e instanceof AbfsRestOperationException) {
AbfsRestOperationException ex = (AbfsRestOperationException) e;
if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
return new PathInformation(false, false, null);
return new PathInformation(false, false, null, false);
}
}
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_LIST_MAX_RESULTS;

/**
* ListActionTaker is an abstract class that provides a way to list the paths
Expand All @@ -48,7 +51,8 @@
*/
public abstract class ListActionTaker {

private static final Logger LOG = LoggerFactory.getLogger(ListActionTaker.class);
private static final Logger LOG = LoggerFactory.getLogger(
ListActionTaker.class);

protected final Path path;

Expand All @@ -75,7 +79,8 @@ public ListActionTaker(Path path,

abstract boolean takeAction(Path path) throws AzureBlobFileSystemException;

private boolean takeAction(List<Path> paths) throws AzureBlobFileSystemException {
private boolean takeAction(List<Path> paths)
throws AzureBlobFileSystemException {
List<Future<Boolean>> futureList = new ArrayList<>();
for (Path path : paths) {
Future<Boolean> future = executorService.submit(() -> {
Expand Down Expand Up @@ -111,12 +116,12 @@ private boolean takeAction(List<Path> paths) throws AzureBlobFileSystemException
* path and supply them to parallel thread for relevant action which is defined
* in {@link #takeAction(Path)}.
*/
public boolean listRecursiveAndTakeAction() throws AzureBlobFileSystemException {
public boolean listRecursiveAndTakeAction()
throws AzureBlobFileSystemException {
AbfsConfiguration configuration = abfsClient.getAbfsConfiguration();
Thread producerThread = null;
try {
ListBlobQueue listBlobQueue = new ListBlobQueue(
configuration.getProducerQueueMaxSize(), getMaxConsumptionParallelism());
ListBlobQueue listBlobQueue = createListBlobQueue(configuration);
producerThread = new Thread(() -> {
try {
produceConsumableList(listBlobQueue);
Expand Down Expand Up @@ -150,43 +155,66 @@ public boolean listRecursiveAndTakeAction() throws AzureBlobFileSystemException
}
}

@VisibleForTesting
protected ListBlobQueue createListBlobQueue(final AbfsConfiguration configuration)
throws InvalidConfigurationValueException {
return new ListBlobQueue(
configuration.getProducerQueueMaxSize(), getMaxConsumptionParallelism(),
configuration.getListingMaxConsumptionLag());
}

private void produceConsumableList(final ListBlobQueue listBlobQueue)
throws AzureBlobFileSystemException {
String continuationToken = null;
do {
List<Path> paths = new ArrayList<>();
final int queueAvailableSize = listBlobQueue.availableSize();
if (queueAvailableSize == 0) {
break;
}
final AbfsRestOperation op;
try {
op = abfsClient.listPath(path.toUri().getPath(),
true,
queueAvailableSize, continuationToken,
tracingContext);
} catch (AzureBlobFileSystemException ex) {
throw ex;
} catch (IOException ex) {
throw new AbfsRestOperationException(-1, null,
"Unknown exception from listing: " + ex.getMessage(), ex);
}

ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
if (retrievedSchema == null) {
continue;
}
continuationToken
= ((BlobListResultSchema) retrievedSchema).getNextMarker();
for (ListResultEntrySchema entry : retrievedSchema.paths()) {
Path entryPath = new Path(ROOT_PATH, entry.name());
if (!entryPath.equals(this.path)) {
paths.add(entryPath);
}
}
listBlobQueue.enqueue(paths);
continuationToken = listAndEnqueue(listBlobQueue, continuationToken);
} while (!producerThreadToBeStopped.get() && continuationToken != null
&& !listBlobQueue.getConsumptionFailed());
listBlobQueue.complete();
}

@VisibleForTesting
protected String listAndEnqueue(final ListBlobQueue listBlobQueue,
String continuationToken) throws AzureBlobFileSystemException {
final int queueAvailableSizeForProduction = Math.min(
DEFAULT_AZURE_LIST_MAX_RESULTS,
listBlobQueue.availableSizeForProduction());
if (queueAvailableSizeForProduction == 0) {
return null;
}
final AbfsRestOperation op;
try {
op = abfsClient.listPath(path.toUri().getPath(),
true,
queueAvailableSizeForProduction, continuationToken,
tracingContext);
} catch (AzureBlobFileSystemException ex) {
throw ex;
} catch (IOException ex) {
throw new AbfsRestOperationException(-1, null,
"Unknown exception from listing: " + ex.getMessage(), ex);
}

ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
if (retrievedSchema == null) {
return continuationToken;
}
continuationToken
= ((BlobListResultSchema) retrievedSchema).getNextMarker();
List<Path> paths = new ArrayList<>();
addPaths(paths, retrievedSchema);
listBlobQueue.enqueue(paths);
return continuationToken;
}

@VisibleForTesting
protected void addPaths(final List<Path> paths,
final ListResultSchema retrievedSchema) {
for (ListResultEntrySchema entry : retrievedSchema.paths()) {
Path entryPath = new Path(ROOT_PATH, entry.name());
if (!entryPath.equals(this.path)) {
paths.add(entryPath);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;

/**
* Data-structure to hold the list of paths to be processed. The paths are
Expand All @@ -49,9 +52,40 @@ class ListBlobQueue {

private volatile AzureBlobFileSystemException failureFromProducer;

ListBlobQueue(int maxSize, int consumeSetSize) {
/**
* Maximum number of entries in the queue allowed for letting the producer to
* produce. If the current size of the queue is greater than or equal to
* maxConsumptionLag, the producer will wait until the current size of the queue
* becomes lesser than maxConsumptionLag. This parameter is used to control the
* behavior of the producer-consumer pattern and preventing producer from
* rapidly producing very small amount of items.
* <p>
* For example, let's say maxSize is 10000 and maxConsumptionLag is 5000.
* The producer will stop producing when the current size of the queue is 5000
* and will wait until the current size of the queue becomes lesser than 5000.
* Once, the size becomes lesser than 5000, producer can produce (maxSize - currentSize)
* of items, which would make the current size of the queue to be 10000. Then again
* it will wait for 5000 items to be consumed before generating next 5000 items.
* <p>
* If this is not used, the producer will keep on producing items as soon as
* the queue become available with small size. Let say, 5 items got consumed,
* producer would make a server call for only 5 items and would populate the queue.
* <p>
* This mechanism would prevent producer making server calls for very small amount
* of items.
*/
private final int maxConsumptionLag;

ListBlobQueue(int maxSize, int consumeSetSize, int maxConsumptionLag)
throws InvalidConfigurationValueException {
this.maxSize = maxSize;
this.maxConsumptionLag = maxConsumptionLag;
this.consumeSetSize = consumeSetSize;

if (maxConsumptionLag >= maxSize) {
throw new InvalidConfigurationValueException(FS_AZURE_CONSUMER_MAX_LAG,
"maxConsumptionLag should be lesser than maxSize");
}
}

void markProducerFailure(AzureBlobFileSystemException failure) {
Expand Down Expand Up @@ -102,27 +136,27 @@ private List<Path> dequeue() {
pathListForConsumption.add(pathQueue.poll());
counter++;
}
if(counter > 0) {
if (counter > 0) {
notify();
}
return pathListForConsumption;
}

private synchronized int size() {
synchronized int size() {
return pathQueue.size();
}

/**
* Returns the available size of the queue. This is calculated by subtracting the current size of the queue
* from its maximum size. If the queue is full, this method will wait until some elements are consumed and
* space becomes available. If consumption has failed, it immediately returns zero. This method is synchronized
* to prevent concurrent modifications of the queue.
* Returns the available size of the queue for production. This is calculated by subtracting
* the current size of the queue from its maximum size. This method waits until
* the current size of the queue becomes lesser than the maxConsumptionLag. This
* method is synchronized to prevent concurrent modifications of the queue.
*
* @return the available size of the queue
* @return the available size of the queue.
*/
synchronized int availableSize() {
while(maxSize - size() <= 0) {
if(isConsumptionFailed) {
synchronized int availableSizeForProduction() {
while (size() >= maxConsumptionLag) {
if (isConsumptionFailed) {
return 0;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
public class PathInformation {
private Boolean pathExists;
private Boolean isDirectory;
private Boolean isImplicit;
private String eTag;

public PathInformation(Boolean pathExists, Boolean isDirectory, String eTag) {
public PathInformation(Boolean pathExists, Boolean isDirectory, String eTag, Boolean isImplicit) {
this.pathExists = pathExists;
this.isDirectory = isDirectory;
this.eTag = eTag;
this.isImplicit = isImplicit;
}

public PathInformation() {
Expand All @@ -36,6 +38,7 @@ public void copy(PathInformation pathInformation) {
this.pathExists = pathInformation.getPathExists();
this.isDirectory = pathInformation.getIsDirectory();
this.eTag = pathInformation.getETag();
this.isImplicit = pathInformation.getIsImplicit();
}

public String getETag() {
Expand All @@ -49,4 +52,12 @@ public Boolean getPathExists() {
public Boolean getIsDirectory() {
return isDirectory;
}

public Boolean getIsImplicit() {
return isImplicit;
}

void setETag(String eTag) {
this.eTag = eTag;
}
}
Loading
Loading