HADOOP-18948. S3A. Add option fs.s3a.directory.operations.purge.uploads to purge on rename/delete (apache#6218)


S3A directory delete and rename will optionally abort all pending multipart uploads
under their to-be-deleted paths when

fs.s3a.directory.operations.purge.uploads is true

It is off by default.

The filesystem's hasPathCapability("fs.s3a.directory.operations.purge.uploads")
probe will return true when this feature is enabled.
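A minimal sketch of switching the feature on and probing for it; the bucket
URI and the paths used here are illustrative placeholders, not part of this
change:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PurgeUploadsProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // opt in to purging pending uploads on directory rename/delete
    conf.setBoolean("fs.s3a.directory.operations.purge.uploads", true);
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
    // true only when the filesystem instance was created with the option set
    boolean purges = fs.hasPathCapability(new Path("/"),
        "fs.s3a.directory.operations.purge.uploads");
    System.out.println("directory operations purge uploads: " + purges);
  }
}

The same option can equally be set in core-site.xml, including per-bucket.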

Multipart uploads may accrue from interrupted data writes, uncommitted
staging/magic committer jobs and other operations/applications. On AWS S3,
lifecycle rules are the recommended way to clean these up; this change improves
support for stores which lack such rules.

Contributed by Steve Loughran
steveloughran authored and ahmarsuhail committed Dec 5, 2023
1 parent e14b08b commit 726e217
Showing 17 changed files with 499 additions and 83 deletions.
StoreStatisticNames.java
@@ -244,6 +244,13 @@ public final class StoreStatisticNames {
public static final String OBJECT_MULTIPART_UPLOAD_ABORTED =
"object_multipart_aborted";

/**
* Object multipart list request.
* Value :{@value}.
*/
public static final String OBJECT_MULTIPART_UPLOAD_LIST =
"object_multipart_list";

/**
* Object put/multipart upload count.
* Value :{@value}.
Constants.java
@@ -1318,4 +1318,19 @@ private Constants() {
* The bucket region header.
*/
public static final String BUCKET_REGION_HEADER = "x-amz-bucket-region";

/**
* Should directory operations purge uploads?
* This adds at least one parallelized list operation to the call,
* plus the overhead of deletions.
* Value: {@value}.
*/
public static final String DIRECTORY_OPERATIONS_PURGE_UPLOADS =
"fs.s3a.directory.operations.purge.uploads";

/**
* Default value of {@link #DIRECTORY_OPERATIONS_PURGE_UPLOADS}: {@value}.
*/
public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false;

}
MultipartUtils.java
@@ -36,7 +36,7 @@
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.store.audit.AuditSpan;

-import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;


@@ -66,7 +66,7 @@ private MultipartUtils() { }
* @param maxKeys maximum batch size to request at a time from S3.
* @return an iterator of matching uploads
*/
-static MultipartUtils.UploadIterator listMultipartUploads(
+static RemoteIterator<MultipartUpload> listMultipartUploads(
final StoreContext storeContext,
S3Client s3,
@Nullable String prefix,
@@ -196,7 +196,7 @@ private void requestNextBatch() throws IOException {

listing = invoker.retry("listMultipartUploads", prefix, true,
trackDurationOfOperation(storeContext.getInstrumentation(),
-MULTIPART_UPLOAD_LIST.getSymbol(),
+OBJECT_MULTIPART_UPLOAD_LIST.getSymbol(),
() -> s3.listMultipartUploads(requestBuilder.build())));
LOG.debug("Listing found {} upload(s)",
listing.uploads().size());
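The reshaped listing is an ordinary RemoteIterator, so callers can drain it
with the same foreach() helper this commit uses in
abortMultipartUploadsUnderPrefix(). A sketch, where the S3AFileSystem
instance and the prefix are assumptions for illustration:

import java.io.IOException;

import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.util.functional.RemoteIterators;

public class ListPendingUploads {
  // log every pending upload under a prefix; foreach() drains the
  // iterator and returns the number of entries it yielded
  static long logPendingUploads(S3AFileSystem fs, String prefix)
      throws IOException {
    return RemoteIterators.foreach(fs.listUploads(prefix), upload ->
        System.out.printf("pending upload %s under key %s%n",
            upload.uploadId(), upload.key()));
  }
}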
S3AFileSystem.java
@@ -258,6 +258,7 @@
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;

/**
@@ -384,6 +385,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private SignerManager signerManager;
private S3AInternals s3aInternals;

/**
* Do directory operations purge pending uploads?
*/
private boolean dirOperationsPurgeUploads;

/**
* Page size for deletions.
*/
@@ -565,6 +571,9 @@ public void initialize(URI name, Configuration originalConf)
//check but do not store the block size
longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
// should the delete also purge uploads.
dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT);

this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
@@ -1231,7 +1240,7 @@ public void abortOutstandingMultipartUploads(long seconds)
purgeBefore);
invoker.retry("Purging multipart uploads", bucket, true,
() -> {
-MultipartUtils.UploadIterator uploadIterator =
+RemoteIterator<MultipartUpload> uploadIterator =
MultipartUtils.listMultipartUploads(createStoreContext(), s3Client, null, maxKeys);

while (uploadIterator.hasNext()) {
@@ -2280,12 +2289,14 @@ private long innerRename(Path source, Path dest)

// Initiate the rename.
// this will call back into this class via the rename callbacks
+final StoreContext storeContext = createStoreContext();
RenameOperation renameOperation = new RenameOperation(
-createStoreContext(),
+storeContext,
src, srcKey, p.getLeft(),
dst, dstKey, p.getRight(),
-new OperationCallbacksImpl(),
-pageSize);
+new OperationCallbacksImpl(storeContext),
+pageSize,
+dirOperationsPurgeUploads);
return renameOperation.execute();
}

@@ -2306,8 +2317,19 @@ private final class OperationCallbacksImpl implements OperationCallbacks {
/** Audit Span at time of creation. */
private final AuditSpan auditSpan;

-private OperationCallbacksImpl() {
-auditSpan = getActiveAuditSpan();
+private final StoreContext storeContext;
+
+private OperationCallbacksImpl(final StoreContext storeContext) {
+this.storeContext = requireNonNull(storeContext);
+this.auditSpan = storeContext.getActiveAuditSpan();
}

/**
* Get the audit span.
* @return the span
*/
private AuditSpan getAuditSpan() {
return auditSpan;
}

@Override
@@ -2407,7 +2429,29 @@ public RemoteIterator<S3AFileStatus> listObjects(
Listing.ACCEPT_ALL_BUT_S3N,
auditSpan));
}
}

/**
* Abort multipart uploads under a path.
* @param prefix prefix for uploads to abort
* @return a count of aborts
* @throws IOException trouble; FileNotFoundExceptions are swallowed.
*/
@Override
@Retries.RetryTranslated
public long abortMultipartUploadsUnderPrefix(String prefix)
throws IOException {
getAuditSpan().activate();
// this deactivates the audit span somehow
final RemoteIterator<MultipartUpload> uploads =
S3AFileSystem.this.listUploadsUnderPrefix(storeContext, prefix);
// so reactivate it.
getAuditSpan().activate();
return foreach(uploads, upload ->
invoker.retry("Aborting multipart commit", upload.key(), true, () ->
S3AFileSystem.this.abortMultipartUpload(upload)));
}

} // end OperationCallbacksImpl

/**
* Callbacks from {@link Listing}.
@@ -3368,14 +3412,17 @@ protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException
// span covers delete, getFileStatus, fake directory operations.
try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
path.toString(), null)) {
// SC will include active span
final StoreContext storeContext = createStoreContext();
boolean outcome = trackDuration(getDurationTrackerFactory(),
INVOCATION_DELETE.getSymbol(),
new DeleteOperation(
-createStoreContext(),
+storeContext,
innerGetFileStatus(path, true, StatusProbeEnum.ALL),
recursive,
-new OperationCallbacksImpl(),
-pageSize));
+new OperationCallbacksImpl(storeContext),
+pageSize,
+dirOperationsPurgeUploads));
if (outcome) {
try {
maybeCreateFakeParentDirectory(path);
@@ -5148,13 +5195,39 @@ S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status)
@InterfaceAudience.Private
@Retries.RetryTranslated
@AuditEntryPoint
-public MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
+public RemoteIterator<MultipartUpload> listUploads(@Nullable String prefix)
throws IOException {
// span is picked up and retained in the listing.
checkNotClosed();
try (AuditSpan span = createSpan(MULTIPART_UPLOAD_LIST.getSymbol(),
prefix, null)) {
return listUploadsUnderPrefix(createStoreContext(), prefix);
}
}

/**
* List any pending multipart uploads whose keys begin with prefix, using
* an iterator that can handle an unlimited number of entries.
* See {@link #listMultipartUploads(String)} for a non-iterator version of
* this.
* @param storeContext store context.
* @param prefix optional key prefix to search
* @return Iterator over multipart uploads.
* @throws IOException on failure
*/
@InterfaceAudience.Private
@Retries.RetryTranslated
public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
final StoreContext storeContext,
final @Nullable String prefix)
throws IOException {
// span is picked up and retained in the listing.
-return trackDurationAndSpan(MULTIPART_UPLOAD_LIST, prefix, null, () ->
-MultipartUtils.listMultipartUploads(
-createStoreContext(), s3Client, prefix, maxKeys
-));
+String p = prefix;
+if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) {
+p = prefix + "/";
+}
+// duration tracking is done in iterator.
+return MultipartUtils.listMultipartUploads(storeContext, s3Client, p, maxKeys);
}

/**
@@ -5176,9 +5249,10 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
}
String p = prefix;
return invoker.retry("listMultipartUploads", p, true, () -> {
-ListMultipartUploadsRequest.Builder requestBuilder = getRequestFactory()
-.newListMultipartUploadsRequestBuilder(p);
-return s3Client.listMultipartUploads(requestBuilder.build()).uploads();
+final ListMultipartUploadsRequest request = getRequestFactory()
+.newListMultipartUploadsRequestBuilder(p).build();
+return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () ->
+s3Client.listMultipartUploads(request).uploads());
});
}
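For completeness, the non-iterator form loads every matching upload into one
in-memory list, which is why the iterator variant above is preferred for
stores with many pending uploads. A sketch under the same assumptions (an
initialized S3AFileSystem and a placeholder prefix):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.s3a.S3AFileSystem;

import software.amazon.awssdk.services.s3.model.MultipartUpload;

public class ListPendingUploadsEagerly {
  static void printPendingUploads(S3AFileSystem fs) throws IOException {
    // one retried, duration-tracked call; fine for tests and small listings
    List<MultipartUpload> uploads = fs.listMultipartUploads("output/run-1/");
    for (MultipartUpload upload : uploads) {
      System.out.println(upload.key() + " -> " + upload.uploadId());
    }
  }
}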

@@ -5187,37 +5261,35 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
* Retry policy: none.
* @param destKey destination key
* @param uploadId Upload ID
* @throws IOException IO failure, including any uprated SdkException
*/
-@Retries.OnceRaw
-void abortMultipartUpload(String destKey, String uploadId) {
-LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
-s3Client.abortMultipartUpload(
-getRequestFactory().newAbortMultipartUploadRequestBuilder(
-destKey,
-uploadId).build());
+@Retries.OnceTranslated
+public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
+LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
+trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () ->
+s3Client.abortMultipartUpload(
+getRequestFactory().newAbortMultipartUploadRequestBuilder(
+destKey,
+uploadId).build()));
}

/**
* Abort a multipart upload.
* Retry policy: none.
* @param upload the listed upload to abort.
* @throws IOException IO failure, including any uprated SdkException
*/
-@Retries.OnceRaw
-void abortMultipartUpload(MultipartUpload upload) {
-String destKey;
-String uploadId;
-destKey = upload.key();
-uploadId = upload.uploadId();
-if (LOG.isInfoEnabled()) {
+@Retries.OnceTranslated
+public void abortMultipartUpload(MultipartUpload upload) throws IOException {
+String destKey = upload.key();
+String uploadId = upload.uploadId();
+if (LOG.isDebugEnabled()) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
uploadId, destKey, upload.initiator(),
df.format(Date.from(upload.initiated())));
}
-s3Client.abortMultipartUpload(
-getRequestFactory().newAbortMultipartUploadRequestBuilder(
-destKey,
-uploadId).build());
+abortMultipartUpload(destKey, uploadId);
}

/**
Expand Down Expand Up @@ -5263,13 +5335,17 @@ public boolean hasPathCapability(final Path path, final String capability)
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
return true;

// Do directory operations purge uploads.
case DIRECTORY_OPERATIONS_PURGE_UPLOADS:
return dirOperationsPurgeUploads;

// etags are available in listings, but they
// are not consistent across renames.
// therefore, only availability is declared
case CommonPathCapabilities.ETAGS_AVAILABLE:
return true;

/*
* Marker policy capabilities are handed off.
*/
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
@@ -5539,7 +5615,7 @@ public MarkerToolOperations createMarkerToolOperations(final String target)
throws IOException {
createSpan("marker-tool-scan", target,
null);
-return new MarkerToolOperationsImpl(new OperationCallbacksImpl());
+return new MarkerToolOperationsImpl(new OperationCallbacksImpl(createStoreContext()));
}

/**
Statistic.java
@@ -242,7 +242,10 @@ public enum Statistic {
StoreStatisticNames.OBJECT_MULTIPART_UPLOAD_ABORTED,
"Object multipart upload aborted",
TYPE_DURATION),
-OBJECT_PUT_REQUESTS(
+OBJECT_MULTIPART_UPLOAD_LIST(
+StoreStatisticNames.OBJECT_MULTIPART_UPLOAD_LIST,
+"Object multipart list request issued",
+TYPE_DURATION),
+OBJECT_PUT_REQUESTS(
StoreStatisticNames.OBJECT_PUT_REQUEST,
"Object put/multipart upload count",
TYPE_DURATION),
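The new statistic surfaces through the filesystem's IOStatistics, so the
listing cost of a purge-enabled directory operation can be inspected
afterwards. A sketch with a placeholder path; the counter may be absent
until a multipart list request has actually been issued:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;

public class ShowMultipartListStatistic {
  static void deleteAndReport(S3AFileSystem fs) throws IOException {
    fs.delete(new Path("/tables/in-progress"), true);
    IOStatistics stats = fs.getIOStatistics();
    // duration-tracked operations also publish a plain invocation counter
    Long listCalls = stats.counters().get("object_multipart_list");
    System.out.println("multipart list requests: " + listCalls);
    // full dump, including min/mean/max durations
    System.out.println(IOStatisticsLogging.ioStatisticsToPrettyString(stats));
  }
}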