Skip to content

Commit

Permalink
Permit backed futures to prevent timeouts during upload bursts
Browse files Browse the repository at this point in the history
Signed-off-by: vikasvb90 <[email protected]>
  • Loading branch information
vikasvb90 committed Feb 5, 2024
1 parent 90a815e commit 8005bdf
Show file tree
Hide file tree
Showing 15 changed files with 972 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,26 @@ class S3Repository extends MeteredBlobStoreRepository {
Setting.Property.NodeScope
);

/**
* Number of retries in case of a transfer failure.
*/
public static Setting<Integer> S3_MAX_TRANSFER_RETRIES = Setting.intSetting(
"s3_max_transfer_retries",
3,
Setting.Property.NodeScope
);

/**
* Percentage of total available permits to be available for priority transfers.
*/
public static Setting<Integer> S3_PRIORITY_PERMIT_ALLOCATION_PERCENT = Setting.intSetting(
"s3_priority_permit_alloc_perc",
70,
21,
80,
Setting.Property.NodeScope
);

/**
* Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
Expand All @@ -69,6 +70,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

/**
Expand All @@ -81,6 +84,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private static final String PRIORITY_FUTURE_COMPLETION = "priority_future_completion";
private static final String PRIORITY_STREAM_READER = "priority_stream_reader";
private static final String FUTURE_COMPLETION = "future_completion";
private static final String REMOTE_TRANSFER_RETRY = "remote_transfer_retry";
private static final String STREAM_READER = "stream_reader";

protected final S3Service service;
Expand All @@ -91,6 +95,8 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private AsyncExecutorContainer urgentExecutorBuilder;
private AsyncExecutorContainer priorityExecutorBuilder;
private AsyncExecutorContainer normalExecutorBuilder;
private ExecutorService remoteTransferRetryPool;
private ScheduledExecutorService scheduler;

public S3RepositoryPlugin(final Settings settings, final Path configPath) {
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
Expand Down Expand Up @@ -120,6 +126,9 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
TimeValue.timeValueMinutes(5)
)
);
executorBuilders.add(
new ScalingExecutorBuilder(REMOTE_TRANSFER_RETRY, allocatedProcessors(settings), allocatedProcessors(settings) * 2, TimeValue.timeValueMinutes(5))
);
return executorBuilders;
}

Expand Down Expand Up @@ -189,6 +198,8 @@ public Collection<Object> createComponents(
threadPool.executor(STREAM_READER),
new AsyncTransferEventLoopGroup(normalEventLoopThreads)
);
this.remoteTransferRetryPool = threadPool.executor(REMOTE_TRANSFER_RETRY);
this.scheduler = threadPool.scheduler();
return Collections.emptyList();
}

Expand All @@ -204,7 +215,14 @@ protected S3Repository createRepository(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(),
normalExecutorBuilder.getStreamReader(),
priorityExecutorBuilder.getStreamReader(),
urgentExecutorBuilder.getStreamReader()
urgentExecutorBuilder.getStreamReader(),
new PermitBackedRetryableFutureUtils<>(
S3Repository.S3_MAX_TRANSFER_RETRIES.get(clusterService.getSettings()),
// High permit allocation because each op acquiring permit performs disk IO, computation and network IO.
Math.max(allocatedProcessors(clusterService.getSettings()) * 5, 10),
((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings()))/100,
remoteTransferRetryPool, scheduler)

);
return new S3Repository(
metadata,
Expand Down Expand Up @@ -263,7 +281,9 @@ public List<Setting<?>> getSettings() {
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING,
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING,
S3Repository.REDIRECT_LARGE_S3_UPLOAD,
S3Repository.UPLOAD_RETRY_ENABLED
S3Repository.UPLOAD_RETRY_ENABLED,
S3Repository.S3_MAX_TRANSFER_RETRIES,
S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.repositories.s3.async;

import org.opensearch.repositories.s3.StatsMetricPublisher;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
Expand All @@ -24,7 +25,6 @@
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.BufferedInputStream;
Expand All @@ -35,6 +35,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;

/**
* Responsible for handling parts of the original multipart request
Expand All @@ -56,8 +57,8 @@ public class AsyncPartsHandler {
* @param inputStreamContainers Checksum containers
* @param statsMetricPublisher sdk metric publisher
* @return list of completable futures
* @throws IOException thrown in case of an IO error
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static List<CompletableFuture<CompletedPart>> uploadParts(
S3AsyncClient s3AsyncClient,
ExecutorService executorService,
Expand All @@ -69,35 +70,48 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
StatsMetricPublisher statsMetricPublisher,
boolean uploadRetryEnabled
) throws IOException {
boolean uploadRetryEnabled,
PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils
) throws InterruptedException {
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBackedRetryableFutureUtils.createRequestContext();
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
InputStreamContainer inputStreamContainer = streamContext.provideStream(partIdx);
inputStreamContainers.set(partIdx, new CheckedContainer(inputStreamContainer.getContentLength()));
UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder()
.bucket(uploadRequest.getBucket())
.partNumber(partIdx + 1)
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.contentLength(inputStreamContainer.getContentLength());
if (uploadRequest.doRemoteDataIntegrityCheck()) {
uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
}
uploadPart(
s3AsyncClient,
executorService,
priorityExecutorService,
urgentExecutorService,
completedParts,
inputStreamContainers,
futures,
uploadPartRequestBuilder.build(),
inputStreamContainer,
uploadRequest,
uploadRetryEnabled
);
int finalPartIdx = partIdx;
Supplier<CompletableFuture<CompletedPart>> partFutureSupplier = () -> {
InputStreamContainer inputStreamContainer;
try {
inputStreamContainer = streamContext.provideStream(finalPartIdx);
} catch (IOException e) {
return CompletableFuture.failedFuture(e);
}
inputStreamContainers.set(finalPartIdx, new CheckedContainer(inputStreamContainer.getContentLength()));
UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder()
.bucket(uploadRequest.getBucket())
.partNumber(finalPartIdx + 1)
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.contentLength(inputStreamContainer.getContentLength());
if (uploadRequest.doRemoteDataIntegrityCheck()) {
uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
}
return uploadPart(
s3AsyncClient,
executorService,
priorityExecutorService,
urgentExecutorService,
completedParts,
inputStreamContainers,
uploadPartRequestBuilder.build(),
inputStreamContainer,
uploadRequest,
uploadRetryEnabled
);
};

CompletableFuture<CompletedPart> retryableFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture(
partFutureSupplier, uploadRequest.getWritePriority(), requestContext);
futures.add(retryableFuture);
}

return futures;
Expand Down Expand Up @@ -133,26 +147,21 @@ public static void cleanUpParts(S3AsyncClient s3AsyncClient, UploadRequest uploa
}));
}

public static InputStream maybeRetryInputStream(
InputStream inputStream,
WritePriority writePriority,
boolean uploadRetryEnabled,
long contentLength
) {
public static InputStream maybeRetryInputStream(InputStream inputStream, WritePriority writePriority, boolean uploadRetryEnabled,
long contentLength) {
if (uploadRetryEnabled == true && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
return new BufferedInputStream(inputStream, (int) (contentLength + 1));
}
return inputStream;
}

private static void uploadPart(
private static CompletableFuture<CompletedPart> uploadPart(
S3AsyncClient s3AsyncClient,
ExecutorService executorService,
ExecutorService priorityExecutorService,
ExecutorService urgentExecutorService,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
List<CompletableFuture<CompletedPart>> futures,
UploadPartRequest uploadPartRequest,
InputStreamContainer inputStreamContainer,
UploadRequest uploadRequest,
Expand All @@ -169,12 +178,8 @@ private static void uploadPart(
streamReadExecutor = executorService;
}

InputStream inputStream = maybeRetryInputStream(
inputStreamContainer.getInputStream(),
uploadRequest.getWritePriority(),
uploadRetryEnabled,
uploadPartRequest.contentLength()
);
InputStream inputStream = maybeRetryInputStream(inputStreamContainer.getInputStream(), uploadRequest.getWritePriority(),
uploadRetryEnabled, uploadPartRequest.contentLength());
CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.uploadPart(
uploadPartRequest,
Expand All @@ -183,19 +188,19 @@ private static void uploadPart(
);

CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> {
try {
inputStream.close();
} catch (IOException ex) {
log.error(
() -> new ParameterizedMessage(
"Failed to close stream while uploading a part of idx {} and file {}.",
uploadPartRequest.partNumber(),
uploadPartRequest.key()
),
ex
);
}
})
try {
inputStream.close();
} catch (IOException ex) {
log.error(
() -> new ParameterizedMessage(
"Failed to close stream while uploading a part of idx {} and file {}.",
uploadPartRequest.partNumber(),
uploadPartRequest.key()
),
ex
);
}
})
.thenApply(
uploadPartResponse -> convertUploadPartResponse(
completedParts,
Expand All @@ -205,9 +210,9 @@ private static void uploadPart(
uploadRequest.doRemoteDataIntegrityCheck()
)
);
futures.add(convertFuture);

CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture);
return convertFuture;
}

private static CompletedPart convertUploadPartResponse(
Expand Down
Loading

0 comments on commit 8005bdf

Please sign in to comment.