Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Permit backed futures to prevent timeouts during upload bursts #12159

Merged
merged 6 commits into from
May 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,22 @@ protected S3Repository createRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
return new S3Repository(
metadata,
registry,
service,
clusterService,
recoverySettings,
null,
null,
null,
null,
null,
false,
null,
null,
null
) {

@Override
public BlobStore blobStore() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Generic stats of repository-s3 plugin.
*/
public class GenericStatsMetricPublisher {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
gbbafna marked this conversation as resolved.
Show resolved Hide resolved

private final AtomicLong normalPriorityQSize = new AtomicLong();
private final AtomicInteger normalPriorityPermits = new AtomicInteger();
private final AtomicLong lowPriorityQSize = new AtomicLong();
private final AtomicInteger lowPriorityPermits = new AtomicInteger();
private final long normalPriorityQCapacity;
private final int maxNormalPriorityPermits;
private final long lowPriorityQCapacity;
private final int maxLowPriorityPermits;

public GenericStatsMetricPublisher(
long normalPriorityQCapacity,
int maxNormalPriorityPermits,
long lowPriorityQCapacity,
int maxLowPriorityPermits
) {
this.normalPriorityQCapacity = normalPriorityQCapacity;
this.maxNormalPriorityPermits = maxNormalPriorityPermits;
this.lowPriorityQCapacity = lowPriorityQCapacity;
this.maxLowPriorityPermits = maxLowPriorityPermits;
}

public void updateNormalPriorityQSize(long qSize) {
normalPriorityQSize.addAndGet(qSize);
}

public void updateLowPriorityQSize(long qSize) {
lowPriorityQSize.addAndGet(qSize);
}

Check warning on line 48 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java#L47-L48

Added lines #L47 - L48 were not covered by tests

public void updateNormalPermits(boolean increment) {
if (increment) {
normalPriorityPermits.incrementAndGet();
} else {
normalPriorityPermits.decrementAndGet();
}
}

public void updateLowPermits(boolean increment) {
if (increment) {
lowPriorityPermits.incrementAndGet();
} else {
lowPriorityPermits.decrementAndGet();
}
}

public long getNormalPriorityQSize() {
return normalPriorityQSize.get();
}

public int getAcquiredNormalPriorityPermits() {
return normalPriorityPermits.get();
}

public long getLowPriorityQSize() {
return lowPriorityQSize.get();
}

public int getAcquiredLowPriorityPermits() {
return lowPriorityPermits.get();
}

Map<String, Long> stats() {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
final Map<String, Long> results = new HashMap<>();
results.put("NormalPriorityQUtilization", (normalPriorityQSize.get() * 100) / normalPriorityQCapacity);
results.put("LowPriorityQUtilization", (lowPriorityQSize.get() * 100) / lowPriorityQCapacity);
results.put("NormalPriorityPermitsUtilization", (normalPriorityPermits.get() * 100L) / maxNormalPriorityPermits);
results.put("LowPriorityPermitsUtilization", (lowPriorityPermits.get() * 100L) / maxLowPriorityPermits);
return results;

Check warning on line 88 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java#L83-L88

Added lines #L83 - L88 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

Expand Down Expand Up @@ -218,7 +219,14 @@
writeContext.getMetadata()
);
try {
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
// Therefore, redirecting it to slow client.
if ((uploadRequest.getWritePriority() == WritePriority.LOW
&& blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)
|| (uploadRequest.getWritePriority() != WritePriority.HIGH
&& uploadRequest.getWritePriority() != WritePriority.URGENT
vikasvb90 marked this conversation as resolved.
Show resolved Hide resolved
&& blobStore.getNormalPrioritySizeBasedBlockingQ()
.isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) {
StreamContext streamContext = SocketAccess.doPrivileged(
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
);
Expand Down Expand Up @@ -258,23 +266,55 @@
} else {
s3AsyncClient = amazonS3Reference.get().client();
}
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
completionListener.onFailure(ex);
}
});

if (writeContext.getWritePriority() == WritePriority.URGENT
|| writeContext.getWritePriority() == WritePriority.HIGH
|| blobStore.isPermitBackedTransferEnabled() == false) {
createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);

Check warning on line 273 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L273

Added line #L273 was not covered by tests
} else if (writeContext.getWritePriority() == WritePriority.LOW) {
blobStore.getLowPrioritySizeBasedBlockingQ()
.produce(

Check warning on line 276 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L275-L276

Added lines #L275 - L276 were not covered by tests
new SizeBasedBlockingQ.Item(
writeContext.getFileSize(),
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)

Check warning on line 279 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L278-L279

Added lines #L278 - L279 were not covered by tests
)
);
} else if (writeContext.getWritePriority() == WritePriority.NORMAL) {
blobStore.getNormalPrioritySizeBasedBlockingQ()
.produce(
new SizeBasedBlockingQ.Item(
writeContext.getFileSize(),
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
)
);
} else {
throw new IllegalStateException("Cannot perform upload for other priority types.");

Check warning on line 291 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L291

Added line #L291 was not covered by tests
}
}
} catch (Exception e) {
logger.info("exception error from blob container for file {}", writeContext.getFileName());
throw new IOException(e);
}
}

