Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Permit backed futures to prevent timeouts during upload bursts #12159

Merged
merged 6 commits into from
May 13, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Generic stats of repository-s3 plugin.
*/
public class GenericStatsMetricPublisher {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
gbbafna marked this conversation as resolved.
Show resolved Hide resolved

private final AtomicLong normalPriorityQSize = new AtomicLong();
private final AtomicInteger normalPriorityPermits = new AtomicInteger();
private final AtomicLong lowPriorityQSize = new AtomicLong();
private final AtomicInteger lowPriorityPermits = new AtomicInteger();

public void updateNormalPriorityQSize(long qSize) {
normalPriorityQSize.addAndGet(qSize);
}

public void updateLowPriorityQSize(long qSize) {
lowPriorityQSize.addAndGet(qSize);
}

public void updateNormalPermits(boolean increment) {
if (increment) {
normalPriorityPermits.incrementAndGet();
} else {
normalPriorityPermits.decrementAndGet();
}
}

public void updateLowPermits(boolean increment) {
if (increment) {
lowPriorityPermits.incrementAndGet();
} else {
lowPriorityPermits.decrementAndGet();
}
}

public long getNormalPriorityQSize() {
return normalPriorityQSize.get();
}

public int getAcquiredNormalPriorityPermits() {
return normalPriorityPermits.get();
}

public long getLowPriorityQSize() {
return lowPriorityQSize.get();
}

public int getAcquiredLowPriorityPermits() {
return lowPriorityPermits.get();
}

Map<String, Long> stats() {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
final Map<String, Long> results = new HashMap<>();
results.put("NormalPriorityQSize", normalPriorityQSize.get());
results.put("LowPriorityQSize", lowPriorityQSize.get());
results.put("AcquiredNormalPriorityPermits", (long) normalPriorityPermits.get());
results.put("AcquiredLowPriorityPermits", (long) lowPriorityPermits.get());
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
// Therefore, redirecting it to slow client.
if ((uploadRequest.getWritePriority() == WritePriority.LOW
&& blobStore.getLowPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false)
&& blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)
|| (uploadRequest.getWritePriority() != WritePriority.HIGH
&& uploadRequest.getWritePriority() != WritePriority.URGENT
&& blobStore.getNormalPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false)) {
&& blobStore.getNormalPrioritySizeBasedBlockingQ()
.isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) {
StreamContext streamContext = SocketAccess.doPrivileged(
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
);
Expand Down Expand Up @@ -266,7 +267,9 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
s3AsyncClient = amazonS3Reference.get().client();
}

if (writeContext.getWritePriority() == WritePriority.URGENT || writeContext.getWritePriority() == WritePriority.HIGH) {
if (writeContext.getWritePriority() == WritePriority.URGENT
|| writeContext.getWritePriority() == WritePriority.HIGH
|| blobStore.isPermitBackedTransferEnabled() == false) {
createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);
} else if (writeContext.getWritePriority() == WritePriority.LOW) {
blobStore.getLowPrioritySizeBasedBlockingQ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
Expand All @@ -78,6 +79,8 @@ class S3BlobStore implements BlobStore {

private volatile boolean uploadRetryEnabled;

private volatile boolean permitBackedTransferEnabled;

private volatile boolean serverSideEncryption;

private volatile ObjectCannedACL cannedACL;
Expand All @@ -97,6 +100,7 @@ class S3BlobStore implements BlobStore {
private final boolean multipartUploadEnabled;
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private final GenericStatsMetricPublisher genericStatsMetricPublisher;

S3BlobStore(
S3Service service,
Expand All @@ -114,7 +118,8 @@ class S3BlobStore implements BlobStore {
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder,
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
GenericStatsMetricPublisher genericStatsMetricPublisher
) {
this.service = service;
this.s3AsyncService = s3AsyncService;
Expand All @@ -135,6 +140,8 @@ class S3BlobStore implements BlobStore {
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.genericStatsMetricPublisher = genericStatsMetricPublisher;
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand All @@ -148,6 +155,7 @@ public void reload(RepositoryMetadata repositoryMetadata) {
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand Down Expand Up @@ -175,6 +183,10 @@ public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

public boolean isPermitBackedTransferEnabled() {
return permitBackedTransferEnabled;
}

public String bucket() {
return bucket;
}
Expand Down Expand Up @@ -216,7 +228,9 @@ public void close() throws IOException {

@Override
public Map<String, Long> stats() {
return statsMetricPublisher.getStats().toMap();
Map<String, Long> stats = statsMetricPublisher.getStats().toMap();
stats.putAll(genericStatsMetricPublisher.stats());
return stats;
}

@Override
Expand All @@ -226,6 +240,7 @@ public Map<Metric, Map<String, Long>> extendedStats() {
}
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats());
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
return extendedStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ class S3Repository extends MeteredBlobStoreRepository {
Setting.Property.NodeScope
);

/**
* Whether large uploads need to be redirected to slow sync s3 client.
*/
static final Setting<Boolean> PERMIT_BACKED_TRANSFER_ENABLED = Setting.boolSetting(
"permit_backed_transfer_enabled",
true,
Setting.Property.NodeScope
);

/**
* Whether retry on uploads are enabled. This setting wraps inputstream with buffered stream to enable retries.
*/
Expand Down Expand Up @@ -287,6 +296,7 @@ class S3Repository extends MeteredBlobStoreRepository {
private final Path pluginConfigPath;
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private final GenericStatsMetricPublisher genericStatsMetricPublisher;

private volatile int bulkDeletesSize;

Expand Down Expand Up @@ -320,7 +330,8 @@ class S3Repository extends MeteredBlobStoreRepository {
multipartUploadEnabled,
Path.of(""),
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
lowPrioritySizeBasedBlockingQ,
new GenericStatsMetricPublisher()
);
}

Expand All @@ -341,7 +352,8 @@ class S3Repository extends MeteredBlobStoreRepository {
final boolean multipartUploadEnabled,
Path pluginConfigPath,
final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
final GenericStatsMetricPublisher genericStatsMetricPublisher
) {
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
this.service = service;
Expand All @@ -354,6 +366,7 @@ class S3Repository extends MeteredBlobStoreRepository {
this.normalExecutorBuilder = normalExecutorBuilder;
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.genericStatsMetricPublisher = genericStatsMetricPublisher;

validateRepositoryMetadata(metadata);
readRepositoryMetadata();
Expand Down Expand Up @@ -418,7 +431,8 @@ protected S3BlobStore createBlobStore() {
priorityExecutorBuilder,
normalExecutorBuilder,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
lowPrioritySizeBasedBlockingQ,
genericStatsMetricPublisher
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private TransferSemaphoresHolder transferSemaphoresHolder;
private GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher();

public S3RepositoryPlugin(final Settings settings, final Path configPath) {
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
Expand Down Expand Up @@ -236,21 +237,25 @@ public Collection<Object> createComponents(
this.normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
new ByteSizeValue(normalPriorityConsumers * 10L, ByteSizeUnit.GB),
normalTransferQConsumerService,
normalPriorityConsumers
normalPriorityConsumers,
genericStatsMetricPublisher,
SizeBasedBlockingQ.QueueEventType.NORMAL
);
int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings());
LowPrioritySizeBasedBlockingQ lowPrioritySizeBasedBlockingQ = new LowPrioritySizeBasedBlockingQ(
new ByteSizeValue(lowPriorityConsumers * 20L, ByteSizeUnit.GB),
lowTransferQConsumerService,
lowPriorityConsumers
lowPriorityConsumers,
genericStatsMetricPublisher
);
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.transferSemaphoresHolder = new TransferSemaphoresHolder(
// High number of permit allocation because each op acquiring permit performs disk IO, computation and network IO.
Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10),
((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings())) / 100,
S3Repository.S3_PERMIT_WAIT_DURATION_MIN.get(clusterService.getSettings()),
TimeUnit.MINUTES
TimeUnit.MINUTES,
genericStatsMetricPublisher
);

return CollectionUtils.arrayAsArrayList(this.normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ);
Expand All @@ -259,8 +264,13 @@ public Collection<Object> createComponents(
// New class because in core, components are injected via guice only by instance creation due to which
// same binding types fail.
private static final class LowPrioritySizeBasedBlockingQ extends SizeBasedBlockingQ {
public LowPrioritySizeBasedBlockingQ(ByteSizeValue capacity, ExecutorService executorService, int consumers) {
super(capacity, executorService, consumers);
public LowPrioritySizeBasedBlockingQ(
ByteSizeValue capacity,
ExecutorService executorService,
int consumers,
GenericStatsMetricPublisher genericStatsMetricPublisher
) {
super(capacity, executorService, consumers, genericStatsMetricPublisher, QueueEventType.LOW);
}
}

Expand Down Expand Up @@ -293,7 +303,8 @@ protected S3Repository createRepository(
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()),
configPath,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
lowPrioritySizeBasedBlockingQ,
genericStatsMetricPublisher
);
}

Expand Down Expand Up @@ -339,7 +350,8 @@ public List<Setting<?>> getSettings() {
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING,
S3Repository.REDIRECT_LARGE_S3_UPLOAD,
S3Repository.UPLOAD_RETRY_ENABLED,
S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT
S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT,
S3Repository.PERMIT_BACKED_TRANSFER_ENABLED
);
}

Expand Down
Loading
Loading