diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java
index 1fc13359e5cd4..f00cda7bd36ec 100644
--- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java
+++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java
@@ -312,22 +312,7 @@ protected S3Repository createRepository(
         ClusterService clusterService,
         RecoverySettings recoverySettings
     ) {
-        return new S3Repository(
-            metadata,
-            registry,
-            service,
-            clusterService,
-            recoverySettings,
-            null,
-            null,
-            null,
-            null,
-            null,
-            false,
-            null,
-            null,
-            null
-        ) {
+        return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
 
             @Override
             public BlobStore blobStore() {
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java
deleted file mode 100644
index 136fd68223354..0000000000000
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.repositories.s3;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Generic stats of repository-s3 plugin.
- */
-public class GenericStatsMetricPublisher {
-
-    private final AtomicLong normalPriorityQSize = new AtomicLong();
-    private final AtomicInteger normalPriorityPermits = new AtomicInteger();
-    private final AtomicLong lowPriorityQSize = new AtomicLong();
-    private final AtomicInteger lowPriorityPermits = new AtomicInteger();
-    private final long normalPriorityQCapacity;
-    private final int maxNormalPriorityPermits;
-    private final long lowPriorityQCapacity;
-    private final int maxLowPriorityPermits;
-
-    public GenericStatsMetricPublisher(
-        long normalPriorityQCapacity,
-        int maxNormalPriorityPermits,
-        long lowPriorityQCapacity,
-        int maxLowPriorityPermits
-    ) {
-        this.normalPriorityQCapacity = normalPriorityQCapacity;
-        this.maxNormalPriorityPermits = maxNormalPriorityPermits;
-        this.lowPriorityQCapacity = lowPriorityQCapacity;
-        this.maxLowPriorityPermits = maxLowPriorityPermits;
-    }
-
-    public void updateNormalPriorityQSize(long qSize) {
-        normalPriorityQSize.addAndGet(qSize);
-    }
-
-    public void updateLowPriorityQSize(long qSize) {
-        lowPriorityQSize.addAndGet(qSize);
-    }
-
-    public void updateNormalPermits(boolean increment) {
-        if (increment) {
-            normalPriorityPermits.incrementAndGet();
-        } else {
-            normalPriorityPermits.decrementAndGet();
-        }
-    }
-
-    public void updateLowPermits(boolean increment) {
-        if (increment) {
-            lowPriorityPermits.incrementAndGet();
-        } else {
-            lowPriorityPermits.decrementAndGet();
-        }
-    }
-
-    public long getNormalPriorityQSize() {
-        return normalPriorityQSize.get();
-    }
-
-    public int getAcquiredNormalPriorityPermits() {
-        return normalPriorityPermits.get();
-    }
-
-    public long getLowPriorityQSize() {
-        return lowPriorityQSize.get();
-    }
-
-    public int getAcquiredLowPriorityPermits() {
-        return lowPriorityPermits.get();
-    }
-
-    Map<String, Long> stats() {
-        final Map<String, Long> results = new HashMap<>();
-        results.put("NormalPriorityQUtilization", (normalPriorityQSize.get() * 100) / normalPriorityQCapacity);
-        results.put("LowPriorityQUtilization", (lowPriorityQSize.get() * 100) / lowPriorityQCapacity);
-        results.put("NormalPriorityPermitsUtilization", (normalPriorityPermits.get() * 100L) / maxNormalPriorityPermits);
-        results.put("LowPriorityPermitsUtilization", (lowPriorityPermits.get() * 100L) / maxLowPriorityPermits);
-        return results;
-    }
-}
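For context on the class deleted above: it tracks raw queue sizes and permit counts and reports them as integer utilization percentages. A minimal sketch of how it was driven, assuming same-package access (stats() is package-private); the constructor arguments below are illustrative values, not the plugin's defaults:

    GenericStatsMetricPublisher publisher = new GenericStatsMetricPublisher(
        1000L, // normal-priority queue capacity in bytes (hypothetical)
        10,    // max normal-priority permits (hypothetical)
        2000L, // low-priority queue capacity in bytes (hypothetical)
        5      // max low-priority permits (hypothetical)
    );
    publisher.updateNormalPriorityQSize(250L); // 250 of 1000 bytes are now queued
    publisher.updateNormalPermits(true);       // 1 of 10 permits is now held
    Map<String, Long> stats = publisher.stats();
    // stats now holds NormalPriorityQUtilization = 25 and NormalPriorityPermitsUtilization = 10.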
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
index 4066eec733760..25f361b40636e 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
@@ -88,7 +88,6 @@ import org.opensearch.core.common.Strings;
 import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.core.common.unit.ByteSizeValue;
-import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
 import org.opensearch.repositories.s3.async.UploadRequest;
 import org.opensearch.repositories.s3.utils.HttpRangeUtils;
@@ -194,14 +193,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
             blobStore.isUploadRetryEnabled()
         );
         try {
-            // If file size is greater than the queue capacity, then SizeBasedBlockingQ will always reject the upload.
-            // Therefore, redirecting it to slow client.
-            if ((uploadRequest.getWritePriority() == WritePriority.LOW
-                && blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)
-                || (uploadRequest.getWritePriority() != WritePriority.HIGH
-                    && uploadRequest.getWritePriority() != WritePriority.URGENT
-                    && blobStore.getNormalPrioritySizeBasedBlockingQ()
-                        .isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) {
+            if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
                 StreamContext streamContext = SocketAccess.doPrivileged(
                     () -> writeContext.getStreamProvider(uploadRequest.getContentLength())
                 );
@@ -240,30 +232,16 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
             } else {
                 s3AsyncClient = amazonS3Reference.get().client();
             }
-
-            if (writeContext.getWritePriority() == WritePriority.URGENT
-                || writeContext.getWritePriority() == WritePriority.HIGH
-                || blobStore.isPermitBackedTransferEnabled() == false) {
-                createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);
-            } else if (writeContext.getWritePriority() == WritePriority.LOW) {
-                blobStore.getLowPrioritySizeBasedBlockingQ()
-                    .produce(
-                        new SizeBasedBlockingQ.Item(
-                            writeContext.getFileSize(),
-                            () -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
-                        )
-                    );
-            } else if (writeContext.getWritePriority() == WritePriority.NORMAL) {
-                blobStore.getNormalPrioritySizeBasedBlockingQ()
-                    .produce(
-                        new SizeBasedBlockingQ.Item(
-                            writeContext.getFileSize(),
-                            () -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
-                        )
-                    );
-            } else {
-                throw new IllegalStateException("Cannot perform upload for other priority types.");
-            }
+            CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
+                .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
+            completableFuture.whenComplete((response, throwable) -> {
+                if (throwable == null) {
+                    completionListener.onResponse(response);
+                } else {
+                    Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
+                    completionListener.onFailure(ex);
+                }
+            });
         }
     } catch (Exception e) {
         logger.info("exception error from blob container for file {}", writeContext.getFileName());
@@ -271,24 +249,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
         }
     }
 
-    private CompletableFuture<Void> createFileCompletableFuture(
-        S3AsyncClient s3AsyncClient,
-        UploadRequest uploadRequest,
-        StreamContext streamContext,
-        ActionListener<Void> completionListener
-    ) {
-        CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
-            .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
-        return completableFuture.whenComplete((response, throwable) -> {
-            if (throwable == null) {
-                completionListener.onResponse(response);
-            } else {
-                Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
-                completionListener.onFailure(ex);
-            }
-        });
-    }
-
     @ExperimentalApi
     @Override
     public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
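The hunk above inlines the helper's CompletableFuture-to-ActionListener bridge. The shape is worth spelling out, because ActionListener.onFailure accepts only an Exception, so a raised Error must be wrapped before propagation rather than cast. A sketch using the names from the diff:

    CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
        .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
    completableFuture.whenComplete((response, throwable) -> {
        if (throwable == null) {
            completionListener.onResponse(response); // success path
        } else {
            // Error is not an Exception; wrapping avoids a ClassCastException here
            Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
            completionListener.onFailure(ex);
        }
    });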
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
index de815f9202f44..fc70fbb0db00e 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
@@ -45,7 +45,6 @@ import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
 import org.opensearch.repositories.s3.async.AsyncTransferManager;
-import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -57,7 +56,6 @@ import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
 import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
 import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
-import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
 import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
 import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
 import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
@@ -79,8 +77,6 @@ class S3BlobStore implements BlobStore {
 
     private volatile boolean uploadRetryEnabled;
 
-    private volatile boolean permitBackedTransferEnabled;
-
     private volatile boolean serverSideEncryption;
 
     private volatile ObjectCannedACL cannedACL;
@@ -98,9 +94,6 @@ class S3BlobStore implements BlobStore {
     private final AsyncExecutorContainer priorityExecutorBuilder;
     private final AsyncExecutorContainer normalExecutorBuilder;
     private final boolean multipartUploadEnabled;
-    private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
-    private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
-    private final GenericStatsMetricPublisher genericStatsMetricPublisher;
 
     S3BlobStore(
         S3Service service,
@@ -116,10 +109,7 @@ class S3BlobStore implements BlobStore {
         AsyncTransferManager asyncTransferManager,
         AsyncExecutorContainer urgentExecutorBuilder,
         AsyncExecutorContainer priorityExecutorBuilder,
-        AsyncExecutorContainer normalExecutorBuilder,
-        SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
-        SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
-        GenericStatsMetricPublisher genericStatsMetricPublisher
+        AsyncExecutorContainer normalExecutorBuilder
     ) {
         this.service = service;
         this.s3AsyncService = s3AsyncService;
@@ -138,10 +128,6 @@ class S3BlobStore implements BlobStore {
         // Settings to initialize blobstore with.
         this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
         this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
-        this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
-        this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
-        this.genericStatsMetricPublisher = genericStatsMetricPublisher;
-        this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
     }
 
     @Override
@@ -155,7 +141,6 @@ public void reload(RepositoryMetadata repositoryMetadata) {
         this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
         this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
         this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
-        this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
     }
 
     @Override
@@ -183,10 +168,6 @@ public boolean isUploadRetryEnabled() {
         return uploadRetryEnabled;
     }
 
-    public boolean isPermitBackedTransferEnabled() {
-        return permitBackedTransferEnabled;
-    }
-
     public String bucket() {
         return bucket;
     }
@@ -203,14 +184,6 @@ public int getBulkDeletesSize() {
         return bulkDeletesSize;
     }
 
-    public SizeBasedBlockingQ getNormalPrioritySizeBasedBlockingQ() {
-        return normalPrioritySizeBasedBlockingQ;
-    }
-
-    public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() {
-        return lowPrioritySizeBasedBlockingQ;
-    }
-
     @Override
     public BlobContainer blobContainer(BlobPath path) {
         return new S3BlobContainer(path, this);
@@ -228,9 +201,7 @@ public void close() throws IOException {
 
     @Override
     public Map<String, Long> stats() {
-        Map<String, Long> stats = statsMetricPublisher.getStats().toMap();
-        stats.putAll(genericStatsMetricPublisher.stats());
-        return stats;
+        return statsMetricPublisher.getStats().toMap();
     }
 
     @Override
@@ -240,7 +211,6 @@ public Map<Metric, Map<String, Long>> extendedStats() {
         }
         Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
         statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
-        extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats());
         return extendedStats;
     }
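A note on the stats hunks above: before this revert, the generic queue/permit utilization numbers were folded into the SDK request metrics and also exposed under Metric.GENERIC_STATS; afterwards only the SDK metrics remain. Sketched from the deleted lines:

    // Pre-revert behavior, as removed above:
    Map<String, Long> stats = statsMetricPublisher.getStats().toMap(); // SDK request metrics
    stats.putAll(genericStatsMetricPublisher.stats());                 // queue/permit utilization, now gone
    // Post-revert, callers of stats() and extendedStats() see only the SDK request metrics.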
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
index 42f6b9a88a4a5..405ac88a5be4a 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
@@ -52,7 +52,6 @@ import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.util.concurrent.OpenSearchExecutors;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.common.Strings;
 import org.opensearch.core.common.settings.SecureString;
@@ -67,7 +66,6 @@ import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
 import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
 import org.opensearch.repositories.s3.async.AsyncTransferManager;
-import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
 import org.opensearch.snapshots.SnapshotId;
 import org.opensearch.snapshots.SnapshotInfo;
 import org.opensearch.snapshots.SnapshotsService;
@@ -164,15 +162,6 @@ class S3Repository extends MeteredBlobStoreRepository {
         Setting.Property.NodeScope
     );
 
-    /**
-     * Whether transfers should be permit backed.
-     */
-    static final Setting<Boolean> PERMIT_BACKED_TRANSFER_ENABLED = Setting.boolSetting(
-        "permit_backed_transfer_enabled",
-        true,
-        Setting.Property.NodeScope
-    );
-
     /**
      * Whether retry on uploads are enabled. This setting wraps inputstream with buffered stream to enable retries.
      */
@@ -210,37 +199,6 @@ class S3Repository extends MeteredBlobStoreRepository {
         true,
         Setting.Property.NodeScope
     );
-    /**
-     * Percentage of total available permits to be available for priority transfers.
-     */
-    public static Setting<Integer> S3_PRIORITY_PERMIT_ALLOCATION_PERCENT = Setting.intSetting(
-        "s3_priority_permit_alloc_perc",
-        70,
-        21,
-        80,
-        Setting.Property.NodeScope
-    );
-
-    /**
-     * Duration in minutes to wait for a permit in case no permit is available.
-     */
-    public static Setting<Integer> S3_PERMIT_WAIT_DURATION_MIN = Setting.intSetting(
-        "s3_permit_wait_duration_min",
-        5,
-        1,
-        10,
-        Setting.Property.NodeScope
-    );
-
-    /**
-     * Number of transfer queue consumers
-     */
-    public static Setting<Integer> S3_TRANSFER_QUEUE_CONSUMERS = new Setting<>(
-        "s3_transfer_queue_consumers",
-        (s) -> Integer.toString(Math.max(5, OpenSearchExecutors.allocatedProcessors(s) * 2)),
-        (s) -> Setting.parseInt(s, 5, "s3_transfer_queue_consumers"),
-        Setting.Property.NodeScope
-    );
 
     /**
      * Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.
@@ -325,9 +283,6 @@ class S3Repository extends MeteredBlobStoreRepository {
     private final AsyncExecutorContainer priorityExecutorBuilder;
     private final AsyncExecutorContainer normalExecutorBuilder;
     private final Path pluginConfigPath;
-    private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
-    private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
-    private final GenericStatsMetricPublisher genericStatsMetricPublisher;
 
     private volatile int bulkDeletesSize;
 
@@ -343,10 +298,7 @@ class S3Repository extends MeteredBlobStoreRepository {
         final AsyncExecutorContainer priorityExecutorBuilder,
         final AsyncExecutorContainer normalExecutorBuilder,
         final S3AsyncService s3AsyncService,
-        final boolean multipartUploadEnabled,
-        final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
-        final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
-        final GenericStatsMetricPublisher genericStatsMetricPublisher
+        final boolean multipartUploadEnabled
     ) {
         this(
             metadata,
@@ -360,10 +312,7 @@ class S3Repository extends MeteredBlobStoreRepository {
             normalExecutorBuilder,
             s3AsyncService,
             multipartUploadEnabled,
-            Path.of(""),
-            normalPrioritySizeBasedBlockingQ,
-            lowPrioritySizeBasedBlockingQ,
-            genericStatsMetricPublisher
+            Path.of("")
         );
     }
 
@@ -382,10 +331,7 @@ class S3Repository extends MeteredBlobStoreRepository {
         final AsyncExecutorContainer normalExecutorBuilder,
         final S3AsyncService s3AsyncService,
         final boolean multipartUploadEnabled,
-        Path pluginConfigPath,
-        final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
-        final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
-        final GenericStatsMetricPublisher genericStatsMetricPublisher
+        Path pluginConfigPath
     ) {
         super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
         this.service = service;
@@ -396,9 +342,6 @@ class S3Repository extends MeteredBlobStoreRepository {
         this.urgentExecutorBuilder = urgentExecutorBuilder;
         this.priorityExecutorBuilder = priorityExecutorBuilder;
         this.normalExecutorBuilder = normalExecutorBuilder;
-        this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
-        this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
-        this.genericStatsMetricPublisher = genericStatsMetricPublisher;
 
         validateRepositoryMetadata(metadata);
         readRepositoryMetadata();
@@ -516,10 +459,7 @@ protected S3BlobStore createBlobStore() {
             asyncUploadUtils,
             urgentExecutorBuilder,
             priorityExecutorBuilder,
-            normalExecutorBuilder,
-            normalPrioritySizeBasedBlockingQ,
-            lowPrioritySizeBasedBlockingQ,
-            genericStatsMetricPublisher
+            normalExecutorBuilder
         );
     }
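The three settings deleted above follow OpenSearch's bounded-setting pattern: Setting.intSetting(key, default, min, max, properties) validates the range when the setting is parsed, so out-of-range values are rejected up front. A sketch of the pattern; the key and values below are hypothetical, not part of the plugin:

    public static final Setting<Integer> EXAMPLE_ALLOC_PERCENT = Setting.intSetting(
        "example_alloc_perc",       // hypothetical setting key
        70,                         // default
        21,                         // minimum accepted value
        80,                         // maximum accepted value
        Setting.Property.NodeScope
    );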
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java
index 110d91bfbd822..e7d2a4d024e60 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java
@@ -41,9 +41,6 @@ import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.concurrent.OpenSearchExecutors;
 import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
-import org.opensearch.core.common.unit.ByteSizeUnit;
-import org.opensearch.core.common.unit.ByteSizeValue;
-import org.opensearch.core.common.util.CollectionUtils;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.env.Environment;
 import org.opensearch.env.NodeEnvironment;
@@ -56,8 +53,6 @@ import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
 import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup;
 import org.opensearch.repositories.s3.async.AsyncTransferManager;
-import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
-import org.opensearch.repositories.s3.async.TransferSemaphoresHolder;
 import org.opensearch.script.ScriptService;
 import org.opensearch.threadpool.ExecutorBuilder;
 import org.opensearch.threadpool.FixedExecutorBuilder;
@@ -74,8 +69,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 /**
@@ -89,8 +82,6 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
     private static final String PRIORITY_STREAM_READER = "priority_stream_reader";
     private static final String FUTURE_COMPLETION = "future_completion";
     private static final String STREAM_READER = "stream_reader";
-    private static final String LOW_TRANSFER_QUEUE_CONSUMER = "low_transfer_queue_consumer";
-    private static final String NORMAL_TRANSFER_QUEUE_CONSUMER = "normal_transfer_queue_consumer";
 
     protected final S3Service service;
     private final S3AsyncService s3AsyncService;
@@ -100,12 +91,6 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
     private AsyncExecutorContainer urgentExecutorBuilder;
     private AsyncExecutorContainer priorityExecutorBuilder;
     private AsyncExecutorContainer normalExecutorBuilder;
-    private ExecutorService lowTransferQConsumerService;
-    private ExecutorService normalTransferQConsumerService;
-    private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
-    private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
-    private TransferSemaphoresHolder transferSemaphoresHolder;
-    private GenericStatsMetricPublisher genericStatsMetricPublisher;
 
     public S3RepositoryPlugin(final Settings settings, final Path configPath) {
         this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
@@ -135,36 +120,9 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
                 TimeValue.timeValueMinutes(5)
             )
         );
-        executorBuilders.add(
-            new FixedExecutorBuilder(
-                settings,
-                LOW_TRANSFER_QUEUE_CONSUMER,
-                lowPriorityTransferQConsumers(settings),
-                10,
-                "thread_pool." + LOW_TRANSFER_QUEUE_CONSUMER
-            )
-        );
-        executorBuilders.add(
-            new FixedExecutorBuilder(
-                settings,
-                NORMAL_TRANSFER_QUEUE_CONSUMER,
-                normalPriorityTransferQConsumers(settings),
-                10,
-                "thread_pool." + NORMAL_TRANSFER_QUEUE_CONSUMER
-            )
-        );
         return executorBuilders;
     }
 
-    private int lowPriorityTransferQConsumers(Settings settings) {
-        double lowPriorityAllocation = ((double) (100 - S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(settings))) / 100;
-        return Math.max(2, (int) (lowPriorityAllocation * S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings)));
-    }
-
-    private int normalPriorityTransferQConsumers(Settings settings) {
-        return S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings);
-    }
-
     static int halfNumberOfProcessors(int numberOfProcessors) {
         return (numberOfProcessors + 1) / 2;
     }
@@ -231,67 +189,7 @@ public Collection<Object> createComponents(
             threadPool.executor(STREAM_READER),
             new AsyncTransferEventLoopGroup(normalEventLoopThreads)
         );
-
-        this.lowTransferQConsumerService = threadPool.executor(LOW_TRANSFER_QUEUE_CONSUMER);
-        this.normalTransferQConsumerService = threadPool.executor(NORMAL_TRANSFER_QUEUE_CONSUMER);
-
-        // High number of permit allocation because each op acquiring permit performs disk IO, computation and network IO.
-        int availablePermits = Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10);
-        double priorityPermitAllocation = ((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings()))
-            / 100;
-        int normalPriorityPermits = (int) (priorityPermitAllocation * availablePermits);
-        int lowPriorityPermits = availablePermits - normalPriorityPermits;
-
-        int normalPriorityConsumers = normalPriorityTransferQConsumers(clusterService.getSettings());
-        int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings());
-
-        ByteSizeValue normalPriorityQCapacity = new ByteSizeValue(normalPriorityConsumers * 10L, ByteSizeUnit.GB);
-        ByteSizeValue lowPriorityQCapacity = new ByteSizeValue(lowPriorityConsumers * 20L, ByteSizeUnit.GB);
-
-        this.genericStatsMetricPublisher = new GenericStatsMetricPublisher(
-            normalPriorityQCapacity.getBytes(),
-            normalPriorityPermits,
-            lowPriorityQCapacity.getBytes(),
-            lowPriorityPermits
-        );
-
-        this.normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
-            normalPriorityQCapacity,
-            normalTransferQConsumerService,
-            normalPriorityConsumers,
-            genericStatsMetricPublisher,
-            SizeBasedBlockingQ.QueueEventType.NORMAL
-        );
-
-        LowPrioritySizeBasedBlockingQ lowPrioritySizeBasedBlockingQ = new LowPrioritySizeBasedBlockingQ(
-            lowPriorityQCapacity,
-            lowTransferQConsumerService,
-            lowPriorityConsumers,
-            genericStatsMetricPublisher
-        );
-        this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
-        this.transferSemaphoresHolder = new TransferSemaphoresHolder(
-            normalPriorityPermits,
-            lowPriorityPermits,
-            S3Repository.S3_PERMIT_WAIT_DURATION_MIN.get(clusterService.getSettings()),
-            TimeUnit.MINUTES,
-            genericStatsMetricPublisher
-        );
-
-        return CollectionUtils.arrayAsArrayList(this.normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ);
-    }
-
-    // New class because in core, components are injected via guice only by instance creation due to which
-    // same binding types fail.
-    private static final class LowPrioritySizeBasedBlockingQ extends SizeBasedBlockingQ {
-        public LowPrioritySizeBasedBlockingQ(
-            ByteSizeValue capacity,
-            ExecutorService executorService,
-            int consumers,
-            GenericStatsMetricPublisher genericStatsMetricPublisher
-        ) {
-            super(capacity, executorService, consumers, genericStatsMetricPublisher, QueueEventType.LOW);
-        }
+        return Collections.emptyList();
     }
 
     // proxy method for testing
@@ -306,8 +204,7 @@ protected S3Repository createRepository(
             S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(),
             normalExecutorBuilder.getStreamReader(),
             priorityExecutorBuilder.getStreamReader(),
-            urgentExecutorBuilder.getStreamReader(),
-            transferSemaphoresHolder
+            urgentExecutorBuilder.getStreamReader()
         );
         return new S3Repository(
             metadata,
@@ -321,10 +218,7 @@ protected S3Repository createRepository(
             normalExecutorBuilder,
             s3AsyncService,
             S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()),
-            configPath,
-            normalPrioritySizeBasedBlockingQ,
-            lowPrioritySizeBasedBlockingQ,
-            genericStatsMetricPublisher
+            configPath
        );
     }
 
@@ -369,9 +263,7 @@ public List<Setting<?>> getSettings() {
             S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING,
             S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING,
             S3Repository.REDIRECT_LARGE_S3_UPLOAD,
-            S3Repository.UPLOAD_RETRY_ENABLED,
-            S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT,
-            S3Repository.PERMIT_BACKED_TRANSFER_ENABLED
+            S3Repository.UPLOAD_RETRY_ENABLED
         );
     }
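The deleted LowPrioritySizeBasedBlockingQ subclass existed only to give the second queue a distinct concrete type: per the comment above, components returned from createComponents() are bound by their class, so returning two SizeBasedBlockingQ instances would collide. A sketch of the workaround shape, with a hypothetical name:

    // Two components of the same concrete type cannot both be bound, so the second
    // instance gets a trivial marker subclass; behavior is otherwise unchanged.
    private static final class LowPriorityQ extends SizeBasedBlockingQ { // hypothetical name
        LowPriorityQ(ByteSizeValue capacity, ExecutorService executor, int consumers, GenericStatsMetricPublisher stats) {
            super(capacity, executor, consumers, stats, QueueEventType.LOW);
        }
    }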
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3TransferRejectedException.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3TransferRejectedException.java
deleted file mode 100644
index c9fa93ea0f5c3..0000000000000
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3TransferRejectedException.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.repositories.s3;
-
-import org.opensearch.OpenSearchException;
-
-/**
- * Thrown when transfer event is rejected due to breach in event queue size.
- */
-public class S3TransferRejectedException extends OpenSearchException {
-    public S3TransferRejectedException(String msg) {
-        super(msg);
-    }
-}
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java
index 4c95a0ffc5ec3..b4c4ed0ecaa75 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java
@@ -23,7 +23,6 @@ import org.opensearch.common.StreamContext;
 import org.opensearch.common.blobstore.stream.write.WritePriority;
 import org.opensearch.common.io.InputStreamContainer;
-import org.opensearch.repositories.s3.S3TransferRejectedException;
 import org.opensearch.repositories.s3.SocketAccess;
 import org.opensearch.repositories.s3.StatsMetricPublisher;
 import org.opensearch.repositories.s3.io.CheckedContainer;
@@ -35,8 +34,6 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 
 /**
@@ -44,7 +41,7 @@
  */
 public class AsyncPartsHandler {
 
-    private static final Logger log = LogManager.getLogger(AsyncPartsHandler.class);
+    private static Logger log = LogManager.getLogger(AsyncPartsHandler.class);
 
     /**
      * Uploads parts of the upload multipart request*
@@ -58,10 +55,9 @@ public class AsyncPartsHandler {
      * @param completedParts Reference of completed parts
      * @param inputStreamContainers Checksum containers
     * @param statsMetricPublisher sdk metric publisher
-     * @param maxRetryablePartSize Max content size which can be used for retries in buffered streams.
     * @return list of completable futures
+     * @throws IOException thrown in case of an IO error
      */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
     public static List<CompletableFuture<CompletedPart>> uploadParts(
         S3AsyncClient s3AsyncClient,
         ExecutorService executorService,
@@ -73,52 +69,35 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
         AtomicReferenceArray<CompletedPart> completedParts,
         AtomicReferenceArray<CheckedContainer> inputStreamContainers,
         StatsMetricPublisher statsMetricPublisher,
-        boolean uploadRetryEnabled,
-        TransferSemaphoresHolder transferSemaphoresHolder,
-        long maxRetryablePartSize
-    ) throws InterruptedException {
+        boolean uploadRetryEnabled
+    ) throws IOException {
         List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
-        TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
         for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
-            Semaphore semaphore = maybeAcquireSemaphore(
-                transferSemaphoresHolder,
-                requestContext,
-                uploadRequest.getWritePriority(),
-                uploadRequest.getKey()
-            );
-            try {
-                InputStreamContainer inputStreamContainer = streamContext.provideStream(partIdx);
-                inputStreamContainers.set(partIdx, new CheckedContainer(inputStreamContainer.getContentLength()));
-                UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder()
-                    .bucket(uploadRequest.getBucket())
-                    .partNumber(partIdx + 1)
-                    .key(uploadRequest.getKey())
-                    .uploadId(uploadId)
-                    .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
-                    .contentLength(inputStreamContainer.getContentLength());
-                if (uploadRequest.doRemoteDataIntegrityCheck()) {
-                    uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
-                }
-                uploadPart(
-                    s3AsyncClient,
-                    executorService,
-                    priorityExecutorService,
-                    urgentExecutorService,
-                    completedParts,
-                    inputStreamContainers,
-                    futures,
-                    uploadPartRequestBuilder.build(),
-                    inputStreamContainer,
-                    uploadRequest,
-                    uploadRetryEnabled,
-                    maxRetryablePartSize,
-                    semaphore
-                );
-            } catch (Exception ex) {
-                if (semaphore != null) {
-                    semaphore.release();
-                }
+            InputStreamContainer inputStreamContainer = streamContext.provideStream(partIdx);
+            inputStreamContainers.set(partIdx, new CheckedContainer(inputStreamContainer.getContentLength()));
+            UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder()
+                .bucket(uploadRequest.getBucket())
+                .partNumber(partIdx + 1)
+                .key(uploadRequest.getKey())
+                .uploadId(uploadId)
+                .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
+                .contentLength(inputStreamContainer.getContentLength());
+            if (uploadRequest.doRemoteDataIntegrityCheck()) {
+                uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
             }
+            uploadPart(
+                s3AsyncClient,
+                executorService,
+                priorityExecutorService,
+                urgentExecutorService,
+                completedParts,
+                inputStreamContainers,
+                futures,
+                uploadPartRequestBuilder.build(),
+                inputStreamContainer,
+                uploadRequest,
+                uploadRetryEnabled
+            );
         }
 
         return futures;
@@ -158,54 +137,14 @@ public static InputStream maybeRetryInputStream(
         InputStream inputStream,
         WritePriority writePriority,
         boolean uploadRetryEnabled,
-        long contentLength,
-        long maxRetryablePartSize
+        long contentLength
     ) {
-        // Since we are backing uploads with limited permits, it is ok to use buffered stream. Maximum in-memory buffer
-        // would be (max permits * maxRetryablePartSize) excluding urgent
-        if (uploadRetryEnabled == true
-            && (contentLength <= maxRetryablePartSize || writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
-            return new UploadTrackedBufferedInputStream(inputStream, (int) (contentLength + 1));
+        if (uploadRetryEnabled == true && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
+            return new BufferedInputStream(inputStream, (int) (contentLength + 1));
         }
         return inputStream;
     }
 
-    public static Semaphore maybeAcquireSemaphore(
-        TransferSemaphoresHolder transferSemaphoresHolder,
-        TransferSemaphoresHolder.RequestContext requestContext,
-        WritePriority writePriority,
-        String file
-    ) throws InterruptedException {
-        final TransferSemaphoresHolder.TypeSemaphore semaphore;
-        if (writePriority != WritePriority.HIGH && writePriority != WritePriority.URGENT) {
-            semaphore = transferSemaphoresHolder.acquirePermit(writePriority, requestContext);
-            if (semaphore == null) {
-                throw new S3TransferRejectedException("Permit not available for transfer of file " + file);
-            }
-        } else {
-            semaphore = null;
-        }
-
-        return semaphore;
-    }
-
-    /**
-     * Overridden stream to identify upload streams among all buffered stream instances for triaging.
-     */
-    static class UploadTrackedBufferedInputStream extends BufferedInputStream {
-        AtomicBoolean closed = new AtomicBoolean();
-
-        public UploadTrackedBufferedInputStream(InputStream in, int length) {
-            super(in, length);
-        }
-
-        @Override
-        public void close() throws IOException {
-            super.close();
-            closed.set(true);
-        }
-    }
-
     private static void uploadPart(
         S3AsyncClient s3AsyncClient,
         ExecutorService executorService,
@@ -217,11 +156,8 @@ private static void uploadPart(
         UploadPartRequest uploadPartRequest,
         InputStreamContainer inputStreamContainer,
         UploadRequest uploadRequest,
-        boolean uploadRetryEnabled,
-        long maxRetryablePartSize,
-        Semaphore semaphore
+        boolean uploadRetryEnabled
     ) {
-
         Integer partNumber = uploadPartRequest.partNumber();
 
         ExecutorService streamReadExecutor;
@@ -237,8 +173,7 @@ private static void uploadPart(
             inputStreamContainer.getInputStream(),
             uploadRequest.getWritePriority(),
             uploadRetryEnabled,
-            uploadPartRequest.contentLength(),
-            maxRetryablePartSize
+            uploadPartRequest.contentLength()
         );
         CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
             () -> s3AsyncClient.uploadPart(
@@ -248,10 +183,6 @@ private static void uploadPart(
             )
         );
 
         CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> {
-            if (semaphore != null) {
-                semaphore.release();
-            }
-
             try {
                 inputStream.close();
             } catch (IOException ex) {
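The part-upload path removed above followed a strict permit discipline: acquire before reading a part, release exactly once on whichever path completes. A condensed sketch of that discipline, assuming the surrounding variables from the deleted uploadParts method (requestBody stands in for the part's AsyncRequestBody):

    Semaphore semaphore = maybeAcquireSemaphore(
        transferSemaphoresHolder, requestContext, uploadRequest.getWritePriority(), uploadRequest.getKey());
    try {
        CompletableFuture<UploadPartResponse> future = SocketAccess.doPrivileged(
            () -> s3AsyncClient.uploadPart(uploadPartRequest, requestBody)); // requestBody assumed in scope
        future.whenComplete((resp, throwable) -> {
            if (semaphore != null) {
                semaphore.release(); // async completion path, success or failure
            }
        });
    } catch (Exception ex) {
        if (semaphore != null) {
            semaphore.release(); // the upload never started, so release synchronously
        }
        throw ex;
    }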
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
index 45705431fcfc9..2259780c95276 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
@@ -21,7 +21,6 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.utils.CompletableFutureUtils;
@@ -48,7 +47,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
@@ -65,10 +63,6 @@ public final class AsyncTransferManager {
     private final ExecutorService priorityExecutorService;
     private final ExecutorService urgentExecutorService;
     private final long minimumPartSize;
-    private final long maxRetryablePartSize;
-
-    @SuppressWarnings("rawtypes")
-    private final TransferSemaphoresHolder transferSemaphoresHolder;
 
     /**
      * The max number of parts on S3 side is 10,000
@@ -79,22 +73,19 @@ public final class AsyncTransferManager {
      * Construct a new object of AsyncTransferManager
      *
      * @param minimumPartSize The minimum part size for parallel multipart uploads
+     * @param executorService The stream reader {@link ExecutorService} for normal priority uploads
+     * @param priorityExecutorService The stream read {@link ExecutorService} for high priority uploads
      */
-    @SuppressWarnings("rawtypes")
     public AsyncTransferManager(
         long minimumPartSize,
         ExecutorService executorService,
         ExecutorService priorityExecutorService,
-        ExecutorService urgentExecutorService,
-        TransferSemaphoresHolder transferSemaphoresHolder
+        ExecutorService urgentExecutorService
     ) {
         this.executorService = executorService;
         this.priorityExecutorService = priorityExecutorService;
         this.minimumPartSize = minimumPartSize;
-        // 10% buffer to allow additional metadata size in content such as encryption.
-        this.maxRetryablePartSize = (long) (minimumPartSize + 0.1 * minimumPartSize);
         this.urgentExecutorService = urgentExecutorService;
-        this.transferSemaphoresHolder = transferSemaphoresHolder;
     }
 
     /**
@@ -116,21 +107,7 @@ public CompletableFuture<Void> uploadObject(
         try {
             if (streamContext.getNumberOfParts() == 1) {
                 log.debug(() -> "Starting the upload as a single upload part request");
-                TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
-                Semaphore semaphore = AsyncPartsHandler.maybeAcquireSemaphore(
-                    transferSemaphoresHolder,
-                    requestContext,
-                    uploadRequest.getWritePriority(),
-                    uploadRequest.getKey()
-                );
-                try {
-                    uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher, semaphore);
-                } catch (Exception ex) {
-                    if (semaphore != null) {
-                        semaphore.release();
-                    }
-                    throw ex;
-                }
+                uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture, statsMetricPublisher);
             } else {
                 log.debug(() -> "Starting the upload as multipart upload request");
                 uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher);
@@ -164,19 +141,21 @@ private void uploadInParts(
         // Ensure cancellations are forwarded to the createMultipartUploadFuture future
         CompletableFutureUtils.forwardExceptionTo(returnFuture, createMultipartUploadFuture);
 
-        String uploadId;
-        try {
-            // Block main thread here so that upload of parts doesn't get executed in future completion thread.
-            // We should never execute latent operation like acquisition of permit in future completion pool.
-            CreateMultipartUploadResponse createMultipartUploadResponse = createMultipartUploadFuture.get();
-            uploadId = createMultipartUploadResponse.uploadId();
-            log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
-        } catch (Exception ex) {
-            handleException(returnFuture, () -> "Failed to initiate multipart upload", ex);
-            return;
-        }
-
-        doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, uploadId, statsMetricPublisher);
+        createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> {
+            if (throwable != null) {
+                handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
+            } else {
+                log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
+                doUploadInParts(
+                    s3AsyncClient,
+                    uploadRequest,
+                    streamContext,
+                    returnFuture,
+                    createMultipartUploadResponse.uploadId(),
+                    statsMetricPublisher
+                );
+            }
+        });
     }
 
     private void doUploadInParts(
@@ -205,9 +184,7 @@ private void doUploadInParts(
                 completedParts,
                 inputStreamContainers,
                 statsMetricPublisher,
-                uploadRequest.isUploadRetryEnabled(),
-                transferSemaphoresHolder,
-                maxRetryablePartSize
+                uploadRequest.isUploadRetryEnabled()
             );
         } catch (Exception ex) {
             try {
@@ -338,14 +315,12 @@ public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority w
         return (long) Math.max(optimalPartSize, minimumPartSize);
     }
 
-    @SuppressWarnings("unchecked")
     private void uploadInOneChunk(
         S3AsyncClient s3AsyncClient,
         UploadRequest uploadRequest,
-        StreamContext streamContext,
+        InputStreamContainer inputStreamContainer,
         CompletableFuture<Void> returnFuture,
-        StatsMetricPublisher statsMetricPublisher,
-        Semaphore semaphore
+        StatsMetricPublisher statsMetricPublisher
     ) {
         PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
             .bucket(uploadRequest.getBucket())
@@ -356,7 +331,6 @@ private void uploadInOneChunk(
             putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
             putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
         }
-        PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
         ExecutorService streamReadExecutor;
         if (uploadRequest.getWritePriority() == WritePriority.URGENT) {
             streamReadExecutor = urgentExecutorService;
@@ -366,33 +340,25 @@ private void uploadInOneChunk(
             streamReadExecutor = executorService;
         }
 
-        CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(() -> {
-            InputStream inputStream = null;
-            CompletableFuture<PutObjectResponse> putObjectRespFuture;
-            try {
-                InputStreamContainer inputStreamContainer = streamContext.provideStream(0);
-                inputStream = AsyncPartsHandler.maybeRetryInputStream(
-                    inputStreamContainer.getInputStream(),
-                    uploadRequest.getWritePriority(),
-                    uploadRequest.isUploadRetryEnabled(),
-                    uploadRequest.getContentLength(),
-                    maxRetryablePartSize
-                );
-                AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromInputStream(
-                    inputStream,
-                    inputStreamContainer.getContentLength(),
-                    streamReadExecutor
-                );
-                putObjectRespFuture = s3AsyncClient.putObject(putObjectRequest, asyncRequestBody);
-            } catch (Exception e) {
-                releaseResourcesSafely(semaphore, inputStream, uploadRequest.getKey());
-                return CompletableFuture.failedFuture(e);
-            }
-
-            InputStream finalInputStream = inputStream;
-            return putObjectRespFuture.handle((resp, throwable) -> {
-                releaseResourcesSafely(semaphore, finalInputStream, uploadRequest.getKey());
-
+        InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream(
+            inputStreamContainer.getInputStream(),
+            uploadRequest.getWritePriority(),
+            uploadRequest.isUploadRetryEnabled(),
+            uploadRequest.getContentLength()
+        );
+        CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
+            () -> s3AsyncClient.putObject(
+                putObjectRequestBuilder.build(),
+                AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
+            ).handle((resp, throwable) -> {
+                try {
+                    inputStream.close();
+                } catch (IOException e) {
+                    log.error(
+                        () -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", uploadRequest.getKey()),
+                        e
+                    );
+                }
                 if (throwable != null) {
                     Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class);
                     if (unwrappedThrowable != null) {
@@ -420,27 +386,13 @@ private void uploadInOneChunk(
                 }
 
                 return null;
-            });
-        });
+            })
+        );
 
         CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectFuture);
         CompletableFutureUtils.forwardResultTo(putObjectFuture, returnFuture);
     }
 
-    private void releaseResourcesSafely(Semaphore semaphore, InputStream inputStream, String file) {
-        if (semaphore != null) {
-            semaphore.release();
-        }
-
-        if (inputStream != null) {
-            try {
-                inputStream.close();
-            } catch (IOException e) {
-                log.error(() -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", file), e);
-            }
-        }
-    }
-
     private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest) {
         DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder()
             .bucket(uploadRequest.getBucket())
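One subtlety in the reverted single-chunk path above: handle() swallows the upstream exception (its stage completes normally with the lambda's return value), which is why the outcome is then explicitly propagated with forwardExceptionTo/forwardResultTo. The core shape, reduced to its essentials:

    CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
        () -> s3AsyncClient.putObject(
            putObjectRequestBuilder.build(),
            AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
        ).handle((resp, throwable) -> {
            try {
                inputStream.close(); // runs on both success and failure, exactly once
            } catch (IOException e) {
                // a failed close must not mask the upload outcome
            }
            return null;
        })
    );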
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java
deleted file mode 100644
index 170c80f5d4db6..0000000000000
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.repositories.s3.async;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
-import org.opensearch.core.common.unit.ByteSizeValue;
-import org.opensearch.repositories.s3.GenericStatsMetricPublisher;
-import org.opensearch.repositories.s3.S3TransferRejectedException;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Queue implementation to accept events based on their storage attribute. If size of queue is breached, then transfer
- * event is rejected.
- */
-public class SizeBasedBlockingQ extends AbstractLifecycleComponent {
-    private static final Logger log = LogManager.getLogger(SizeBasedBlockingQ.class);
-
-    protected final LinkedBlockingQueue<Item> queue;
-    protected final Lock lock;
-    protected final Condition notEmpty;
-
-    protected final AtomicLong currentSize;
-    protected final ByteSizeValue capacity;
-    protected final AtomicBoolean closed;
-    protected final ExecutorService executorService;
-    protected final int consumers;
-    private final GenericStatsMetricPublisher genericStatsMetricPublisher;
-    private final QueueEventType queueEventType;
-
-    /**
-     * Constructor to create sized based blocking queue.
-     */
-    public SizeBasedBlockingQ(
-        ByteSizeValue capacity,
-        ExecutorService executorService,
-        int consumers,
-        GenericStatsMetricPublisher genericStatsMetricPublisher,
-        QueueEventType queueEventType
-    ) {
-        this.queue = new LinkedBlockingQueue<>();
-        this.lock = new ReentrantLock();
-        this.notEmpty = lock.newCondition();
-        this.currentSize = new AtomicLong();
-        this.capacity = capacity;
-        this.closed = new AtomicBoolean();
-        this.executorService = executorService;
-        this.consumers = consumers;
-        this.genericStatsMetricPublisher = genericStatsMetricPublisher;
-        this.queueEventType = queueEventType;
-    }
-
-    public enum QueueEventType {
-        NORMAL,
-        LOW;
-    }
-
-    @Override
-    protected void doStart() {
-        for (int worker = 0; worker < consumers; worker++) {
-            Thread consumer = new Consumer(queue, currentSize, lock, notEmpty, closed, genericStatsMetricPublisher, queueEventType);
-            executorService.submit(consumer);
-        }
-    }
-
-    /**
-     * Add an item to the queue
-     */
-    public void produce(Item item) throws InterruptedException {
-        if (item == null || item.size <= 0) {
-            throw new IllegalStateException("Invalid item input to produce.");
-        }
-        log.debug(() -> "Transfer queue event received of size: " + item.size + ". Current queue utilisation: " + currentSize.get());
-
-        if (currentSize.get() + item.size >= capacity.getBytes()) {
-            throw new S3TransferRejectedException("S3 Transfer queue capacity reached");
-        }
-
-        final Lock lock = this.lock;
-        final AtomicLong currentSize = this.currentSize;
-        lock.lock();
-        try {
-            if (currentSize.get() + item.size >= capacity.getBytes()) {
-                throw new S3TransferRejectedException("S3 Transfer queue capacity reached");
-            }
-            if (closed.get()) {
-                throw new AlreadyClosedException("Transfer queue is already closed.");
-            }
-            queue.put(item);
-            currentSize.addAndGet(item.size);
-            notEmpty.signalAll();
-            updateStats(item.size, queueEventType, genericStatsMetricPublisher);
-        } finally {
-            lock.unlock();
-        }
-
-    }
-
-    private static void updateStats(long itemSize, QueueEventType queueEventType, GenericStatsMetricPublisher genericStatsMetricPublisher) {
-        if (queueEventType == QueueEventType.NORMAL) {
-            genericStatsMetricPublisher.updateNormalPriorityQSize(itemSize);
-        } else if (queueEventType == QueueEventType.LOW) {
-            genericStatsMetricPublisher.updateLowPriorityQSize(itemSize);
-        }
-    }
-
-    public int getSize() {
-        return queue.size();
-    }
-
-    public boolean isMaxCapacityBelowContentLength(long contentLength) {
-        return contentLength < capacity.getBytes();
-    }
-
-    protected static class Consumer extends Thread {
-        private final LinkedBlockingQueue<Item> queue;
-        private final Lock lock;
-        private final Condition notEmpty;
-        private final AtomicLong currentSize;
-        private final AtomicBoolean closed;
-        private final GenericStatsMetricPublisher genericStatsMetricPublisher;
-        private final QueueEventType queueEventType;
-
-        public Consumer(
-            LinkedBlockingQueue<Item> queue,
-            AtomicLong currentSize,
-            Lock lock,
-            Condition notEmpty,
-            AtomicBoolean closed,
-            GenericStatsMetricPublisher genericStatsMetricPublisher,
-            QueueEventType queueEventType
-        ) {
-            this.queue = queue;
-            this.lock = lock;
-            this.notEmpty = notEmpty;
-            this.currentSize = currentSize;
-            this.closed = closed;
-            this.genericStatsMetricPublisher = genericStatsMetricPublisher;
-            this.queueEventType = queueEventType;
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    consume();
-                } catch (AlreadyClosedException ex) {
-                    return;
-                } catch (Exception ex) {
-                    log.error("Failed to consume transfer event", ex);
-                }
-            }
-        }
-
-        private void consume() throws InterruptedException {
-            final Lock lock = this.lock;
-            final AtomicLong currentSize = this.currentSize;
-            lock.lock();
-            Item item;
-            try {
-                if (closed.get()) {
-                    throw new AlreadyClosedException("transfer queue closed");
-                }
-                while (currentSize.get() == 0) {
-                    notEmpty.await();
-                    if (closed.get()) {
-                        throw new AlreadyClosedException("transfer queue closed");
-                    }
-                }
-
-                item = queue.take();
-                currentSize.addAndGet(-item.size);
-                updateStats(-item.size, queueEventType, genericStatsMetricPublisher);
-            } finally {
-                lock.unlock();
-            }
-
-            try {
-                item.consumable.run();
-            } catch (Exception ex) {
-                log.error("Exception on executing item consumable", ex);
-            }
-        }
-
-    }
-
-    public static class Item {
-        private final long size;
-        private final Runnable consumable;
-
-        public Item(long size, Runnable consumable) {
-            this.size = size;
-            this.consumable = consumable;
-        }
-    }
-
-    @Override
-    protected void doStop() {
-        doClose();
-    }
-
-    @Override
-    protected void doClose() {
-        lock.lock();
-        try {
-            if (closed.get() == true) {
-                return;
-            }
-            closed.set(true);
-            notEmpty.signalAll();
-        } finally {
-            lock.unlock();
-        }
-    }
-}
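Usage shape of the deleted queue, for reference: as an AbstractLifecycleComponent it must be start()ed so its consumer threads exist; producers then block briefly under the lock and are rejected with S3TransferRejectedException once queued bytes would reach capacity. A sketch with illustrative values; uploadTask is a hypothetical Runnable, and produce() can throw InterruptedException:

    SizeBasedBlockingQ queue = new SizeBasedBlockingQ(
        new ByteSizeValue(1, ByteSizeUnit.GB),    // capacity (illustrative)
        executorService,                          // runs the consumer threads
        2,                                        // consumer count (illustrative)
        genericStatsMetricPublisher,
        SizeBasedBlockingQ.QueueEventType.NORMAL
    );
    queue.start(); // spawns consumers via doStart()
    queue.produce(new SizeBasedBlockingQ.Item(partSizeInBytes, uploadTask));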
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java
deleted file mode 100644
index 7dccedb8d5278..0000000000000
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.repositories.s3.async;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.opensearch.common.blobstore.stream.write.WritePriority;
-import org.opensearch.repositories.s3.GenericStatsMetricPublisher;
-
-import java.util.Objects;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-
-/**
- * Transfer semaphore holder for controlled transfer of data to remote.
- */
-public class TransferSemaphoresHolder {
-    private static final Logger log = LogManager.getLogger(TransferSemaphoresHolder.class);
-    // For tests
-    protected TypeSemaphore lowPrioritySemaphore;
-    protected TypeSemaphore normalPrioritySemaphore;
-    private final int normalPriorityPermits;
-    private final int lowPriorityPermits;
-    private final int acquireWaitDuration;
-    private final TimeUnit acquireWaitDurationUnit;
-
-    /**
-     * Constructor to create semaphores holder.
-     */
-    public TransferSemaphoresHolder(
-        int normalPriorityPermits,
-        int lowPriorityPermits,
-        int acquireWaitDuration,
-        TimeUnit timeUnit,
-        GenericStatsMetricPublisher genericStatsPublisher
-    ) {
-
-        this.normalPriorityPermits = normalPriorityPermits;
-        this.lowPriorityPermits = lowPriorityPermits;
-        this.normalPrioritySemaphore = new TypeSemaphore(
-            normalPriorityPermits,
-            TypeSemaphore.PermitType.NORMAL,
-            genericStatsPublisher::updateNormalPermits
-        );
-        this.lowPrioritySemaphore = new TypeSemaphore(
-            lowPriorityPermits,
-            TypeSemaphore.PermitType.LOW,
-            genericStatsPublisher::updateLowPermits
-        );
-        this.acquireWaitDuration = acquireWaitDuration;
-        this.acquireWaitDurationUnit = timeUnit;
-    }
-
-    /**
-     * Overridden semaphore to identify transfer semaphores among all other semaphores for triaging.
-     */
-    public static class TypeSemaphore extends Semaphore {
-        private final PermitType permitType;
-        private final Consumer<Boolean> permitChangeConsumer;
-
-        public enum PermitType {
-            NORMAL,
-            LOW;
-        }
-
-        public TypeSemaphore(int permits, PermitType permitType, Consumer<Boolean> permitChangeConsumer) {
-            super(permits);
-            this.permitType = permitType;
-            this.permitChangeConsumer = permitChangeConsumer;
-        }
-
-        public PermitType getType() {
-            return permitType;
-        }
-
-        @Override
-        public boolean tryAcquire() {
-            boolean acquired = super.tryAcquire();
-            if (acquired) {
-                permitChangeConsumer.accept(true);
-            }
-            return acquired;
-        }
-
-        @Override
-        public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
-            boolean acquired = super.tryAcquire(timeout, unit);
-            if (acquired) {
-                permitChangeConsumer.accept(true);
-            }
-            return acquired;
-        }
-
-        @Override
-        public void release() {
-            super.release();
-            permitChangeConsumer.accept(false);
-        }
-    }
-
-    /**
-     * For multiple part requests of a single file, request context object will be set with the decision if low
-     * priority permits can also be utilized in high priority transfers of parts of the file. If high priority get fully
-     * consumed then low priority permits will be acquired for transfer.
-     *
-     * If a low priority transfer request comes in and a high priority transfer is in progress then till current
-     * high priority transfer finishes, low priority transfer may have to compete. This is an acceptable side effect
-     * because low priority transfers are generally heavy and it is ok to have slow progress in the beginning.
-     *
-     */
-    public static class RequestContext {
-
-        private final boolean lowPriorityPermitsConsumable;
-
-        private RequestContext(boolean lowPriorityPermitsConsumable) {
-            this.lowPriorityPermitsConsumable = lowPriorityPermitsConsumable;
-        }
-
-    }
-
-    public RequestContext createRequestContext() {
-        return new RequestContext(this.lowPrioritySemaphore.availablePermits() == lowPriorityPermits);
-    }
-
-    /**
-     * Acquire permit based on the availability and based on the transfer priority.
-     * A high priority event can acquire a low priority semaphore if all low permits are available.
-     * A low priority event can acquire a high priority semaphore if at least 40% of high permits are available. We
-     * reserve this bandwidth to ensure that high priority events never wait for permits in case of ongoing low priority
-     * transfers.
-     */
-    public TypeSemaphore acquirePermit(WritePriority writePriority, RequestContext requestContext) throws InterruptedException {
-        log.debug(
-            () -> "Acquire permit request for transfer type: "
-                + writePriority
-                + ". Available high priority permits: "
-                + normalPrioritySemaphore.availablePermits()
-                + " and low priority permits: "
-                + lowPrioritySemaphore.availablePermits()
-        );
-        // Try acquiring low priority permit or high priority permit immediately if available.
-        // Otherwise, we wait for low priority permit.
-        if (Objects.requireNonNull(writePriority) == WritePriority.LOW) {
-            if (lowPrioritySemaphore.tryAcquire()) {
-                return lowPrioritySemaphore;
-            } else if (normalPrioritySemaphore.availablePermits() > 0.4 * normalPriorityPermits && normalPrioritySemaphore.tryAcquire()) {
-                return normalPrioritySemaphore;
-            } else if (lowPrioritySemaphore.tryAcquire(acquireWaitDuration, acquireWaitDurationUnit)) {
-                return lowPrioritySemaphore;
-            }
-            return null;
-        }
-
-        // Try acquiring high priority permit or low priority permit immediately if available.
- // Otherwise, wait for a normal priority permit. - if (normalPrioritySemaphore.tryAcquire()) { - return normalPrioritySemaphore; - } else if (requestContext.lowPriorityPermitsConsumable && lowPrioritySemaphore.tryAcquire()) { - return lowPrioritySemaphore; - } else if (normalPrioritySemaphore.tryAcquire(acquireWaitDuration, acquireWaitDurationUnit)) { - return normalPrioritySemaphore; - } - return null; - } - - /** - * Used in tests. - */ - public int getNormalPriorityPermits() { - return normalPriorityPermits; - } - - /** - * Used in tests. - */ - public int getLowPriorityPermits() { - return lowPriorityPermits; - } -} diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java index 8ea90880b91c5..a5304dc4a97d6 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java @@ -24,7 +24,8 @@ public class UploadRequest { private final CheckedConsumer<Boolean, IOException> uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; - private final boolean uploadRetryEnabled; + + private boolean uploadRetryEnabled; /** * Construct a new UploadRequest object diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java index 573a4f3f51a41..f84d953baae8e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java @@ -303,22 +303,7 @@ protected S3Repository createRepository( ClusterService clusterService, RecoverySettings recoverySettings ) { - return new S3Repository( - metadata, - registry, - service, - clusterService, - recoverySettings, - null, - null, - null, - null, - null, - false, - null, - null, - null - ) { + return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index e0eb4066f70a2..8c7e196d7c812 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -47,10 +47,7 @@ import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; -import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; -import org.opensearch.repositories.s3.async.TransferSemaphoresHolder; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.Scheduler; import org.junit.After; import org.junit.Before; @@ -66,7 +63,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
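For readers tracing the deleted TransferSemaphoresHolder above: its acquisition policy reduces to a small amount of logic. The following is a minimal, self-contained sketch of that policy over plain JDK semaphores; the class and method names are illustrative and are not part of the plugin.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch of the deleted TransferSemaphoresHolder policy: low priority transfers
// may borrow normal permits only while more than 40% of them remain free, and
// normal priority transfers may borrow low permits only if all low permits were
// free when the request context was created.
final class PrioritySemaphoresSketch {
    private final Semaphore normal;
    private final Semaphore low;
    private final int normalPermits;
    private final int lowPermits;

    PrioritySemaphoresSketch(int normalPermits, int lowPermits) {
        this.normalPermits = normalPermits;
        this.lowPermits = lowPermits;
        this.normal = new Semaphore(normalPermits);
        this.low = new Semaphore(lowPermits);
    }

    // Snapshot taken once per multi-part upload, mirroring RequestContext.
    boolean allLowPermitsFree() {
        return low.availablePermits() == lowPermits;
    }

    // Returns the semaphore that was acquired, or null on timeout.
    Semaphore acquire(boolean lowPriority, boolean lowPermitsConsumable, long wait, TimeUnit unit)
        throws InterruptedException {
        if (lowPriority) {
            if (low.tryAcquire()) return low;
            // Borrow a normal permit only while >40% of normal permits are still free.
            if (normal.availablePermits() > 0.4 * normalPermits && normal.tryAcquire()) return normal;
            return low.tryAcquire(wait, unit) ? low : null;
        }
        if (normal.tryAcquire()) return normal;
        // Borrow a low permit only if all low permits were free at request start.
        if (lowPermitsConsumable && low.tryAcquire()) return low;
        return normal.tryAcquire(wait, unit) ? normal : null;
    }
}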
-import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -92,13 +88,8 @@ public class S3BlobContainerMockClientTests extends OpenSearchTestCase implement private MockS3AsyncService asyncService; private ExecutorService futureCompletionService; private ExecutorService streamReaderService; - private ExecutorService remoteTransferRetry; - private ExecutorService transferQueueConsumerService; - private ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; private S3BlobContainer blobContainer; - private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; - private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; static class MockS3AsyncService extends S3AsyncService { @@ -370,27 +361,7 @@ public void setUp() throws Exception { asyncService = new MockS3AsyncService(configPath(), 1000); futureCompletionService = Executors.newSingleThreadExecutor(); streamReaderService = Executors.newSingleThreadExecutor(); - remoteTransferRetry = Executors.newFixedThreadPool(20); - transferQueueConsumerService = Executors.newFixedThreadPool(20); - scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1); transferNIOGroup = new AsyncTransferEventLoopGroup(1); - GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( - new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 10L, ByteSizeUnit.GB), - transferQueueConsumerService, - 10, - genericStatsMetricPublisher, - SizeBasedBlockingQ.QueueEventType.NORMAL - ); - lowPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( - new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 20L, ByteSizeUnit.GB), - transferQueueConsumerService, - 5, - genericStatsMetricPublisher, - SizeBasedBlockingQ.QueueEventType.NORMAL - ); - normalPrioritySizeBasedBlockingQ.start(); - lowPrioritySizeBasedBlockingQ.start(); blobContainer = createBlobContainer(); super.setUp(); } @@ -399,14 +370,6 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { IOUtils.close(asyncService); - futureCompletionService.shutdown(); - streamReaderService.shutdown(); - remoteTransferRetry.shutdown(); - transferQueueConsumerService.shutdown(); - normalPrioritySizeBasedBlockingQ.close(); - lowPrioritySizeBasedBlockingQ.close(); - scheduler.shutdown(); - transferNIOGroup.close(); super.tearDown(); } @@ -428,7 +391,7 @@ private S3BlobStore createBlobStore() { streamReaderService, transferNIOGroup ); - GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); + return new S3BlobStore( null, asyncService, @@ -444,21 +407,11 @@ private S3BlobStore createBlobStore() { S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), - asyncExecutorContainer.getStreamReader(), - new TransferSemaphoresHolder( - 3, - Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), - 5, - TimeUnit.MINUTES, - genericStatsMetricPublisher - ) + asyncExecutorContainer.getStreamReader() ), asyncExecutorContainer, asyncExecutorContainer, - asyncExecutorContainer, - normalPrioritySizeBasedBlockingQ, - lowPrioritySizeBasedBlockingQ, - genericStatsMetricPublisher + asyncExecutorContainer ); } @@ -606,32 +559,19 @@ 
private int calculateNumberOfParts(long contentLength, long partSize) { return (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1); } - public void testFailureWhenLargeFileRedirected() throws IOException, InterruptedException { - testLargeFilesRedirectedToSlowSyncClient(true, WritePriority.LOW); - testLargeFilesRedirectedToSlowSyncClient(true, WritePriority.NORMAL); + public void testFailureWhenLargeFileRedirected() throws IOException, ExecutionException, InterruptedException { + testLargeFilesRedirectedToSlowSyncClient(true); } - public void testLargeFileRedirected() throws IOException, InterruptedException { - testLargeFilesRedirectedToSlowSyncClient(false, WritePriority.LOW); - testLargeFilesRedirectedToSlowSyncClient(false, WritePriority.NORMAL); + public void testLargeFileRedirected() throws IOException, ExecutionException, InterruptedException { + testLargeFilesRedirectedToSlowSyncClient(false); } - private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, WritePriority writePriority) throws IOException, - InterruptedException { - ByteSizeValue capacity = new ByteSizeValue(1, ByteSizeUnit.GB); - int numberOfParts = 20; - final ByteSizeValue partSize = new ByteSizeValue(capacity.getBytes() / numberOfParts + 1, ByteSizeUnit.BYTES); - - GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ( - new ByteSizeValue(1, ByteSizeUnit.MB), - transferQueueConsumerService, - 10, - genericStatsMetricPublisher, - SizeBasedBlockingQ.QueueEventType.NORMAL - ); + private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) throws IOException, InterruptedException { + final ByteSizeValue partSize = new ByteSizeValue(1024, ByteSizeUnit.MB); - final long lastPartSize = new ByteSizeValue(200, ByteSizeUnit.MB).getBytes(); + int numberOfParts = 20; + final long lastPartSize = new ByteSizeValue(20, ByteSizeUnit.MB).getBytes(); final long blobSize = ((numberOfParts - 1) * partSize.getBytes()) + lastPartSize; CountDownLatch countDownLatch = new CountDownLatch(1); AtomicReference exceptionRef = new AtomicReference<>(); @@ -654,9 +594,6 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); - when(blobStore.getLowPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ); - when(blobStore.getNormalPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ); - final boolean serverSideEncryption = randomBoolean(); when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); @@ -707,7 +644,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); } - }, blobSize, false, writePriority, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); + }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException) { @@ -731,4 +668,5 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }); } + } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java 
b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index aba914e2ca98c..ceab06bd051e9 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -68,8 +68,6 @@ import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; -import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; -import org.opensearch.repositories.s3.async.TransferSemaphoresHolder; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -88,8 +86,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -117,12 +113,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes private S3AsyncService asyncService; private ExecutorService futureCompletionService; private ExecutorService streamReaderService; - private ExecutorService remoteTransferRetry; - private ExecutorService transferQueueConsumerService; - private ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; - private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; - private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; @Before public void setUp() throws Exception { @@ -133,26 +124,7 @@ public void setUp() throws Exception { futureCompletionService = Executors.newSingleThreadExecutor(); streamReaderService = Executors.newSingleThreadExecutor(); transferNIOGroup = new AsyncTransferEventLoopGroup(1); - remoteTransferRetry = Executors.newFixedThreadPool(20); - transferQueueConsumerService = Executors.newFixedThreadPool(2); - scheduler = new ScheduledThreadPoolExecutor(1); - GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( - new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB), - transferQueueConsumerService, - 2, - genericStatsMetricPublisher, - SizeBasedBlockingQ.QueueEventType.NORMAL - ); - lowPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( - new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB), - transferQueueConsumerService, - 2, - genericStatsMetricPublisher, - SizeBasedBlockingQ.QueueEventType.LOW - ); - normalPrioritySizeBasedBlockingQ.start(); - lowPrioritySizeBasedBlockingQ.start(); + // needed by S3AsyncService SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", configPath().toString())); super.setUp(); @@ -164,11 +136,6 @@ public void tearDown() throws Exception { streamReaderService.shutdown(); futureCompletionService.shutdown(); - remoteTransferRetry.shutdown(); - transferQueueConsumerService.shutdown(); - scheduler.shutdown(); - normalPrioritySizeBasedBlockingQ.close(); - lowPrioritySizeBasedBlockingQ.close(); IOUtils.close(transferNIOGroup); if (previousOpenSearchPathConf != null) { @@ -237,7 +204,7 @@ protected AsyncMultiStreamBlobContainer createBlobContainer( 
streamReaderService, transferNIOGroup ); - GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); + return new S3BlobContainer( BlobPath.cleanPath(), new S3BlobStore( @@ -255,21 +222,11 @@ protected AsyncMultiStreamBlobContainer createBlobContainer( S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), - asyncExecutorContainer.getStreamReader(), - new TransferSemaphoresHolder( - 3, - Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), - 5, - TimeUnit.MINUTES, - genericStatsMetricPublisher - ) + asyncExecutorContainer.getStreamReader() ), asyncExecutorContainer, asyncExecutorContainer, - asyncExecutorContainer, - normalPrioritySizeBasedBlockingQ, - lowPrioritySizeBasedBlockingQ, - genericStatsMetricPublisher + asyncExecutorContainer ) ) { @Override diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java index f8e9903bb3577..6fec535ae6301 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java @@ -169,11 +169,7 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) { null, null, null, - false, - null, - null, - null, - null + false ) { @Override protected void assertSnapshotOrGenericThread() { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 55b58895abfd4..b753b847df869 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -33,7 +33,6 @@ import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.blobstore.ZeroInputStream; -import org.opensearch.repositories.s3.GenericStatsMetricPublisher; import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -45,7 +44,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.ArgumentMatchers.any; @@ -63,19 +61,11 @@ public class AsyncTransferManagerTests extends OpenSearchTestCase { @Before public void setUp() throws Exception { s3AsyncClient = mock(S3AsyncClient.class); - GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); asyncTransferManager = new AsyncTransferManager( ByteSizeUnit.MB.toBytes(5), Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(), - Executors.newSingleThreadExecutor(), - new TransferSemaphoresHolder( - 3, - Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), - 5, - TimeUnit.MINUTES, - genericStatsMetricPublisher - ) + Executors.newSingleThreadExecutor() ); super.setUp(); } diff --git 
a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java deleted file mode 100644 index 5be4037407d23..0000000000000 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.repositories.s3.async; - -import org.opensearch.core.common.unit.ByteSizeUnit; -import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.repositories.s3.GenericStatsMetricPublisher; -import org.opensearch.repositories.s3.S3TransferRejectedException; -import org.opensearch.test.OpenSearchTestCase; -import org.junit.After; -import org.junit.Before; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -public class SizeBasedBlockingQTests extends OpenSearchTestCase { - private ExecutorService consumerService; - private ExecutorService producerService; - - @Override - @Before - public void setUp() throws Exception { - this.consumerService = Executors.newFixedThreadPool(10); - this.producerService = Executors.newFixedThreadPool(100); - super.setUp(); - } - - @After - public void tearDown() throws Exception { - consumerService.shutdown(); - producerService.shutdown(); - super.tearDown(); - } - - public void testProducerConsumerOfBulkItems() throws InterruptedException { - GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - SizeBasedBlockingQ.QueueEventType queueEventType = randomBoolean() - ? 
SizeBasedBlockingQ.QueueEventType.NORMAL - : SizeBasedBlockingQ.QueueEventType.LOW; - SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ( - new ByteSizeValue(ByteSizeUnit.BYTES.toBytes(10)), - consumerService, - 10, - genericStatsMetricPublisher, - queueEventType - ); - sizeBasedBlockingQ.start(); - int numOfItems = randomIntBetween(100, 1000); - CountDownLatch latch = new CountDownLatch(numOfItems); - AtomicBoolean unknownError = new AtomicBoolean(); - for (int i = 0; i < numOfItems; i++) { - final int idx = i; - producerService.submit(() -> { - boolean throwException = randomBoolean(); - - SizeBasedBlockingQ.Item item = new TestItemToStr(randomIntBetween(1, 5), () -> { - latch.countDown(); - if (throwException) { - throw new RuntimeException("throwing random exception"); - } - }, idx); - - try { - sizeBasedBlockingQ.produce(item); - } catch (InterruptedException e) { - latch.countDown(); - unknownError.set(true); - throw new RuntimeException(e); - } catch (S3TransferRejectedException ex) { - latch.countDown(); - } - }); - } - latch.await(); - sizeBasedBlockingQ.close(); - assertFalse(unknownError.get()); - assertEquals(0L, genericStatsMetricPublisher.getNormalPriorityQSize()); - assertEquals(0L, genericStatsMetricPublisher.getLowPriorityQSize()); - } - - static class TestItemToStr extends SizeBasedBlockingQ.Item { - private final int id; - - public TestItemToStr(long size, Runnable consumable, int id) { - super(size, consumable); - this.id = id; - } - - @Override - public String toString() { - return String.valueOf(id); - } - } -} diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java deleted file mode 100644 index 236f02c5eb1f7..0000000000000 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
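The SizeBasedBlockingQTests deletion above exercises a queue that admits work items by their byte size and rejects producers once capacity is exceeded. SizeBasedBlockingQ itself is not shown in this diff, so the sketch below reconstructs only that admission rule under assumptions; the names are illustrative, and IllegalStateException stands in for the plugin's S3TransferRejectedException.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Minimal size-capped work queue in the spirit of SizeBasedBlockingQ: producers
// are rejected once the queued bytes exceed capacity, and each consumed item
// releases its tracked size again.
final class SizeCappedQueueSketch {
    static final class Item {
        final long size;
        final Runnable work;
        Item(long size, Runnable work) { this.size = size; this.work = work; }
    }

    private final long capacityBytes;
    private final AtomicLong queuedBytes = new AtomicLong();
    private final BlockingQueue<Item> queue = new LinkedBlockingQueue<>();

    SizeCappedQueueSketch(long capacityBytes) { this.capacityBytes = capacityBytes; }

    void produce(Item item) {
        // Reserve the item's size first; roll back and reject if over capacity.
        if (queuedBytes.addAndGet(item.size) > capacityBytes) {
            queuedBytes.addAndGet(-item.size);
            throw new IllegalStateException("queue capacity exceeded");
        }
        queue.add(item);
    }

    void consumeOne() throws InterruptedException {
        Item item = queue.take();
        try {
            item.work.run();
        } finally {
            queuedBytes.addAndGet(-item.size);
        }
    }
}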
- */ - -package org.opensearch.repositories.s3.async; - -import org.opensearch.common.blobstore.stream.write.WritePriority; -import org.opensearch.repositories.s3.GenericStatsMetricPublisher; -import org.opensearch.test.OpenSearchTestCase; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.mockito.Mockito; - -import static org.opensearch.repositories.s3.async.TransferSemaphoresHolder.TypeSemaphore.PermitType; - -public class TransferSemaphoresHolderTests extends OpenSearchTestCase { - - public void testAllocation() { - int availablePermits = randomIntBetween(5, 20); - double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int normalPermits = (int) (availablePermits * priorityAllocation); - int lowPermits = availablePermits - normalPermits; - GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder( - normalPermits, - lowPermits, - 1, - TimeUnit.NANOSECONDS, - genericStatsPublisher - ); - assertEquals(normalPermits, transferSemaphoresHolder.getNormalPriorityPermits()); - assertEquals(lowPermits, transferSemaphoresHolder.getLowPriorityPermits()); - assertEquals(0, genericStatsPublisher.getAcquiredNormalPriorityPermits()); - assertEquals(0, genericStatsPublisher.getAcquiredLowPriorityPermits()); - } - - public void testLowPriorityEventPermitAcquisition() throws InterruptedException { - int availablePermits = randomIntBetween(5, 50); - double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int normalPermits = (int) (availablePermits * priorityAllocation); - int lowPermits = availablePermits - normalPermits; - GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder( - normalPermits, - lowPermits, - 1, - TimeUnit.NANOSECONDS, - genericStatsPublisher - ); - - List semaphores = new ArrayList<>(); - int normalPermitsEligibleForLowEvents = normalPermits - (int) (normalPermits * 0.4); - - int lowAcquisitionsExpected = (normalPermitsEligibleForLowEvents + lowPermits); - for (int i = 0; i < lowAcquisitionsExpected; i++) { - TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); - TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.LOW, - requestContext - ); - semaphores.add(acquiredSemaphore); - if (i >= lowPermits) { - assertEquals(PermitType.NORMAL, acquiredSemaphore.getType()); - } else { - assertEquals(PermitType.LOW, acquiredSemaphore.getType()); - } - } - - for (int i = 0; i < normalPermits - normalPermitsEligibleForLowEvents; i++) { - TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); - TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.NORMAL, - requestContext - ); - assertEquals(PermitType.NORMAL, acquiredSemaphore.getType()); - semaphores.add(acquiredSemaphore); - } - - TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); - TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.LOW, - requestContext - ); - 
assertNull(acquiredSemaphore); - - assertEquals(availablePermits, semaphores.size()); - semaphores.forEach(Semaphore::release); - assertEquals(normalPermits, transferSemaphoresHolder.getNormalPriorityPermits()); - assertEquals(lowPermits, transferSemaphoresHolder.getLowPriorityPermits()); - assertEquals(0, genericStatsPublisher.getAcquiredNormalPriorityPermits()); - assertEquals(0, genericStatsPublisher.getAcquiredLowPriorityPermits()); - - } - - public void testNormalPermitEventAcquisition() throws InterruptedException { - int availablePermits = randomIntBetween(5, 50); - double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int normalPermits = (int) (availablePermits * priorityAllocation); - int lowPermits = availablePermits - normalPermits; - GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder( - normalPermits, - lowPermits, - 1, - TimeUnit.NANOSECONDS, - genericStatsPublisher - ); - - List semaphores = new ArrayList<>(); - List lowSemaphores = new ArrayList<>(); - int normalAcquisitionsExpected = normalPermits + lowPermits; - TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); - for (int i = 0; i < normalAcquisitionsExpected; i++) { - TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.NORMAL, - requestContext - ); - semaphores.add(acquiredSemaphore); - if (i >= normalPermits) { - assertEquals(PermitType.LOW, acquiredSemaphore.getType()); - lowSemaphores.add(acquiredSemaphore); - } else { - assertEquals(PermitType.NORMAL, acquiredSemaphore.getType()); - } - } - assertEquals(availablePermits, semaphores.size()); - - int lowAcquired = lowPermits; - - Semaphore removedLowSemaphore = lowSemaphores.remove(0); - removedLowSemaphore.release(); - semaphores.remove(removedLowSemaphore); - - requestContext = transferSemaphoresHolder.createRequestContext(); - TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.LOW, - requestContext - ); - semaphores.add(acquiredSemaphore); - lowSemaphores.add(acquiredSemaphore); - while (lowAcquired > 1) { - requestContext = transferSemaphoresHolder.createRequestContext(); - acquiredSemaphore = transferSemaphoresHolder.acquirePermit(WritePriority.NORMAL, requestContext); - assertNull(acquiredSemaphore); - lowAcquired--; - } - - semaphores.forEach(Semaphore::release); - assertEquals(normalPermits, transferSemaphoresHolder.getNormalPriorityPermits()); - assertEquals(lowPermits, transferSemaphoresHolder.getLowPriorityPermits()); - assertEquals(0, genericStatsPublisher.getAcquiredNormalPriorityPermits()); - assertEquals(0, genericStatsPublisher.getAcquiredLowPriorityPermits()); - } - - private static class TestTransferSemaphoresHolder extends TransferSemaphoresHolder { - AtomicInteger normalWaitCount = new AtomicInteger(); - AtomicInteger lowWaitCount = new AtomicInteger(); - - /** - * Constructor to create semaphores holder. 
- */ - public TestTransferSemaphoresHolder( - int normalPermits, - int lowPermits, - int acquireWaitDuration, - TimeUnit timeUnit, - GenericStatsMetricPublisher genericStatsMetricPublisher - ) throws InterruptedException { - super(normalPermits, lowPermits, acquireWaitDuration, timeUnit, genericStatsMetricPublisher); - TypeSemaphore executingNormalSemaphore = normalPrioritySemaphore; - TypeSemaphore executingLowSemaphore = lowPrioritySemaphore; - - this.normalPrioritySemaphore = Mockito.spy(normalPrioritySemaphore); - this.lowPrioritySemaphore = Mockito.spy(lowPrioritySemaphore); - Mockito.doAnswer(invocation -> { - normalWaitCount.incrementAndGet(); - return false; - }).when(normalPrioritySemaphore).tryAcquire(Mockito.anyLong(), Mockito.any(TimeUnit.class)); - Mockito.doAnswer(invocation -> executingNormalSemaphore.availablePermits()).when(normalPrioritySemaphore).availablePermits(); - Mockito.doAnswer(invocation -> executingNormalSemaphore.tryAcquire()).when(normalPrioritySemaphore).tryAcquire(); - - Mockito.doAnswer(invocation -> { - lowWaitCount.incrementAndGet(); - return false; - }).when(lowPrioritySemaphore).tryAcquire(Mockito.anyLong(), Mockito.any(TimeUnit.class)); - Mockito.doAnswer(invocation -> executingLowSemaphore.availablePermits()).when(lowPrioritySemaphore).availablePermits(); - Mockito.doAnswer(invocation -> executingLowSemaphore.tryAcquire()).when(lowPrioritySemaphore).tryAcquire(); - } - } - - public void testNormalSemaphoreAcquiredWait() throws InterruptedException { - int availablePermits = randomIntBetween(10, 50); - double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int normalPermits = (int) (availablePermits * priorityAllocation); - GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - TestTransferSemaphoresHolder transferSemaphoresHolder = new TestTransferSemaphoresHolder( - normalPermits, - availablePermits - normalPermits, - 5, - TimeUnit.MINUTES, - genericStatsPublisher - ); - - TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); - TransferSemaphoresHolder.TypeSemaphore lowSemaphore = transferSemaphoresHolder.acquirePermit(WritePriority.LOW, requestContext); - assertEquals(PermitType.LOW, lowSemaphore.getType()); - for (int i = 0; i < normalPermits; i++) { - requestContext = transferSemaphoresHolder.createRequestContext(); - TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.NORMAL, - requestContext - ); - assertEquals(PermitType.NORMAL, acquiredSemaphore.getType()); - } - - TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.NORMAL, - requestContext - ); - assertNull(acquiredSemaphore); - assertEquals(1, transferSemaphoresHolder.normalWaitCount.get()); - assertEquals(0, transferSemaphoresHolder.lowWaitCount.get()); - } - - public void testLowSemaphoreAcquiredWait() throws InterruptedException { - int availablePermits = randomIntBetween(10, 50); - double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int normalPermits = (int) (availablePermits * priorityAllocation); - int lowPermits = availablePermits - normalPermits; - GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10); - TestTransferSemaphoresHolder transferSemaphoresHolder = new TestTransferSemaphoresHolder( - normalPermits, - lowPermits, - 5, - TimeUnit.MINUTES, - genericStatsPublisher - 
); - - TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); - int normalPermitsEligibleForLowEvents = normalPermits - (int) (normalPermits * 0.4); - for (int i = 0; i < normalPermitsEligibleForLowEvents; i++) { - TransferSemaphoresHolder.TypeSemaphore lowSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.NORMAL, - requestContext - ); - assertEquals(PermitType.NORMAL, lowSemaphore.getType()); - } - - for (int i = 0; i < lowPermits; i++) { - requestContext = transferSemaphoresHolder.createRequestContext(); - TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.LOW, - requestContext - ); - assertEquals(PermitType.LOW, acquiredSemaphore.getType()); - } - - TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.LOW, - requestContext - ); - assertNull(acquiredSemaphore); - assertEquals(1, transferSemaphoresHolder.lowWaitCount.get()); - assertEquals(0, transferSemaphoresHolder.normalWaitCount.get()); - } - -} diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java index 8ce8ec8e01abe..0f6646d37f950 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java @@ -75,7 +75,6 @@ default void reload(RepositoryMetadata repositoryMetadata) {} * Metrics for BlobStore interactions */ enum Metric { - GENERIC_STATS("generic_stats"), REQUEST_SUCCESS("request_success_total"), REQUEST_FAILURE("request_failures_total"), REQUEST_LATENCY("request_time_in_millis"), diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java index 4e8db0a3a8c69..3f341c878c3c7 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java @@ -14,12 +14,7 @@ * @opensearch.internal */ public enum WritePriority { - // Used for segment transfers during refresh, flush or merges NORMAL, - // Used for transfer of translog or ckp files. HIGH, - // Used for transfer of remote cluster state - URGENT, - // All other background transfers such as in snapshot recovery, recovery from local store or index etc. - LOW + URGENT } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 36c9e9e663212..fb2320cddc683 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -438,14 +438,10 @@ private void uploadNewSegments( batchUploadListener.onFailure(ex); }); statsListener.beforeUpload(src); - remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload()); + remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener); } } - private boolean isLowPriorityUpload() { - return isLocalOrSnapshotRecovery(); - } - /** * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. 
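For context on the WritePriority hunk above: the removed comments documented which transfer kind used each level. A hypothetical helper makes that pre-revert mapping concrete; TransferKind and priorityFor are illustrative, not OpenSearch API, and this compiles only against the enum as it existed before this change (with LOW).

import org.opensearch.common.blobstore.stream.write.WritePriority;

// Illustrative mapping of transfer kinds to the pre-revert WritePriority levels,
// following the enum comments this diff removes.
final class PriorityRoutingSketch {
    enum TransferKind { SEGMENT_UPLOAD, TRANSLOG_OR_CHECKPOINT, REMOTE_CLUSTER_STATE, BACKGROUND_RECOVERY }

    static WritePriority priorityFor(TransferKind kind) {
        switch (kind) {
            case TRANSLOG_OR_CHECKPOINT: return WritePriority.HIGH;
            case REMOTE_CLUSTER_STATE: return WritePriority.URGENT;
            case BACKGROUND_RECOVERY: return WritePriority.LOW; // level removed by this change
            default: return WritePriority.NORMAL; // segment transfers during refresh, flush or merges
        }
    }
}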
* diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index ab76150f8f83d..345583bbbd1be 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -29,7 +29,6 @@ import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.index.store.exception.ChecksumCombinationException; import java.io.FileNotFoundException; @@ -324,12 +323,11 @@ public boolean copyFrom( String remoteFileName, IOContext context, Runnable postUploadRunner, - ActionListener listener, - boolean lowPriorityUpload + ActionListener listener ) { if (blobContainer instanceof AsyncMultiStreamBlobContainer) { try { - uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener, lowPriorityUpload); + uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener); } catch (Exception e) { listener.onFailure(e); } @@ -344,8 +342,7 @@ private void uploadBlob( String remoteFileName, IOContext ioContext, Runnable postUploadRunner, - ActionListener listener, - boolean lowPriorityUpload + ActionListener listener ) throws Exception { long expectedChecksum = calculateChecksumOfChecksum(from, src); long contentLength; @@ -356,13 +353,12 @@ private void uploadBlob( if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported(); } - lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15); RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( src, remoteFileName, contentLength, true, - lowPriorityUpload ? 
WritePriority.LOW : WritePriority.NORMAL, + WritePriority.NORMAL, (size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)), expectedChecksum, remoteIntegrityEnabled diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 8c0ecb4cc783a..ec1163fe91b6c 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -453,7 +453,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { * @param context IOContext to be used to open IndexInput of file during remote upload * @param listener Listener to handle upload callback events */ - public void copyFrom(Directory from, String src, IOContext context, ActionListener listener, boolean lowPriorityUpload) { + public void copyFrom(Directory from, String src, IOContext context, ActionListener listener) { try { final String remoteFileName = getNewRemoteSegmentFilename(src); boolean uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { @@ -462,7 +462,7 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen } catch (IOException e) { throw new RuntimeException("Exception in segment postUpload for file " + src, e); } - }, listener, lowPriorityUpload); + }, listener); if (uploaded == false) { copyFrom(from, src, src, context); listener.onResponse(null); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index ee81369725e6f..9e38e1749d434 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -104,8 +104,7 @@ public void onResponse(Void t) { public void onFailure(Exception e) { fail("Listener responded with exception" + e); } - }, - false + } ); assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); assertTrue(postUploadInvoked.get()); @@ -142,8 +141,7 @@ public void onResponse(Void t) { public void onFailure(Exception e) { countDownLatch.countDown(); } - }, - false + } ); assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); assertFalse(postUploadInvoked.get()); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 567199cf64cd8..b1e2028d761f0 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -639,7 +639,7 @@ public void onResponse(Void unused) { @Override public void onFailure(Exception e) {} }; - remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, false); + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener); assertTrue(latch.await(5000, TimeUnit.SECONDS)); assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); storeDirectory.close(); @@ -683,7 +683,7 @@ public void onFailure(Exception e) { latch.countDown(); } }; - remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, 
false); + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener); assertTrue(latch.await(5000, TimeUnit.SECONDS)); assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
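The RemoteDirectory hunk above deletes a size-based demotion rule: an upload flagged low priority, or one larger than 15 GB, was sent with WritePriority.LOW, and everything else with NORMAL. A condensed sketch of that removed decision follows; the helper name is illustrative, and it compiles only against the pre-revert enum.

import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.core.common.unit.ByteSizeUnit;

// Condensed form of the decision deleted from RemoteDirectory.uploadBlob:
// demote to LOW when the caller asked for a low priority upload or when the
// file exceeds 15 GB; otherwise keep NORMAL.
final class UploadPrioritySketch {
    static WritePriority of(boolean lowPriorityUpload, long contentLength) {
        boolean low = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15);
        return low ? WritePriority.LOW : WritePriority.NORMAL;
    }
}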