private CompletableFuture<Void> createFileCompletableFuture(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
StreamContext streamContext,
ActionListener<Void> completionListener
) {
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
return completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
completionListener.onFailure(ex);
}
});
}

@ExperimentalApi
@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -56,6 +57,7 @@
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
Expand All @@ -77,6 +79,8 @@

private volatile boolean uploadRetryEnabled;

private volatile boolean permitBackedTransferEnabled;

private volatile boolean serverSideEncryption;

private volatile ObjectCannedACL cannedACL;
Expand All @@ -94,6 +98,9 @@
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final boolean multipartUploadEnabled;
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private final GenericStatsMetricPublisher genericStatsMetricPublisher;

S3BlobStore(
S3Service service,
Expand All @@ -109,7 +116,10 @@
AsyncTransferManager asyncTransferManager,
AsyncExecutorContainer urgentExecutorBuilder,
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder
AsyncExecutorContainer normalExecutorBuilder,
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
GenericStatsMetricPublisher genericStatsMetricPublisher
) {
this.service = service;
this.s3AsyncService = s3AsyncService;
Expand All @@ -128,6 +138,10 @@
// Settings to initialize blobstore with.
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.genericStatsMetricPublisher = genericStatsMetricPublisher;
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand All @@ -141,6 +155,7 @@
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());

Check warning on line 158 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java#L158

Added line #L158 was not covered by tests
}

@Override
Expand Down Expand Up @@ -168,6 +183,10 @@
return uploadRetryEnabled;
}

public boolean isPermitBackedTransferEnabled() {
return permitBackedTransferEnabled;
}

public String bucket() {
return bucket;
}
Expand All @@ -184,6 +203,14 @@
return bulkDeletesSize;
}

public SizeBasedBlockingQ getNormalPrioritySizeBasedBlockingQ() {
return normalPrioritySizeBasedBlockingQ;
}

public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() {
return lowPrioritySizeBasedBlockingQ;

Check warning on line 211 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java#L211

Added line #L211 was not covered by tests
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
Expand All @@ -201,7 +228,9 @@

@Override
public Map<String, Long> stats() {
return statsMetricPublisher.getStats().toMap();
Map<String, Long> stats = statsMetricPublisher.getStats().toMap();
stats.putAll(genericStatsMetricPublisher.stats());
return stats;

Check warning on line 233 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java#L231-L233

Added lines #L231 - L233 were not covered by tests
}

@Override
Expand All @@ -211,6 +240,7 @@
}
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats());

Check warning on line 243 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java#L243

Added line #L243 was not covered by tests
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
return extendedStats;
}

Expand Down
Loading
Loading