Skip to content

Commit

Permalink
Added stats for permits and queue in repository-s3 for permit backed …
Browse files Browse the repository at this point in the history
…transfers

Signed-off-by: vikasvb90 <[email protected]>
  • Loading branch information
vikasvb90 committed May 4, 2024
1 parent a518a08 commit 58d730b
Show file tree
Hide file tree
Showing 14 changed files with 384 additions and 125 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Generic stats of repository-s3 plugin.
*/
public class GenericStatsMetricPublisher {

private final AtomicLong normalPriorityQSize = new AtomicLong();
private final AtomicInteger normalPriorityPermits = new AtomicInteger();
private final AtomicLong lowPriorityQSize = new AtomicLong();
private final AtomicInteger lowPriorityPermits = new AtomicInteger();

public void updateNormalPriorityQSize(long qSize) {
normalPriorityQSize.addAndGet(qSize);
}

public void updateLowPriorityQSize(long qSize) {
lowPriorityQSize.addAndGet(qSize);
}

public void updateNormalPermits(boolean increment) {
if (increment) {
normalPriorityPermits.incrementAndGet();
} else {
normalPriorityPermits.decrementAndGet();
}
}

public void updateLowPermits(boolean increment) {
if (increment) {
lowPriorityPermits.incrementAndGet();
} else {
lowPriorityPermits.decrementAndGet();
}
}

public long getNormalPriorityQSize() {
return normalPriorityQSize.get();
}

public int getAcquiredNormalPriorityPermits() {
return normalPriorityPermits.get();
}

public long getLowPriorityQSize() {
return lowPriorityQSize.get();
}

public int getAcquiredLowPriorityPermits() {
return lowPriorityPermits.get();
}

Map<String, Long> stats() {
final Map<String, Long> results = new HashMap<>();
results.put("NormalPriorityQSize", normalPriorityQSize.get());
results.put("LowPriorityQSize", lowPriorityQSize.get());
results.put("AcquiredNormalPriorityPermits", (long) normalPriorityPermits.get());
results.put("AcquiredLowPriorityPermits", (long) lowPriorityPermits.get());
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
// Therefore, redirecting it to slow client.
if ((uploadRequest.getWritePriority() == WritePriority.LOW
&& blobStore.getLowPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false)
&& blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)
|| (uploadRequest.getWritePriority() != WritePriority.HIGH
&& uploadRequest.getWritePriority() != WritePriority.URGENT
&& blobStore.getNormalPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false)) {
&& blobStore.getNormalPrioritySizeBasedBlockingQ()
.isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) {
StreamContext streamContext = SocketAccess.doPrivileged(
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
);
Expand Down Expand Up @@ -266,7 +267,9 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
s3AsyncClient = amazonS3Reference.get().client();
}

if (writeContext.getWritePriority() == WritePriority.URGENT || writeContext.getWritePriority() == WritePriority.HIGH) {
if (writeContext.getWritePriority() == WritePriority.URGENT
|| writeContext.getWritePriority() == WritePriority.HIGH
|| blobStore.isPermitBackedTransferEnabled() == false) {
createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);
} else if (writeContext.getWritePriority() == WritePriority.LOW) {
blobStore.getLowPrioritySizeBasedBlockingQ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
Expand All @@ -78,6 +79,8 @@ class S3BlobStore implements BlobStore {

private volatile boolean uploadRetryEnabled;

private volatile boolean permitBackedTransferEnabled;

private volatile boolean serverSideEncryption;

private volatile ObjectCannedACL cannedACL;
Expand All @@ -97,6 +100,7 @@ class S3BlobStore implements BlobStore {
private final boolean multipartUploadEnabled;
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private final GenericStatsMetricPublisher genericStatsMetricPublisher;

S3BlobStore(
S3Service service,
Expand All @@ -114,7 +118,8 @@ class S3BlobStore implements BlobStore {
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder,
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
GenericStatsMetricPublisher genericStatsMetricPublisher
) {
this.service = service;
this.s3AsyncService = s3AsyncService;
Expand All @@ -135,6 +140,8 @@ class S3BlobStore implements BlobStore {
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.genericStatsMetricPublisher = genericStatsMetricPublisher;
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand All @@ -148,6 +155,7 @@ public void reload(RepositoryMetadata repositoryMetadata) {
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand Down Expand Up @@ -175,6 +183,10 @@ public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

public boolean isPermitBackedTransferEnabled() {
return permitBackedTransferEnabled;
}

public String bucket() {
return bucket;
}
Expand Down Expand Up @@ -216,7 +228,9 @@ public void close() throws IOException {

@Override
public Map<String, Long> stats() {
return statsMetricPublisher.getStats().toMap();
Map<String, Long> stats = statsMetricPublisher.getStats().toMap();
stats.putAll(genericStatsMetricPublisher.stats());
return stats;
}

@Override
Expand All @@ -226,6 +240,7 @@ public Map<Metric, Map<String, Long>> extendedStats() {
}
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats());
return extendedStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ class S3Repository extends MeteredBlobStoreRepository {
Setting.Property.NodeScope
);

/**
* Whether large uploads need to be redirected to slow sync s3 client.
*/
static final Setting<Boolean> PERMIT_BACKED_TRANSFER_ENABLED = Setting.boolSetting(
"permit_backed_transfer_enabled",
true,
Setting.Property.NodeScope
);

/**
* Whether retry on uploads are enabled. This setting wraps inputstream with buffered stream to enable retries.
*/
Expand Down Expand Up @@ -287,6 +296,7 @@ class S3Repository extends MeteredBlobStoreRepository {
private final Path pluginConfigPath;
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private final GenericStatsMetricPublisher genericStatsMetricPublisher;

private volatile int bulkDeletesSize;

Expand Down Expand Up @@ -320,7 +330,8 @@ class S3Repository extends MeteredBlobStoreRepository {
multipartUploadEnabled,
Path.of(""),
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
lowPrioritySizeBasedBlockingQ,
new GenericStatsMetricPublisher()
);
}

Expand All @@ -341,7 +352,8 @@ class S3Repository extends MeteredBlobStoreRepository {
final boolean multipartUploadEnabled,
Path pluginConfigPath,
final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
final GenericStatsMetricPublisher genericStatsMetricPublisher
) {
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
this.service = service;
Expand All @@ -354,6 +366,7 @@ class S3Repository extends MeteredBlobStoreRepository {
this.normalExecutorBuilder = normalExecutorBuilder;
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.genericStatsMetricPublisher = genericStatsMetricPublisher;

validateRepositoryMetadata(metadata);
readRepositoryMetadata();
Expand Down Expand Up @@ -418,7 +431,8 @@ protected S3BlobStore createBlobStore() {
priorityExecutorBuilder,
normalExecutorBuilder,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
lowPrioritySizeBasedBlockingQ,
genericStatsMetricPublisher
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private TransferSemaphoresHolder transferSemaphoresHolder;
private GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher();

public S3RepositoryPlugin(final Settings settings, final Path configPath) {
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
Expand Down Expand Up @@ -236,21 +237,25 @@ public Collection<Object> createComponents(
this.normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
new ByteSizeValue(normalPriorityConsumers * 10L, ByteSizeUnit.GB),
normalTransferQConsumerService,
normalPriorityConsumers
normalPriorityConsumers,
genericStatsMetricPublisher,
SizeBasedBlockingQ.QueueEventType.NORMAL
);
int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings());
LowPrioritySizeBasedBlockingQ lowPrioritySizeBasedBlockingQ = new LowPrioritySizeBasedBlockingQ(
new ByteSizeValue(lowPriorityConsumers * 20L, ByteSizeUnit.GB),
lowTransferQConsumerService,
lowPriorityConsumers
lowPriorityConsumers,
genericStatsMetricPublisher
);
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.transferSemaphoresHolder = new TransferSemaphoresHolder(
// High number of permit allocation because each op acquiring permit performs disk IO, computation and network IO.
Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10),
((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings())) / 100,
S3Repository.S3_PERMIT_WAIT_DURATION_MIN.get(clusterService.getSettings()),
TimeUnit.MINUTES
TimeUnit.MINUTES,
genericStatsMetricPublisher
);

return CollectionUtils.arrayAsArrayList(this.normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ);
Expand All @@ -259,8 +264,13 @@ public Collection<Object> createComponents(
// New class because in core, components are injected via guice only by instance creation due to which
// same binding types fail.
private static final class LowPrioritySizeBasedBlockingQ extends SizeBasedBlockingQ {
public LowPrioritySizeBasedBlockingQ(ByteSizeValue capacity, ExecutorService executorService, int consumers) {
super(capacity, executorService, consumers);
public LowPrioritySizeBasedBlockingQ(
ByteSizeValue capacity,
ExecutorService executorService,
int consumers,
GenericStatsMetricPublisher genericStatsMetricPublisher
) {
super(capacity, executorService, consumers, genericStatsMetricPublisher, QueueEventType.LOW);
}
}

Expand Down Expand Up @@ -293,7 +303,8 @@ protected S3Repository createRepository(
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()),
configPath,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
lowPrioritySizeBasedBlockingQ,
genericStatsMetricPublisher
);
}

Expand Down Expand Up @@ -339,7 +350,8 @@ public List<Setting<?>> getSettings() {
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING,
S3Repository.REDIRECT_LARGE_S3_UPLOAD,
S3Repository.UPLOAD_RETRY_ENABLED,
S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT
S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT,
S3Repository.PERMIT_BACKED_TRANSFER_ENABLED
);
}

Expand Down
Loading

0 comments on commit 58d730b

Please sign in to comment.