From dc6f5aa80bab5f45bdfba421b69d5e6fbc53969c Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sun, 20 Aug 2023 01:28:56 +0530 Subject: [PATCH 01/14] Rate Limiter integration for remote transfer --- .../org/opensearch/common/StreamLimiter.java | 51 +++++++++++++ .../RateLimitingOffsetRangeInputStream.java | 72 +++++++++++++++++++ .../blobstore/RateLimitingInputStream.java | 39 ++-------- .../store/RemoteSegmentStoreDirectory.java | 13 +++- .../RemoteSegmentStoreDirectoryFactory.java | 8 ++- .../blobstore/BlobStoreRepository.java | 40 ++++++++++- 6 files changed, 184 insertions(+), 39 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/StreamLimiter.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java diff --git a/server/src/main/java/org/opensearch/common/StreamLimiter.java b/server/src/main/java/org/opensearch/common/StreamLimiter.java new file mode 100644 index 0000000000000..c7b101ab75831 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/StreamLimiter.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common; + +import org.apache.lucene.store.RateLimiter; + +import java.io.IOException; +import java.util.function.Supplier; + +public class StreamLimiter { + + private final Supplier<RateLimiter> rateLimiterSupplier; + + private final StreamLimiter.Listener listener; + + private int bytesSinceLastRateLimit; + + public StreamLimiter(Supplier<RateLimiter> rateLimiterSupplier, Listener listener) { + this.rateLimiterSupplier = rateLimiterSupplier; + this.listener = listener; + } + + public void maybePause(int bytes) throws IOException { + bytesSinceLastRateLimit += bytes; + final RateLimiter rateLimiter = rateLimiterSupplier.get(); + if (rateLimiter != null) { + if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) { + long pause = rateLimiter.pause(bytesSinceLastRateLimit); + bytesSinceLastRateLimit = 0; + if (pause > 0) { + listener.onPause(pause); + } + } + } + } + + /** + * Internal listener + * + * @opensearch.internal + */ + public interface Listener { + void onPause(long nanos); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java new file mode 100644 index 0000000000000..cb5fea46ac7d6 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.apache.lucene.store.RateLimiter; +import org.opensearch.common.StreamLimiter; + +import java.io.IOException; +import java.util.function.Supplier; + +public class RateLimitingOffsetRangeInputStream extends OffsetRangeInputStream { + + private StreamLimiter streamLimiter; + + private OffsetRangeInputStream delegate; + + public RateLimitingOffsetRangeInputStream( + OffsetRangeInputStream delegate, + Supplier<RateLimiter> rateLimiterSupplier, + StreamLimiter.Listener listener + ) { + this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener); + this.delegate = delegate; + } + + @Override + public int read() throws IOException { + int b = delegate.read(); + streamLimiter.maybePause(1); + return b; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int n = delegate.read(b, off, len); + if (n > 0) { + streamLimiter.maybePause(n); + } + return n; + } + + @Override + public synchronized void mark(int readlimit) { + delegate.mark(readlimit); + } + + @Override + public boolean markSupported() { + return delegate.markSupported(); + } + + @Override + public long getFilePointer() throws IOException { + return delegate.getFilePointer(); + } + + @Override + public synchronized void reset() throws IOException { + delegate.reset(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } +} diff --git a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java index 86ecef1173e48..048a7db2d4ba4 100644 --- a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java +++ b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java @@ -33,6 +33,7 @@ package org.opensearch.index.snapshots.blobstore; import org.apache.lucene.store.RateLimiter; +import org.opensearch.common.StreamLimiter; import java.io.FilterInputStream; import java.io.IOException; @@ -46,45 +47,17 @@ */ public class RateLimitingInputStream extends FilterInputStream { - private final Supplier<RateLimiter> rateLimiterSupplier; + private StreamLimiter streamLimiter; - private final Listener listener; - - private long bytesSinceLastRateLimit; - - /** - * Internal listener - * - * @opensearch.internal - */ - public interface Listener { - void onPause(long nanos); - } - - public RateLimitingInputStream(InputStream delegate, Supplier<RateLimiter> rateLimiterSupplier, Listener listener) { + public RateLimitingInputStream(InputStream delegate, Supplier<RateLimiter> rateLimiterSupplier, StreamLimiter.Listener listener) { super(delegate); - this.rateLimiterSupplier = rateLimiterSupplier; - this.listener = listener; - } - - private void maybePause(int bytes) throws IOException { - bytesSinceLastRateLimit += bytes; - final RateLimiter rateLimiter = rateLimiterSupplier.get(); - if (rateLimiter != null) { - if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) { - long pause = rateLimiter.pause(bytesSinceLastRateLimit); - bytesSinceLastRateLimit = 0; - if (pause > 0) { - listener.onPause(pause); - } - } - } + this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener); } @Override public int read() throws IOException { int b = super.read(); - maybePause(1); + streamLimiter.maybePause(1); return b; } @@ -92,7 +65,7 @@ public int read() throws IOException { public int read(byte[] b, int off,
len); if (n > 0) { - maybePause(n); + streamLimiter.maybePause(n); } return n; } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index b1077bef5b492..22e82ac3b4f7a 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -32,6 +32,7 @@ import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.util.ByteUtils; @@ -59,6 +60,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.zip.CRC32; @@ -101,6 +103,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final ThreadPool threadPool; + private final BiFunction rateLimitedTransfer; + /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. @@ -128,13 +132,15 @@ public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, RemoteStoreLockManager mdLockManager, - ThreadPool threadPool + ThreadPool threadPool, + BiFunction rateLimitedTransfer ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; this.threadPool = threadPool; + this.rateLimitedTransfer = rateLimitedTransfer; init(); } @@ -464,7 +470,10 @@ private void uploadBlob(Directory from, String src, String remoteFileName, IOCon contentLength, true, WritePriority.NORMAL, - (size, position) -> new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position), + (size, position) -> rateLimitedTransfer.apply( + new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position), + true + ), expectedChecksum, remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer ); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 3de7a706c0688..3f3b35bc4bf9c 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -66,7 +66,13 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh shardId ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool); + return new RemoteSegmentStoreDirectory( + dataDirectory, + metadataDirectory, + mdLockManager, + threadPool, + ((BlobStoreRepository) repository)::maybeRateLimitRemoteTransfer + ); } catch (RepositoryMissingException e) { throw new 
IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 693022a60cc09..86ef46a43838e 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -73,6 +73,11 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.DeleteResult; import org.opensearch.common.blobstore.fs.FsBlobContainer; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.blobstore.transfer.stream.RateLimitingOffsetRangeInputStream; import org.opensearch.common.collect.Tuple; import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; @@ -89,9 +94,6 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.Strings; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.compress.Compressor; @@ -295,10 +297,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final RateLimiter restoreRateLimiter; + private final RateLimiter remoteUploadRateLimiter; + + private final RateLimiter remoteDownloadRateLimiter; + private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric(); private final CounterMetric restoreRateLimitingTimeInNanos = new CounterMetric(); + private final CounterMetric remoteDownloadRateLimitingTimeInNanos = new CounterMetric(); + + private final CounterMetric remoteUploadRateLimitingTimeInNanos = new CounterMetric(); + public static final ChecksumBlobStoreFormat GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "metadata", METADATA_NAME_FORMAT, @@ -398,6 +408,8 @@ protected BlobStoreRepository( this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings()); snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO); + remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO); + remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO); readOnly = READONLY_SETTING.get(metadata.settings()); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); @@ -3009,6 +3021,14 @@ private static InputStream maybeRateLimit(InputStream stream, Supplier rateLimiterSupplier, + CounterMetric metric + ) { + return new RateLimitingOffsetRangeInputStream(offsetRangeInputStream, rateLimiterSupplier, metric::inc); + } + public InputStream maybeRateLimitRestores(InputStream stream) { return 
maybeRateLimit( maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos), @@ -3017,6 +3037,20 @@ public InputStream maybeRateLimitRestores(InputStream stream) { ); } + public OffsetRangeInputStream maybeRateLimitRemoteTransfer(OffsetRangeInputStream offsetRangeInputStream, boolean isUpload) { + return isUpload + ? maybeRateLimitRemoteTransfers(offsetRangeInputStream, () -> remoteUploadRateLimiter, remoteUploadRateLimitingTimeInNanos) + : maybeRateLimitRemoteTransfers( + maybeRateLimitRemoteTransfers( + offsetRangeInputStream, + () -> remoteDownloadRateLimiter, + remoteDownloadRateLimitingTimeInNanos + ), + recoverySettings::rateLimiter, + remoteDownloadRateLimitingTimeInNanos + ); + } + public InputStream maybeRateLimitSnapshots(InputStream stream) { return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos); } From 386e73774a2e2c6f46e1809d9a4151e4b5dabcc7 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sun, 20 Aug 2023 02:40:26 +0530 Subject: [PATCH 02/14] Minor refactor Signed-off-by: Bukhtawar Khan --- .../org/opensearch/common/StreamLimiter.java | 3 ++ .../RateLimitingOffsetRangeInputStream.java | 13 +++++++-- .../store/RemoteSegmentStoreDirectory.java | 13 ++++----- .../RemoteSegmentStoreDirectoryFactory.java | 2 +- .../blobstore/BlobStoreRepository.java | 28 +++++++++---------- 5 files changed, 34 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/StreamLimiter.java b/server/src/main/java/org/opensearch/common/StreamLimiter.java index c7b101ab75831..40a922dd707dd 100644 --- a/server/src/main/java/org/opensearch/common/StreamLimiter.java +++ b/server/src/main/java/org/opensearch/common/StreamLimiter.java @@ -13,6 +13,9 @@ import java.io.IOException; import java.util.function.Supplier; +/** + * The stream limiter that limits the transfer of bytes + */ public class StreamLimiter { private final Supplier rateLimiterSupplier; diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java index cb5fea46ac7d6..9d8d3cffb227a 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java @@ -14,12 +14,21 @@ import java.io.IOException; import java.util.function.Supplier; +/** + * Rate Limits an {@link OffsetRangeInputStream} + */ public class RateLimitingOffsetRangeInputStream extends OffsetRangeInputStream { - private StreamLimiter streamLimiter; + private final StreamLimiter streamLimiter; - private OffsetRangeInputStream delegate; + private final OffsetRangeInputStream delegate; + /** + * The ctor for RateLimitingOffsetRangeInputStream + * @param delegate the underlying {@link OffsetRangeInputStream} + * @param rateLimiterSupplier the supplier for {@link RateLimiter} + * @param listener the listener to be invoked on rate limits + */ public RateLimitingOffsetRangeInputStream( OffsetRangeInputStream delegate, Supplier rateLimiterSupplier, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 22e82ac3b4f7a..7c854a487b7e5 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ 
b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -60,7 +60,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiFunction; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.zip.CRC32; @@ -103,7 +103,7 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final ThreadPool threadPool; - private final BiFunction rateLimitedTransfer; + private final UnaryOperator rateLimitedTransferFilter; /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. @@ -133,14 +133,14 @@ public RemoteSegmentStoreDirectory( RemoteDirectory remoteMetadataDirectory, RemoteStoreLockManager mdLockManager, ThreadPool threadPool, - BiFunction rateLimitedTransfer + UnaryOperator rateLimitedTransferFilter ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; this.threadPool = threadPool; - this.rateLimitedTransfer = rateLimitedTransfer; + this.rateLimitedTransferFilter = rateLimitedTransferFilter; init(); } @@ -470,9 +470,8 @@ private void uploadBlob(Directory from, String src, String remoteFileName, IOCon contentLength, true, WritePriority.NORMAL, - (size, position) -> rateLimitedTransfer.apply( - new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position), - true + (size, position) -> rateLimitedTransferFilter.apply( + new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position) ), expectedChecksum, remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 3f3b35bc4bf9c..e0d91739f3e4a 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -71,7 +71,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh metadataDirectory, mdLockManager, threadPool, - ((BlobStoreRepository) repository)::maybeRateLimitRemoteTransfer + ((BlobStoreRepository) repository)::maybeRateLimitRemoteUploadTransfers ); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 86ef46a43838e..72d42eda392c3 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -73,9 +73,6 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.DeleteResult; import org.opensearch.common.blobstore.fs.FsBlobContainer; -import org.opensearch.core.common.Strings; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import 
org.opensearch.common.blobstore.transfer.stream.RateLimitingOffsetRangeInputStream; import org.opensearch.common.collect.Tuple; @@ -94,6 +91,9 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.compress.Compressor; @@ -3037,18 +3037,16 @@ public InputStream maybeRateLimitRestores(InputStream stream) { ); } - public OffsetRangeInputStream maybeRateLimitRemoteTransfer(OffsetRangeInputStream offsetRangeInputStream, boolean isUpload) { - return isUpload - ? maybeRateLimitRemoteTransfers(offsetRangeInputStream, () -> remoteUploadRateLimiter, remoteUploadRateLimitingTimeInNanos) - : maybeRateLimitRemoteTransfers( - maybeRateLimitRemoteTransfers( - offsetRangeInputStream, - () -> remoteDownloadRateLimiter, - remoteDownloadRateLimitingTimeInNanos - ), - recoverySettings::rateLimiter, - remoteDownloadRateLimitingTimeInNanos - ); + public OffsetRangeInputStream maybeRateLimitRemoteUploadTransfers(OffsetRangeInputStream offsetRangeInputStream) { + return maybeRateLimitRemoteTransfers(offsetRangeInputStream, () -> remoteUploadRateLimiter, remoteUploadRateLimitingTimeInNanos); + } + + public OffsetRangeInputStream maybeRateLimitRemoteDownloadTransfers(OffsetRangeInputStream offsetRangeInputStream) { + return maybeRateLimitRemoteTransfers( + maybeRateLimitRemoteTransfers(offsetRangeInputStream, () -> remoteDownloadRateLimiter, remoteDownloadRateLimitingTimeInNanos), + recoverySettings::rateLimiter, + remoteDownloadRateLimitingTimeInNanos + ); } public InputStream maybeRateLimitSnapshots(InputStream stream) { From 55068fd3b55770c7a1b6ca1f6d01cedb936af8fb Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sun, 20 Aug 2023 23:08:33 +0530 Subject: [PATCH 03/14] Test fixes Signed-off-by: Bukhtawar Khan --- .../index/store/RemoteBufferedOutputDirectory.java | 2 +- .../java/org/opensearch/index/store/RemoteDirectory.java | 8 ++++++-- .../index/store/RemoteSegmentStoreDirectoryFactory.java | 2 +- .../repositories/blobstore/BlobStoreRepository.java | 8 ++++++++ .../index/shard/RemoteStoreRefreshListenerTests.java | 3 ++- .../org/opensearch/index/store/RemoteDirectoryTests.java | 2 +- .../index/store/RemoteSegmentStoreDirectoryTests.java | 3 ++- .../org/opensearch/index/shard/IndexShardTestCase.java | 8 ++++---- 8 files changed, 25 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java index b430ae2a6bc9c..6fc4403563655 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java @@ -22,7 +22,7 @@ */ public class RemoteBufferedOutputDirectory extends RemoteDirectory { public RemoteBufferedOutputDirectory(BlobContainer blobContainer) { - super(blobContainer); + super(blobContainer, s -> s); } @Override diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index a5e02a5baed69..1613e72086f20 100644 --- 
a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; /** @@ -45,12 +46,15 @@ public class RemoteDirectory extends Directory { protected final BlobContainer blobContainer; + protected final UnaryOperator rateLimitedFilter; + public BlobContainer getBlobContainer() { return blobContainer; } - public RemoteDirectory(BlobContainer blobContainer) { + public RemoteDirectory(BlobContainer blobContainer, UnaryOperator rateLimitedFilter) { this.blobContainer = blobContainer; + this.rateLimitedFilter = rateLimitedFilter; } /** @@ -149,7 +153,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { InputStream inputStream = null; try { inputStream = blobContainer.readBlob(name); - return new RemoteIndexInput(name, inputStream, fileLength(name)); + return new RemoteIndexInput(name, rateLimitedFilter.apply(inputStream), fileLength(name)); } catch (Exception e) { // Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. if (inputStream != null) inputStream.close(); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index e0d91739f3e4a..80147ae0f0061 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -81,6 +81,6 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh private RemoteDirectory createRemoteDirectory(Repository repository, BlobPath commonBlobPath, String extention) { BlobPath extendedPath = commonBlobPath.add(extention); BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); - return new RemoteDirectory(dataBlobContainer); + return new RemoteDirectory(dataBlobContainer, ((BlobStoreRepository) repository)::maybeRateLimitRemoteDownloadTransfers); } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 72d42eda392c3..556025834850a 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -3049,6 +3049,14 @@ public OffsetRangeInputStream maybeRateLimitRemoteDownloadTransfers(OffsetRangeI ); } + public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream) { + return maybeRateLimit( + maybeRateLimit(inputStream, () -> remoteDownloadRateLimiter, remoteDownloadRateLimitingTimeInNanos), + recoverySettings::rateLimiter, + remoteDownloadRateLimitingTimeInNanos + ); + } + public InputStream maybeRateLimitSnapshots(InputStream stream) { return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 83b07e986bcc5..d6aa723d685b0 100644 --- 
a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -152,7 +152,8 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { mock(RemoteDirectory.class), remoteMetadataDirectory, mock(RemoteStoreLockManager.class), - mock(ThreadPool.class) + mock(ThreadPool.class), + (s) -> s ); FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index b220b0891f11d..0a2107a0fc960 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -47,7 +47,7 @@ public class RemoteDirectoryTests extends OpenSearchTestCase { @Before public void setup() { blobContainer = mock(BlobContainer.class); - remoteDirectory = new RemoteDirectory(blobContainer); + remoteDirectory = new RemoteDirectory(blobContainer, s -> s); } public void testListAllEmpty() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 91154e5b77641..a5c65d5b53315 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -97,7 +97,8 @@ public void setup() throws IOException { remoteDataDirectory, remoteMetadataDirectory, mdLockManager, - threadPool + threadPool, + (s) -> s ); testUploadTracker = new TestUploadListener(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 8348584379f9c..d1a7022ff55d5 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -781,11 +781,11 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, (s) -> s); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException { - return new RemoteDirectory(getBlobContainer(f)); + return new RemoteDirectory(getBlobContainer(f), s -> s); } protected BlobContainer getBlobContainer(Path f) throws IOException { @@ -1160,8 +1160,8 @@ protected void startReplicaAfterRecovery( IndexShardRoutingTable newRoutingTable = initializingReplicaRouting.isRelocationTarget() ? 
new IndexShardRoutingTable.Builder(routingTable).removeShard(primary.routingEntry()).addShard(replica.routingEntry()).build() : new IndexShardRoutingTable.Builder(routingTable).removeShard(initializingReplicaRouting) - .addShard(replica.routingEntry()) - .build(); + .addShard(replica.routingEntry()) + .build(); Set inSyncIdsWithReplica = new HashSet<>(inSyncIds); inSyncIdsWithReplica.add(replica.routingEntry().allocationId().getId()); // update both primary and replica shard state From c5cd0c3f8e916afe0b726afcad9e4f2f6688e9d0 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Mon, 21 Aug 2023 23:51:33 +0530 Subject: [PATCH 04/14] Refactor RemoteDirectory Signed-off-by: Bukhtawar Khan --- .../store/RemoteBufferedOutputDirectory.java | 2 +- .../index/store/RemoteDirectory.java | 139 +++++++++++++++++- .../store/RemoteSegmentStoreDirectory.java | 138 +++-------------- .../RemoteSegmentStoreDirectoryFactory.java | 36 +++-- .../RemoteStoreRefreshListenerTests.java | 3 +- .../index/store/RemoteDirectoryTests.java | 2 +- .../RemoteSegmentStoreDirectoryTests.java | 3 +- .../index/shard/IndexShardTestCase.java | 4 +- 8 files changed, 181 insertions(+), 146 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java index 6fc4403563655..b704416d489d9 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java @@ -22,7 +22,7 @@ */ public class RemoteBufferedOutputDirectory extends RemoteDirectory { public RemoteBufferedOutputDirectory(BlobContainer blobContainer) { - super(blobContainer, s -> s); + super(blobContainer, s -> s, s -> s); } @Override diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 1613e72086f20..5193c43bd641c 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -8,15 +8,30 @@ package org.opensearch.index.store; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.exception.CorruptFileException; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.util.ByteUtils; import org.opensearch.core.action.ActionListener; +import 
org.opensearch.index.store.exception.ChecksumCombinationException; import java.io.FileNotFoundException; import java.io.IOException; @@ -32,6 +47,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.zip.CRC32; + +import com.jcraft.jzlib.JZlib; /** * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. @@ -46,15 +64,29 @@ public class RemoteDirectory extends Directory { protected final BlobContainer blobContainer; - protected final UnaryOperator rateLimitedFilter; + protected final UnaryOperator uploadRateLimiter; + + protected final UnaryOperator downloadRateLimiter; + + /** + * Number of bytes in the segment file to store checksum + */ + private static final int SEGMENT_CHECKSUM_BYTES = 8; + + private static final Logger logger = LogManager.getLogger(RemoteDirectory.class); public BlobContainer getBlobContainer() { return blobContainer; } - public RemoteDirectory(BlobContainer blobContainer, UnaryOperator rateLimitedFilter) { + public RemoteDirectory( + BlobContainer blobContainer, + UnaryOperator uploadRateLimiter, + UnaryOperator downloadRateLimiter + ) { this.blobContainer = blobContainer; - this.rateLimitedFilter = rateLimitedFilter; + this.uploadRateLimiter = uploadRateLimiter; + this.downloadRateLimiter = downloadRateLimiter; } /** @@ -153,7 +185,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { InputStream inputStream = null; try { inputStream = blobContainer.readBlob(name); - return new RemoteIndexInput(name, rateLimitedFilter.apply(inputStream), fileLength(name)); + return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength(name)); } catch (Exception e) { // Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. 
if (inputStream != null) inputStream.close(); @@ -263,4 +295,103 @@ public Lock obtainLock(String name) throws IOException { public void delete() throws IOException { blobContainer.delete(); } + + public boolean copyFrom( + Directory from, + String src, + IOContext context, + Runnable postUploadRunner, + String remoteFileName, + ActionListener listener + ) { + if (blobContainer instanceof VerifyingMultiStreamBlobContainer) { + try { + uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener); + } catch (Exception e) { + listener.onFailure(e); + } + return true; + } + return false; + } + + private void uploadBlob( + Directory from, + String src, + String remoteFileName, + IOContext ioContext, + Runnable postUploadRunner, + ActionListener listener + ) throws Exception { + long expectedChecksum = calculateChecksumOfChecksum(from, src); + long contentLength; + try (IndexInput indexInput = from.openInput(src, ioContext)) { + contentLength = indexInput.length(); + } + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + src, + remoteFileName, + contentLength, + true, + WritePriority.NORMAL, + (size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)), + expectedChecksum, + this.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer + ); + ActionListener completionListener = ActionListener.wrap(resp -> { + try { + postUploadRunner.run(); + listener.onResponse(null); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); + listener.onFailure(e); + } + }, ex -> { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex); + IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex); + if (corruptIndexException != null) { + listener.onFailure(corruptIndexException); + return; + } + Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class); + if (throwable != null) { + CorruptFileException corruptFileException = (CorruptFileException) throwable; + listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName())); + return; + } + listener.onFailure(ex); + }); + + completionListener = ActionListener.runBefore(completionListener, () -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + }); + + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + ((VerifyingMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener); + } + + private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { + try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { + long storedChecksum = CodecUtil.retrieveChecksum(indexInput); + CRC32 checksumOfChecksum = new CRC32(); + checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); + try { + return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES); + } catch (Exception e) { + throw new ChecksumCombinationException( + "Potentially corrupted file: Checksum combination failed while combining stored checksum " + + "and calculated checksum of stored checksum in segment file: " + + file + + ", directory: " + + directory, + file, + e + ); + } + } + } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java 
b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 7c854a487b7e5..9cb51f9f65b56 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfos; @@ -24,21 +23,11 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Version; -import org.opensearch.ExceptionsHelper; import org.opensearch.common.UUIDs; -import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; -import org.opensearch.common.blobstore.exception.CorruptFileException; -import org.opensearch.common.blobstore.stream.write.WriteContext; -import org.opensearch.common.blobstore.stream.write.WritePriority; -import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; -import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; -import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; -import org.opensearch.common.util.ByteUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; @@ -60,11 +49,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import java.util.zip.CRC32; - -import com.jcraft.jzlib.JZlib; /** * A RemoteDirectory extension for remote segment store. We need to make sure we don't overwrite a segment file once uploaded. @@ -85,11 +70,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement */ public static final String SEGMENT_NAME_UUID_SEPARATOR = "__"; - /** - * Number of bytes in the segment file to store checksum - */ - private static final int SEGMENT_CHECKSUM_BYTES = 8; - /** * remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data */ @@ -103,8 +83,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final ThreadPool threadPool; - private final UnaryOperator rateLimitedTransferFilter; - /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. 
@@ -132,15 +110,13 @@ public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, RemoteStoreLockManager mdLockManager, - ThreadPool threadPool, - UnaryOperator rateLimitedTransferFilter + ThreadPool threadPool ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; this.threadPool = threadPool; - this.rateLimitedTransferFilter = rateLimitedTransferFilter; init(); } @@ -439,79 +415,25 @@ public IndexInput openInput(String name, IOContext context) throws IOException { * @param listener Listener to handle upload callback events */ public void copyFrom(Directory from, String src, IOContext context, ActionListener listener) { - if (remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer) { - try { - String remoteFilename = getNewRemoteSegmentFilename(src); - uploadBlob(from, src, remoteFilename, context, listener); - } catch (Exception e) { - listener.onFailure(e); - } - } else { - try { + try { + final String remoteFileName = getNewRemoteSegmentFilename(src); + boolean uploaded = remoteDataDirectory.copyFrom(from, src, context, () -> { + try { + postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); + } catch (IOException e) { + throw new RuntimeException("Exception in segment postUpload for file " + src, e); + } + }, remoteFileName, listener); + if (uploaded == false) { copyFrom(from, src, src, context); listener.onResponse(null); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", src), e); - listener.onFailure(e); } + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", src), e); + listener.onFailure(e); } } - private void uploadBlob(Directory from, String src, String remoteFileName, IOContext ioContext, ActionListener listener) - throws Exception { - long expectedChecksum = calculateChecksumOfChecksum(from, src); - long contentLength; - try (IndexInput indexInput = from.openInput(src, ioContext)) { - contentLength = indexInput.length(); - } - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - src, - remoteFileName, - contentLength, - true, - WritePriority.NORMAL, - (size, position) -> rateLimitedTransferFilter.apply( - new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position) - ), - expectedChecksum, - remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer - ); - ActionListener completionListener = ActionListener.wrap(resp -> { - try { - postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); - listener.onResponse(null); - } catch (Exception e) { - logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); - listener.onFailure(e); - } - }, ex -> { - logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex); - IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex); - if (corruptIndexException != null) { - listener.onFailure(corruptIndexException); - return; - } - Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class); - if (throwable != null) { - CorruptFileException corruptFileException = (CorruptFileException) throwable; - listener.onFailure(new 
CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName())); - return; - } - listener.onFailure(ex); - }); - - completionListener = ActionListener.runBefore(completionListener, () -> { - try { - remoteTransferContainer.close(); - } catch (Exception e) { - logger.warn("Error occurred while closing streams", e); - } - }); - - WriteContext writeContext = remoteTransferContainer.createWriteContext(); - ((VerifyingMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer()).asyncBlobUpload(writeContext, completionListener); - } - /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} * @@ -587,13 +509,6 @@ String getMetadataFileForCommit(long primaryTerm, long generation) throws IOExce return metadataFiles.get(0); } - public void copyFrom(Directory from, String src, String dest, IOContext context, String checksum) throws IOException { - String remoteFilename; - remoteFilename = getNewRemoteSegmentFilename(dest); - remoteDataDirectory.copyFrom(from, src, remoteFilename, context); - postUpload(from, src, remoteFilename, checksum); - } - private void postUpload(Directory from, String src, String remoteFilename, String checksum) throws IOException { UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata); @@ -605,7 +520,9 @@ private void postUpload(Directory from, String src, String remoteFilename, Strin */ @Override public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { - copyFrom(from, src, dest, context, getChecksumOfLocalFile(from, src)); + String remoteFilename = getNewRemoteSegmentFilename(dest); + remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + postUpload(from, src, remoteFilename, getChecksumOfLocalFile(from, src)); } /** @@ -739,27 +656,6 @@ private String getChecksumOfLocalFile(Directory directory, String file) throws I } } - private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { - try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { - long storedChecksum = CodecUtil.retrieveChecksum(indexInput); - CRC32 checksumOfChecksum = new CRC32(); - checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); - try { - return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES); - } catch (Exception e) { - throw new ChecksumCombinationException( - "Potentially corrupted file: Checksum combination failed while combining stored checksum " - + "and calculated checksum of stored checksum in segment file: " - + file - + ", directory: " - + directory, - file, - e - ); - } - } - } - private String getExistingRemoteFilename(String localFilename) { if (segmentsUploadedToRemoteStore.containsKey(localFilename)) { return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 80147ae0f0061..8f74d2ea8edaf 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -11,6 +11,7 @@ import org.apache.lucene.store.Directory; import org.opensearch.common.blobstore.BlobContainer; import 
org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; @@ -23,7 +24,9 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.io.InputStream; import java.util.function.Supplier; +import java.util.function.UnaryOperator; /** * Factory for a remote store directory @@ -54,11 +57,18 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw public Directory newDirectory(String repositoryName, String indexUUID, String shardId) throws IOException { try (Repository repository = repositoriesService.get().repository(repositoryName)) { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; - BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath(); + BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository); + BlobPath commonBlobPath = blobStoreRepository.basePath(); commonBlobPath = commonBlobPath.add(indexUUID).add(shardId).add(SEGMENTS); - RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data"); - RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata"); + RemoteDirectory dataDirectory = createRemoteDirectory( + blobStoreRepository, + commonBlobPath, + "data", + blobStoreRepository::maybeRateLimitRemoteUploadTransfers, + blobStoreRepository::maybeRateLimitRemoteDownloadTransfers + ); + RemoteDirectory metadataDirectory = createRemoteDirectory(blobStoreRepository, commonBlobPath, "metadata", r -> r, r -> r); RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager( repositoriesService.get(), repositoryName, @@ -66,21 +76,21 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh shardId ); - return new RemoteSegmentStoreDirectory( - dataDirectory, - metadataDirectory, - mdLockManager, - threadPool, - ((BlobStoreRepository) repository)::maybeRateLimitRemoteUploadTransfers - ); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } } - private RemoteDirectory createRemoteDirectory(Repository repository, BlobPath commonBlobPath, String extention) { + private RemoteDirectory createRemoteDirectory( + BlobStoreRepository repository, + BlobPath commonBlobPath, + String extention, + UnaryOperator uploadRateLimiter, + UnaryOperator downLoadRateLimiter + ) { BlobPath extendedPath = commonBlobPath.add(extention); - BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); - return new RemoteDirectory(dataBlobContainer, ((BlobStoreRepository) repository)::maybeRateLimitRemoteDownloadTransfers); + BlobContainer dataBlobContainer = repository.blobStore().blobContainer(extendedPath); + return new RemoteDirectory(dataBlobContainer, uploadRateLimiter, downLoadRateLimiter); } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index d6aa723d685b0..83b07e986bcc5 100644 --- 
a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -152,8 +152,7 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { mock(RemoteDirectory.class), remoteMetadataDirectory, mock(RemoteStoreLockManager.class), - mock(ThreadPool.class), - (s) -> s + mock(ThreadPool.class) ); FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index 0a2107a0fc960..ba9891adbfea2 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -47,7 +47,7 @@ public class RemoteDirectoryTests extends OpenSearchTestCase { @Before public void setup() { blobContainer = mock(BlobContainer.class); - remoteDirectory = new RemoteDirectory(blobContainer, s -> s); + remoteDirectory = new RemoteDirectory(blobContainer, r -> r, r -> r); } public void testListAllEmpty() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index a5c65d5b53315..91154e5b77641 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -97,8 +97,7 @@ public void setup() throws IOException { remoteDataDirectory, remoteMetadataDirectory, mdLockManager, - threadPool, - (s) -> s + threadPool ); testUploadTracker = new TestUploadListener(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index d1a7022ff55d5..20ef6a4053f11 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -781,11 +781,11 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, (s) -> s); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException { - return new RemoteDirectory(getBlobContainer(f), s -> s); + return new RemoteDirectory(getBlobContainer(f), r -> r, r -> r); } protected BlobContainer getBlobContainer(Path f) throws IOException { From ef18396551bd30a05c75d58f55eb21a0b013c972 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Mon, 21 Aug 2023 23:55:35 +0530 Subject: [PATCH 05/14] Test spotless fix Signed-off-by: Bukhtawar Khan --- .../java/org/opensearch/index/shard/IndexShardTestCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java 
b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 20ef6a4053f11..b3d9f8b29b310 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1160,8 +1160,8 @@ protected void startReplicaAfterRecovery( IndexShardRoutingTable newRoutingTable = initializingReplicaRouting.isRelocationTarget() ? new IndexShardRoutingTable.Builder(routingTable).removeShard(primary.routingEntry()).addShard(replica.routingEntry()).build() : new IndexShardRoutingTable.Builder(routingTable).removeShard(initializingReplicaRouting) - .addShard(replica.routingEntry()) - .build(); + .addShard(replica.routingEntry()) + .build(); Set inSyncIdsWithReplica = new HashSet<>(inSyncIds); inSyncIdsWithReplica.add(replica.routingEntry().allocationId().getId()); // update both primary and replica shard state From 272de0a364d07fe709f7e050da9fb154d873f421 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Thu, 24 Aug 2023 00:33:11 +0530 Subject: [PATCH 06/14] Adding tests Signed-off-by: Bukhtawar Khan --- .../opensearch/remotestore/RemoteStoreIT.java | 2 +- .../remotestore/RemoteStoreRestoreIT.java | 44 ++++++++++ .../remotestore/RemoteStoreThrottlingIT.java | 39 +++++++++ .../multipart/RemoteStoreMultipartIT.java | 46 ++++++++++ .../org/opensearch/common/StreamLimiter.java | 4 + .../store/RemoteBufferedOutputDirectory.java | 2 +- .../index/store/RemoteDirectory.java | 4 + .../RemoteSegmentStoreDirectoryFactory.java | 19 ++-- .../repositories/FilterRepository.java | 10 +++ .../opensearch/repositories/Repository.java | 10 +++ .../blobstore/BlobStoreRepository.java | 87 +++++++++++++++---- .../index/store/RemoteDirectoryTests.java | 2 +- .../RepositoriesServiceTests.java | 10 +++ .../index/shard/IndexShardTestCase.java | 2 +- .../index/shard/RestoreOnlyRepository.java | 10 +++ 15 files changed, 266 insertions(+), 25 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreThrottlingIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 87ec515ffe740..9a2948861e967 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -38,7 +38,7 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase { - private static final String INDEX_NAME = "remote-store-test-idx-1"; + protected final String INDEX_NAME = "remote-store-test-idx-1"; @Override protected Collection> nodePlugins() { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 507ab40084355..183b960c8980c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -14,7 +14,11 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import 
org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -26,7 +30,9 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.greaterThan; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -453,4 +459,42 @@ public void testRTSRestoreNoData() throws IOException { } // TODO: Restore flow - index aliases + + public void testRateLimitedRemoteDownloads() throws Exception { + assertAcked( + client().admin() + .cluster() + .preparePutRepository(REPOSITORY_NAME) + .setType("fs") + .setSettings( + Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("max_remote_download_bytes_per_sec", "2kb") + .put("chunk_size", 100, ByteSizeUnit.BYTES) + + ) + ); + int shardCount = randomIntBetween(1, 5); + int numberOfIteration = randomIntBetween(2, 5); + testRestoreFlow(numberOfIteration, false, shardCount); + prepareCluster(0, 3, INDEX_NAME, 0, shardCount); + Map indexStats = indexData(numberOfIteration, false, INDEX_NAME); + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + ensureRed(INDEX_NAME); + restore(INDEX_NAME); + assertBusy(() -> { + long downloadPauseTime = 0L; + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + downloadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteDownloadThrottleTimeInNanos(); + } + assertThat(downloadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(10, 30)).nanos())); + }, 30, TimeUnit.SECONDS); + ensureGreen(INDEX_NAME); + // This is required to get updated number from already active shards which were not restored + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + assertEquals(0, getNumShards(INDEX_NAME).numReplicas); + verifyRestoredData(indexStats, true, INDEX_NAME); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreThrottlingIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreThrottlingIT.java new file mode 100644 index 0000000000000..9ad1e7897ce30 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreThrottlingIT.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore; + +import org.opensearch.common.settings.Settings; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collection; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) +public class RemoteStoreThrottlingIT extends RemoteStoreBaseIntegTestCase { + + private static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + + @Before + public void setup() { + setupRepo(); + } + + @Override + public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java index a523d5c0f5470..3833ea2537254 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java @@ -8,17 +8,24 @@ package org.opensearch.remotestore.multipart; +import org.opensearch.client.Client; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.plugins.Plugin; import org.opensearch.remotestore.RemoteStoreIT; import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; +import org.opensearch.repositories.RepositoriesService; import java.nio.file.Path; import java.util.Collection; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class RemoteStoreMultipartIT extends RemoteStoreIT { @@ -35,4 +42,43 @@ protected void putRepository(Path path) { .setSettings(Settings.builder().put("location", path)) ); } + + public void testRateLimitedRemoteUploads() throws Exception { + internalCluster().startDataOnlyNodes(1); + Client client = client(); + logger.info("--> updating repository"); + Path repositoryLocation = randomRepoPath(); + assertAcked( + client.admin() + .cluster() + .preparePutRepository(REPOSITORY_NAME) + .setType(MockFsRepositoryPlugin.TYPE) + .setSettings( + Settings.builder() + .put("location", repositoryLocation) + .put("compress", randomBoolean()) + .put("max_remote_upload_bytes_per_sec", "200b") + .put("chunk_size", 100, ByteSizeUnit.BYTES) + ) + ); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 10; i++) { + index(INDEX_NAME, "_doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + // check if throttling is active + assertBusy(() -> { + long uploadPauseTime = 0L; + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + uploadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteUploadThrottleTimeInNanos(); + } + assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(10, 30)).nanos())); + }, 30, 
TimeUnit.SECONDS); + + assertThat(client.prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value, equalTo(10L)); + } } diff --git a/server/src/main/java/org/opensearch/common/StreamLimiter.java b/server/src/main/java/org/opensearch/common/StreamLimiter.java index 40a922dd707dd..eab1427232999 100644 --- a/server/src/main/java/org/opensearch/common/StreamLimiter.java +++ b/server/src/main/java/org/opensearch/common/StreamLimiter.java @@ -8,6 +8,8 @@ package org.opensearch.common; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import java.io.IOException; @@ -24,6 +26,8 @@ public class StreamLimiter { private int bytesSinceLastRateLimit; + private static final Logger logger = LogManager.getLogger(StreamLimiter.class); + public StreamLimiter(Supplier rateLimiterSupplier, Listener listener) { this.rateLimiterSupplier = rateLimiterSupplier; this.listener = listener; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java index b704416d489d9..b430ae2a6bc9c 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteBufferedOutputDirectory.java @@ -22,7 +22,7 @@ */ public class RemoteBufferedOutputDirectory extends RemoteDirectory { public RemoteBufferedOutputDirectory(BlobContainer blobContainer) { - super(blobContainer, s -> s, s -> s); + super(blobContainer); } @Override diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 5193c43bd641c..25d8d3fbf1a0a 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -79,6 +79,10 @@ public BlobContainer getBlobContainer() { return blobContainer; } + public RemoteDirectory(BlobContainer blobContainer) { + this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity()); + } + public RemoteDirectory( BlobContainer blobContainer, UnaryOperator uploadRateLimiter, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 8f74d2ea8edaf..1b211f6ded6a6 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -68,7 +68,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh blobStoreRepository::maybeRateLimitRemoteUploadTransfers, blobStoreRepository::maybeRateLimitRemoteDownloadTransfers ); - RemoteDirectory metadataDirectory = createRemoteDirectory(blobStoreRepository, commonBlobPath, "metadata", r -> r, r -> r); + RemoteDirectory metadataDirectory = createRemoteDirectory(blobStoreRepository, commonBlobPath, "metadata"); RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager( repositoriesService.get(), repositoryName, @@ -85,12 +85,21 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh private RemoteDirectory createRemoteDirectory( BlobStoreRepository repository, BlobPath commonBlobPath, - String extention, + String extension, UnaryOperator uploadRateLimiter, 
UnaryOperator downLoadRateLimiter ) { - BlobPath extendedPath = commonBlobPath.add(extention); - BlobContainer dataBlobContainer = repository.blobStore().blobContainer(extendedPath); - return new RemoteDirectory(dataBlobContainer, uploadRateLimiter, downLoadRateLimiter); + return new RemoteDirectory( + repository.blobStore().blobContainer(commonBlobPath.add(extension)), + uploadRateLimiter, downLoadRateLimiter + ); + } + + private RemoteDirectory createRemoteDirectory( + BlobStoreRepository repository, + BlobPath commonBlobPath, + String extension + ) { + return new RemoteDirectory(repository.blobStore().blobContainer(commonBlobPath.add(extension))); } } diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index 1aba9e25a0dc2..08f8bcb467d03 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -137,6 +137,16 @@ public long getRestoreThrottleTimeInNanos() { return in.getRestoreThrottleTimeInNanos(); } + @Override + public long getRemoteUploadThrottleTimeInNanos() { + return in.getRemoteUploadThrottleTimeInNanos(); + } + + @Override + public long getRemoteDownloadThrottleTimeInNanos() { + return in.getRemoteDownloadThrottleTimeInNanos(); + } + @Override public String startVerification() { return in.startVerification(); diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 862a8de1e3218..76a3b65c9ea55 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -198,6 +198,16 @@ default void deleteSnapshotsAndReleaseLockFiles( */ long getRestoreThrottleTimeInNanos(); + /** + * Returns remote store upload throttle time in nanoseconds + */ + long getRemoteUploadThrottleTimeInNanos(); + + /** + * Returns remote store download throttle time in nanoseconds + */ + long getRemoteDownloadThrottleTimeInNanos(); + /** * Returns stats on the repository usage */ diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 556025834850a..7ad2f8f74cd72 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1790,6 +1790,16 @@ public long getRestoreThrottleTimeInNanos() { return restoreRateLimitingTimeInNanos.count(); } + @Override + public long getRemoteUploadThrottleTimeInNanos() { + return remoteUploadRateLimitingTimeInNanos.count(); + } + + @Override + public long getRemoteDownloadThrottleTimeInNanos() { + return remoteUploadRateLimitingTimeInNanos.count(); + } + protected void assertSnapshotOrGenericThread() { assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']') || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread [" @@ -3017,48 +3027,75 @@ private static ActionListener fileQueueListener( }); } - private static InputStream maybeRateLimit(InputStream stream, Supplier rateLimiterSupplier, CounterMetric metric) { - return new RateLimitingInputStream(stream, rateLimiterSupplier, metric::inc); + private static void mayBeLogRateLimits(BlobStoreTransferContext context, RateLimiter
rateLimiter, long time) { + logger.info( + () -> new ParameterizedMessage( + "Rate limited blob store transfer, context [{}], for duration [{} ms] for configured rate [{} MBps]", + context, + TimeValue.timeValueNanos(time).millis(), + rateLimiter.getMBPerSec() + ) + ); + } + + private static InputStream maybeRateLimit( + InputStream stream, + Supplier rateLimiterSupplier, + CounterMetric metric, + BlobStoreTransferContext context + ) { + return new RateLimitingInputStream(stream, rateLimiterSupplier, (t) -> { + mayBeLogRateLimits(context, rateLimiterSupplier.get(), t); + metric.inc(t); + }); } private static OffsetRangeInputStream maybeRateLimitRemoteTransfers( OffsetRangeInputStream offsetRangeInputStream, Supplier rateLimiterSupplier, - CounterMetric metric + CounterMetric metric, + BlobStoreTransferContext context ) { - return new RateLimitingOffsetRangeInputStream(offsetRangeInputStream, rateLimiterSupplier, metric::inc); + return new RateLimitingOffsetRangeInputStream(offsetRangeInputStream, rateLimiterSupplier, (t) -> { + mayBeLogRateLimits(context, rateLimiterSupplier.get(), t); + metric.inc(t); + }); } public InputStream maybeRateLimitRestores(InputStream stream) { return maybeRateLimit( - maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos), + maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE), recoverySettings::rateLimiter, - restoreRateLimitingTimeInNanos + restoreRateLimitingTimeInNanos, + BlobStoreTransferContext.SNAPSHOT_RESTORE ); } public OffsetRangeInputStream maybeRateLimitRemoteUploadTransfers(OffsetRangeInputStream offsetRangeInputStream) { - return maybeRateLimitRemoteTransfers(offsetRangeInputStream, () -> remoteUploadRateLimiter, remoteUploadRateLimitingTimeInNanos); - } - - public OffsetRangeInputStream maybeRateLimitRemoteDownloadTransfers(OffsetRangeInputStream offsetRangeInputStream) { return maybeRateLimitRemoteTransfers( - maybeRateLimitRemoteTransfers(offsetRangeInputStream, () -> remoteDownloadRateLimiter, remoteDownloadRateLimitingTimeInNanos), - recoverySettings::rateLimiter, - remoteDownloadRateLimitingTimeInNanos + offsetRangeInputStream, + () -> remoteUploadRateLimiter, + remoteUploadRateLimitingTimeInNanos, + BlobStoreTransferContext.REMOTE_UPLOAD ); } public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream) { return maybeRateLimit( - maybeRateLimit(inputStream, () -> remoteDownloadRateLimiter, remoteDownloadRateLimitingTimeInNanos), + maybeRateLimit( + inputStream, + () -> remoteDownloadRateLimiter, + remoteDownloadRateLimitingTimeInNanos, + BlobStoreTransferContext.REMOTE_DOWNLOAD + ), recoverySettings::rateLimiter, - remoteDownloadRateLimitingTimeInNanos + remoteDownloadRateLimitingTimeInNanos, + BlobStoreTransferContext.REMOTE_DOWNLOAD ); } public InputStream maybeRateLimitSnapshots(InputStream stream) { - return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos); + return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT); } @Override @@ -3419,4 +3456,22 @@ private static final class ShardSnapshotMetaDeleteResult { this.blobsToDelete = blobsToDelete; } } + + enum BlobStoreTransferContext { + REMOTE_UPLOAD("remote_upload"), + REMOTE_DOWNLOAD("remote_download"), + SNAPSHOT("snapshot"), + SNAPSHOT_RESTORE("snapshot_restore"); + + private final String name; + + BlobStoreTransferContext(String name) { + this.name = name; + } + + 
@Override + public String toString() { + return name; + } + } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index ba9891adbfea2..b220b0891f11d 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -47,7 +47,7 @@ public class RemoteDirectoryTests extends OpenSearchTestCase { @Before public void setup() { blobContainer = mock(BlobContainer.class); - remoteDirectory = new RemoteDirectory(blobContainer, r -> r, r -> r); + remoteDirectory = new RemoteDirectory(blobContainer); } public void testListAllEmpty() throws IOException { diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index c574c6d516fd3..62bc4016d892d 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -281,6 +281,16 @@ public long getRestoreThrottleTimeInNanos() { return 0; } + @Override + public long getRemoteUploadThrottleTimeInNanos() { + return 0; + } + + @Override + public long getRemoteDownloadThrottleTimeInNanos() { + return 0; + } + @Override public String startVerification() { return null; diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b3d9f8b29b310..8348584379f9c 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -785,7 +785,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId } private RemoteDirectory newRemoteDirectory(Path f) throws IOException { - return new RemoteDirectory(getBlobContainer(f), r -> r, r -> r); + return new RemoteDirectory(getBlobContainer(f)); } protected BlobContainer getBlobContainer(Path f) throws IOException { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index 38520e9292206..fbee13ab3b551 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -150,6 +150,16 @@ public long getRestoreThrottleTimeInNanos() { return 0; } + @Override + public long getRemoteUploadThrottleTimeInNanos() { + return 0; + } + + @Override + public long getRemoteDownloadThrottleTimeInNanos() { + return 0; + } + @Override public String startVerification() { return null; From ee354df16a3374b2bb2a71d901ee4d3b4251bbbb Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Thu, 24 Aug 2023 14:20:49 +0530 Subject: [PATCH 07/14] Test fixes and Changelog Signed-off-by: Bukhtawar Khan --- CHANGELOG.md | 1 + .../remotestore/RemoteStoreRestoreIT.java | 17 ++++---- .../remotestore/RemoteStoreThrottlingIT.java | 39 ------------------- .../multipart/RemoteStoreMultipartIT.java | 4 +- .../RemoteSegmentStoreDirectoryFactory.java | 33 +++------------- .../blobstore/BlobStoreRepository.java | 4 +- 6 files changed, 18 insertions(+), 80 deletions(-) delete mode 100644 
server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreThrottlingIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e859874d1cbf1..1cbcd6479b1ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -148,6 +148,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129)) - Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177)) - Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412)) +- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 183b960c8980c..2f9ba95e0d388 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -17,7 +17,6 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.plugins.Plugin; -import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -32,9 +31,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.greaterThan; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.greaterThan; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) public class RemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { @@ -458,8 +457,6 @@ public void testRTSRestoreNoData() throws IOException { testRestoreFlow(0, true, randomIntBetween(1, 5)); } - // TODO: Restore flow - index aliases - public void testRateLimitedRemoteDownloads() throws Exception { assertAcked( client().admin() @@ -471,15 +468,13 @@ public void testRateLimitedRemoteDownloads() throws Exception { .put("location", randomRepoPath()) .put("compress", randomBoolean()) .put("max_remote_download_bytes_per_sec", "2kb") - .put("chunk_size", 100, ByteSizeUnit.BYTES) + .put("chunk_size", 200, ByteSizeUnit.BYTES) ) ); - int shardCount = randomIntBetween(1, 5); - int numberOfIteration = randomIntBetween(2, 5); - testRestoreFlow(numberOfIteration, false, shardCount); + int shardCount = randomIntBetween(1, 3); prepareCluster(0, 3, INDEX_NAME, 0, shardCount); - Map indexStats = indexData(numberOfIteration, false, INDEX_NAME); + Map indexStats = indexData(5, false, INDEX_NAME); assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); ensureRed(INDEX_NAME); @@ -489,7 +484,7 @@ public void testRateLimitedRemoteDownloads() throws Exception { for (RepositoriesService repositoriesService : 
internalCluster().getDataNodeInstances(RepositoriesService.class)) { downloadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteDownloadThrottleTimeInNanos(); } - assertThat(downloadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(10, 30)).nanos())); + assertThat(downloadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos())); }, 30, TimeUnit.SECONDS); ensureGreen(INDEX_NAME); // This is required to get updated number from already active shards which were not restored @@ -497,4 +492,6 @@ public void testRateLimitedRemoteDownloads() throws Exception { assertEquals(0, getNumShards(INDEX_NAME).numReplicas); verifyRestoredData(indexStats, true, INDEX_NAME); } + + // TODO: Restore flow - index aliases } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreThrottlingIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreThrottlingIT.java deleted file mode 100644 index 9ad1e7897ce30..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreThrottlingIT.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.remotestore; - -import org.opensearch.common.settings.Settings; -import org.opensearch.plugins.Plugin; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.transport.MockTransportService; -import org.junit.Before; - -import java.util.Arrays; -import java.util.Collection; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) -public class RemoteStoreThrottlingIT extends RemoteStoreBaseIntegTestCase { - - private static final String INDEX_NAME = "remote-store-test-idx-1"; - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); - } - - @Before - public void setup() { - setupRepo(); - } - - @Override - public Settings indexSettings() { - return remoteStoreIndexSettings(0); - } -} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java index 3833ea2537254..842a576a92a38 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java @@ -57,7 +57,7 @@ public void testRateLimitedRemoteUploads() throws Exception { Settings.builder() .put("location", repositoryLocation) .put("compress", randomBoolean()) - .put("max_remote_upload_bytes_per_sec", "200b") + .put("max_remote_upload_bytes_per_sec", "1kb") .put("chunk_size", 100, ByteSizeUnit.BYTES) ) ); @@ -76,7 +76,7 @@ public void testRateLimitedRemoteUploads() throws Exception { for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { uploadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteUploadThrottleTimeInNanos(); } - assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(10, 30)).nanos())); + assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos())); }, 30, TimeUnit.SECONDS); 
assertThat(client.prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value, equalTo(10L)); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 1b211f6ded6a6..31b49f6813ad2 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -9,9 +9,7 @@ package org.opensearch.index.store; import org.apache.lucene.store.Directory; -import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; @@ -24,9 +22,7 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.InputStream; import java.util.function.Supplier; -import java.util.function.UnaryOperator; /** * Factory for a remote store directory @@ -61,14 +57,14 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh BlobPath commonBlobPath = blobStoreRepository.basePath(); commonBlobPath = commonBlobPath.add(indexUUID).add(shardId).add(SEGMENTS); - RemoteDirectory dataDirectory = createRemoteDirectory( - blobStoreRepository, - commonBlobPath, - "data", + RemoteDirectory dataDirectory = new RemoteDirectory( + blobStoreRepository.blobStore().blobContainer(commonBlobPath.add("data")), blobStoreRepository::maybeRateLimitRemoteUploadTransfers, blobStoreRepository::maybeRateLimitRemoteDownloadTransfers ); - RemoteDirectory metadataDirectory = createRemoteDirectory(blobStoreRepository, commonBlobPath, "metadata"); + RemoteDirectory metadataDirectory = new RemoteDirectory( + blobStoreRepository.blobStore().blobContainer(commonBlobPath.add("metadata")) + ); RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager( repositoriesService.get(), repositoryName, @@ -82,24 +78,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh } } - private RemoteDirectory createRemoteDirectory( - BlobStoreRepository repository, - BlobPath commonBlobPath, - String extension, - UnaryOperator uploadRateLimiter, - UnaryOperator downLoadRateLimiter - ) { - return new RemoteDirectory( - repository.blobStore().blobContainer(commonBlobPath.add(extension)), - uploadRateLimiter, downLoadRateLimiter - ); - } - - private RemoteDirectory createRemoteDirectory( - BlobStoreRepository repository, - BlobPath commonBlobPath, - String extension - ) { + private RemoteDirectory createRemoteDirectory(BlobStoreRepository repository, BlobPath commonBlobPath, String extension) { return new RemoteDirectory(repository.blobStore().blobContainer(commonBlobPath.add(extension))); } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 7ad2f8f74cd72..108a022a2612b 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1797,7 +1797,7 @@ public long getRemoteUploadThrottleTimeInNanos() { @Override public long getRemoteDownloadThrottleTimeInNanos() { - 
return remoteUploadRateLimitingTimeInNanos.count(); + return remoteDownloadRateLimitingTimeInNanos.count(); } protected void assertSnapshotOrGenericThread() { @@ -3028,7 +3028,7 @@ private static ActionListener fileQueueListener( } private static void mayBeLogRateLimits(BlobStoreTransferContext context, RateLimiter rateLimiter, long time) { - logger.info( + logger.debug( () -> new ParameterizedMessage( "Rate limited blob store transfer, context [{}], for duration [{} ms] for configured rate [{} MBps]", context, From f5e0508ffaac12222cffa1c09b67e1f9923cb9af Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Thu, 24 Aug 2023 14:24:01 +0530 Subject: [PATCH 08/14] Add tags Signed-off-by: Bukhtawar Khan --- server/src/main/java/org/opensearch/common/StreamLimiter.java | 2 ++ .../transfer/stream/RateLimitingOffsetRangeInputStream.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/server/src/main/java/org/opensearch/common/StreamLimiter.java b/server/src/main/java/org/opensearch/common/StreamLimiter.java index eab1427232999..756c792ffdd57 100644 --- a/server/src/main/java/org/opensearch/common/StreamLimiter.java +++ b/server/src/main/java/org/opensearch/common/StreamLimiter.java @@ -17,6 +17,8 @@ /** * The stream limiter that limits the transfer of bytes + * + * @opensearch.internal */ public class StreamLimiter { diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java index 9d8d3cffb227a..b455999bbed0c 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStream.java @@ -16,6 +16,8 @@ /** * Rate Limits an {@link OffsetRangeInputStream} + * + * @opensearch.internal */ public class RateLimitingOffsetRangeInputStream extends OffsetRangeInputStream { From a959d0648442984185c3fdf36ffe3677fcefb0b0 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Thu, 24 Aug 2023 14:29:03 +0530 Subject: [PATCH 09/14] Fix up minor Signed-off-by: Bukhtawar Khan --- server/src/main/java/org/opensearch/common/StreamLimiter.java | 2 -- .../index/snapshots/blobstore/RateLimitingInputStream.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/StreamLimiter.java b/server/src/main/java/org/opensearch/common/StreamLimiter.java index 756c792ffdd57..b3930ba761aa9 100644 --- a/server/src/main/java/org/opensearch/common/StreamLimiter.java +++ b/server/src/main/java/org/opensearch/common/StreamLimiter.java @@ -28,8 +28,6 @@ public class StreamLimiter { private int bytesSinceLastRateLimit; - private static final Logger logger = LogManager.getLogger(StreamLimiter.class); - public StreamLimiter(Supplier rateLimiterSupplier, Listener listener) { this.rateLimiterSupplier = rateLimiterSupplier; this.listener = listener; diff --git a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java index 048a7db2d4ba4..ee601f96ecee1 100644 --- a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java +++ b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RateLimitingInputStream.java @@ -47,7 +47,7 @@ */ public class RateLimitingInputStream extends 
FilterInputStream { - private StreamLimiter streamLimiter; + private final StreamLimiter streamLimiter; public RateLimitingInputStream(InputStream delegate, Supplier rateLimiterSupplier, StreamLimiter.Listener listener) { super(delegate); From d3ce29c4be3dfdc7a86af2d1f7b3c88907bd4159 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Thu, 24 Aug 2023 14:42:30 +0530 Subject: [PATCH 10/14] Fix up minor Signed-off-by: Bukhtawar Khan --- server/src/main/java/org/opensearch/common/StreamLimiter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/StreamLimiter.java b/server/src/main/java/org/opensearch/common/StreamLimiter.java index b3930ba761aa9..ec203a1c30868 100644 --- a/server/src/main/java/org/opensearch/common/StreamLimiter.java +++ b/server/src/main/java/org/opensearch/common/StreamLimiter.java @@ -8,8 +8,6 @@ package org.opensearch.common; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import java.io.IOException; From 0518408b3fd661b7c13cdb6f13076fa9d39fe3de Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Thu, 24 Aug 2023 15:00:56 +0530 Subject: [PATCH 11/14] Fix up minor Signed-off-by: Bukhtawar Khan --- .../java/org/opensearch/remotestore/RemoteStoreRestoreIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 6519ea9b27d29..60d7eefbb6d9b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -488,7 +488,7 @@ public void testRateLimitedRemoteDownloads() throws Exception { // This is required to get updated number from already active shards which were not restored assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); assertEquals(0, getNumShards(INDEX_NAME).numReplicas); - verifyRestoredData(indexStats, true, INDEX_NAME); + verifyRestoredData(indexStats, INDEX_NAME); } // TODO: Restore flow - index aliases From cc5ef6b1a92d20a0d311dd86c0201e2e3cc02066 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Thu, 24 Aug 2023 17:24:55 +0530 Subject: [PATCH 12/14] Fix up minor Signed-off-by: Bukhtawar Khan --- .../store/RemoteSegmentStoreDirectoryTests.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 91154e5b77641..44dfb44eb9a15 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -517,6 +517,15 @@ public void onFailure(Exception e) {} public void testCopyFilesFromMultipartIOException() throws Exception { String filename = "_100.si"; + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + remoteDataDirectory = new RemoteDirectory(blobContainer); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool + ); + populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -528,9 +537,6 @@ public void 
testCopyFilesFromMultipartIOException() throws Exception { storeDirectory.sync(List.of(filename)); assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); - - VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); - when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); Mockito.doAnswer(invocation -> { ActionListener completionListener = invocation.getArgument(1); completionListener.onFailure(new Exception("Test exception")); From d10d2ee844854805962e975a41e5acdf8e50ee3c Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Fri, 25 Aug 2023 00:57:44 +0530 Subject: [PATCH 13/14] Tests Signed-off-by: Bukhtawar Khan --- ...teLimitingOffsetRangeInputStreamTests.java | 46 +++++++++++ .../BlobStoreRemoteTransferTests.java | 80 +++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 server/src/test/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStreamTests.java create mode 100644 server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRemoteTransferTests.java diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStreamTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStreamTests.java new file mode 100644 index 0000000000000..fc2eba4c35e2a --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/RateLimitingOffsetRangeInputStreamTests.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.NIOFSDirectory; +import org.apache.lucene.store.RateLimiter; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +public class RateLimitingOffsetRangeInputStreamTests extends ResettableCheckedInputStreamBaseTest { + + private Directory directory; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + directory = new NIOFSDirectory(testFile.getParent()); + } + + @Override + protected OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException { + return new RateLimitingOffsetRangeInputStream( + new OffsetRangeIndexInputStream(directory.openInput(testFile.getFileName().toString(), IOContext.DEFAULT), size, position), + () -> new RateLimiter.SimpleRateLimiter(randomIntBetween(10, 20)), + (t) -> {} + ); + } + + @Override + @After + public void tearDown() throws Exception { + directory.close(); + super.tearDown(); + } +} diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRemoteTransferTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRemoteTransferTests.java new file mode 100644 index 0000000000000..ea9559412d4c1 --- /dev/null +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRemoteTransferTests.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.repositories.blobstore; + +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.Environment; +import org.opensearch.env.TestEnvironment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; + +import static org.hamcrest.Matchers.equalTo; + +public class BlobStoreRemoteTransferTests extends OpenSearchTestCase { + + private BlobStoreRepository repository; + + public void testDefaultThrottlingValues() throws Exception { + repository = (BlobStoreRepository) createRepository(); + try (ByteArrayInputStream input = new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8))) { + InputStream is = repository.maybeRateLimitRemoteDownloadTransfers(input); + assertBusy(() -> { + is.read(); + assertThat(repository.getRemoteDownloadThrottleTimeInNanos(), equalTo(0L)); + assertThat(repository.getRemoteUploadThrottleTimeInNanos(), equalTo(0L)); + }); + + } + } + + /** Create a {@link Repository} with a random name **/ + protected Repository createRepository() { + Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final FsRepository repository = new FsRepository( + repositoryMetadata, + createEnvironment(), + xContentRegistry(), + clusterService, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually + } + }; + clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); + repository.start(); + return repository; + } + + /** Create a {@link Environment} with random path.home and path.repo **/ + private Environment createEnvironment() { + Path home = createTempDir(); + return TestEnvironment.newEnvironment( + Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath()) + .build() + ); + } +} From cc5a597ea5b6fed77674c01a393b4b350fdc71de Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Fri, 25 Aug 2023 17:00:39 +0530 Subject: [PATCH 14/14] Tests Signed-off-by: Bukhtawar Khan --- .../index/store/RemoteDirectory.java | 2 +- .../store/RemoteSegmentStoreDirectory.java | 4 +- .../index/store/RemoteDirectoryTests.java | 89 +++++++++++++++++++ .../BlobStoreRemoteTransferTests.java | 80 ----------------- 4 files changed, 92 insertions(+), 83 deletions(-) delete mode 100644 server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRemoteTransferTests.java diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java 
b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 25d8d3fbf1a0a..04b5d7eb7c6bd 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -303,9 +303,9 @@ public void delete() throws IOException { public boolean copyFrom( Directory from, String src, + String remoteFileName, IOContext context, Runnable postUploadRunner, - String remoteFileName, ActionListener listener ) { if (blobContainer instanceof VerifyingMultiStreamBlobContainer) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 9cb51f9f65b56..0f6ca2a61b67d 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -417,13 +417,13 @@ public IndexInput openInput(String name, IOContext context) throws IOException { public void copyFrom(Directory from, String src, IOContext context, ActionListener listener) { try { final String remoteFileName = getNewRemoteSegmentFilename(src); - boolean uploaded = remoteDataDirectory.copyFrom(from, src, context, () -> { + boolean uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { try { postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); } catch (IOException e) { throw new RuntimeException("Exception in segment postUpload for file " + src, e); } - }, remoteFileName, listener); + }, listener); if (uploaded == false) { copyFrom(from, src, src, context); listener.onResponse(null); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index b220b0891f11d..7655690685889 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -8,12 +8,17 @@ package org.opensearch.index.store; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.core.action.ActionListener; import org.opensearch.test.OpenSearchTestCase; @@ -28,9 +33,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.mockito.Mockito; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -58,6 +68,85 @@ public void testListAllEmpty() throws IOException { assertArrayEquals(expectedFileName, actualFileNames); } + public void testCopyFrom() throws IOException, InterruptedException { + 
AtomicReference postUploadInvoked = new AtomicReference<>(false); + String filename = "_100.si"; + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(1); + completionListener.onResponse(null); + return null; + }).when(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + CountDownLatch countDownLatch = new CountDownLatch(1); + RemoteDirectory remoteDirectory = new RemoteDirectory(blobContainer); + remoteDirectory.copyFrom( + storeDirectory, + filename, + filename, + IOContext.READ, + () -> postUploadInvoked.set(true), + new ActionListener<>() { + @Override + public void onResponse(Void t) { + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("Listener responded with exception" + e); + } + } + ); + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + assertTrue(postUploadInvoked.get()); + storeDirectory.close(); + } + + public void testCopyFromWithException() throws IOException, InterruptedException { + AtomicReference postUploadInvoked = new AtomicReference<>(false); + String filename = "_100.si"; + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(1); + completionListener.onResponse(null); + return null; + }).when(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + RemoteDirectory remoteDirectory = new RemoteDirectory(blobContainer); + remoteDirectory.copyFrom( + storeDirectory, + filename, + filename, + IOContext.READ, + () -> postUploadInvoked.set(true), + new ActionListener<>() { + @Override + public void onResponse(Void t) { + fail("Listener responded with success"); + } + + @Override + public void onFailure(Exception e) { + countDownLatch.countDown(); + } + } + ); + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + assertFalse(postUploadInvoked.get()); + storeDirectory.close(); + } + public void testListAll() throws IOException { Map fileNames = Stream.of("abc", "xyz", "pqr", "lmn", "jkl") .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100))); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRemoteTransferTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRemoteTransferTests.java deleted file mode 100644 index ea9559412d4c1..0000000000000 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRemoteTransferTests.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.repositories.blobstore; - -import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.env.Environment; -import org.opensearch.env.TestEnvironment; -import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.repositories.Repository; -import org.opensearch.repositories.fs.FsRepository; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.Path; - -import static org.hamcrest.Matchers.equalTo; - -public class BlobStoreRemoteTransferTests extends OpenSearchTestCase { - - private BlobStoreRepository repository; - - public void testDefaultThrottlingValues() throws Exception { - repository = (BlobStoreRepository) createRepository(); - try (ByteArrayInputStream input = new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8))) { - InputStream is = repository.maybeRateLimitRemoteDownloadTransfers(input); - assertBusy(() -> { - is.read(); - assertThat(repository.getRemoteDownloadThrottleTimeInNanos(), equalTo(0L)); - assertThat(repository.getRemoteUploadThrottleTimeInNanos(), equalTo(0L)); - }); - - } - } - - /** Create a {@link Repository} with a random name **/ - protected Repository createRepository() { - Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); - RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); - final FsRepository repository = new FsRepository( - repositoryMetadata, - createEnvironment(), - xContentRegistry(), - clusterService, - new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) - ) { - @Override - protected void assertSnapshotOrGenericThread() { - // eliminate thread name check as we create repo manually - } - }; - clusterService.addStateApplier(event -> repository.updateState(event.state())); - // Apply state once to initialize repo properly like RepositoriesService would - repository.updateState(clusterService.state()); - repository.start(); - return repository; - } - - /** Create a {@link Environment} with random path.home and path.repo **/ - private Environment createEnvironment() { - Path home = createTempDir(); - return TestEnvironment.newEnvironment( - Settings.builder() - .put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath()) - .put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath()) - .build() - ); - } -}
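Note (not part of the patch series above): a minimal sketch of the plumbing these commits introduce. After the refactor, RateLimitingInputStream delegates pause accounting to StreamLimiter, so any InputStream can be throttled by supplying a Lucene RateLimiter and a listener that accumulates pause time, which is the same shape BlobStoreRepository uses with its CounterMetric fields. The sketch assumes Lucene's RateLimiter.SimpleRateLimiter and OpenSearch's org.opensearch.common.metrics.CounterMetric; the class name RateLimitedReadSketch, the 1 MBps rate, and the 1 MB payload are made up for illustration only.

import org.apache.lucene.store.RateLimiter;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.index.snapshots.blobstore.RateLimitingInputStream;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class RateLimitedReadSketch {
    public static void main(String[] args) throws Exception {
        // Accumulates nanoseconds spent paused, mirroring counters such as remoteDownloadRateLimitingTimeInNanos.
        CounterMetric pausedNanos = new CounterMetric();
        // 1 MB/s limiter; the supplier form allows the configured rate to change while a transfer is in flight.
        RateLimiter limiter = new RateLimiter.SimpleRateLimiter(1.0);
        byte[] payload = new byte[1024 * 1024];
        try (InputStream in = new RateLimitingInputStream(new ByteArrayInputStream(payload), () -> limiter, pausedNanos::inc)) {
            byte[] buf = new byte[8192];
            while (in.read(buf, 0, buf.length) != -1) {
                // each read goes through StreamLimiter#maybePause and may sleep to honour the configured rate
            }
        }
        System.out.println("throttled for ~" + pausedNanos.count() + " ns");
    }
}

This mirrors how maybeRateLimitRemoteDownloadTransfers wires the repository's remoteDownloadRateLimiter and its CounterMetric into the stream, and how RateLimitingOffsetRangeInputStream does the equivalent for uploads via maybeRateLimitRemoteUploadTransfers.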