Skip to content

Commit

Permalink
Rate Limiter integration for remote transfer (#9448) (#9551)
Browse files Browse the repository at this point in the history
* Rate Limiter integration for remote transfer, introduces repository settings to rate limit remote store uploads and downloads

Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar authored Aug 25, 2023
1 parent fb739fc commit c0aba88
Show file tree
Hide file tree
Showing 18 changed files with 686 additions and 163 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add jdk.incubator.vector module support for JDK 20+ ([#8601](https://github.com/opensearch-project/OpenSearch/pull/8601))
- [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081))
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
protected final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -26,9 +29,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class RemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -450,5 +455,41 @@ public void testRTSRestoreDataOnlyInTranslog() throws IOException {
testRestoreFlow(0, true, randomIntBetween(1, 5));
}

public void testRateLimitedRemoteDownloads() throws Exception {
assertAcked(
client().admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType("fs")
.setSettings(
Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("max_remote_download_bytes_per_sec", "2kb")
.put("chunk_size", 200, ByteSizeUnit.BYTES)

)
);
int shardCount = randomIntBetween(1, 3);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(5, false, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
ensureRed(INDEX_NAME);
restore(INDEX_NAME);
assertBusy(() -> {
long downloadPauseTime = 0L;
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
downloadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteDownloadThrottleTimeInNanos();
}
assertThat(downloadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos()));
}, 30, TimeUnit.SECONDS);
ensureGreen(INDEX_NAME);
// This is required to get updated number from already active shards which were not restored
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
assertEquals(0, getNumShards(INDEX_NAME).numReplicas);
verifyRestoredData(indexStats, INDEX_NAME);
}

// TODO: Restore flow - index aliases
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@

package org.opensearch.remotestore.multipart;

import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreIT;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.repositories.RepositoriesService;

import java.nio.file.Path;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class RemoteStoreMultipartIT extends RemoteStoreIT {

Expand All @@ -35,4 +42,43 @@ protected void putRepository(Path path) {
.setSettings(Settings.builder().put("location", path))
);
}

public void testRateLimitedRemoteUploads() throws Exception {
internalCluster().startDataOnlyNodes(1);
Client client = client();
logger.info("--> updating repository");
Path repositoryLocation = randomRepoPath();
assertAcked(
client.admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(
Settings.builder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("max_remote_upload_bytes_per_sec", "1kb")
.put("chunk_size", 100, ByteSizeUnit.BYTES)
)
);

createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureGreen();

logger.info("--> indexing some data");
for (int i = 0; i < 10; i++) {
index(INDEX_NAME, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
// check if throttling is active
assertBusy(() -> {
long uploadPauseTime = 0L;
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
uploadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getRemoteUploadThrottleTimeInNanos();
}
assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos()));
}, 30, TimeUnit.SECONDS);

assertThat(client.prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value, equalTo(10L));
}
}
56 changes: 56 additions & 0 deletions server/src/main/java/org/opensearch/common/StreamLimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import org.apache.lucene.store.RateLimiter;

import java.io.IOException;
import java.util.function.Supplier;

/**
* The stream limiter that limits the transfer of bytes
*
* @opensearch.internal
*/
public class StreamLimiter {

private final Supplier<RateLimiter> rateLimiterSupplier;

private final StreamLimiter.Listener listener;

private int bytesSinceLastRateLimit;

public StreamLimiter(Supplier<RateLimiter> rateLimiterSupplier, Listener listener) {
this.rateLimiterSupplier = rateLimiterSupplier;
this.listener = listener;
}

public void maybePause(int bytes) throws IOException {
bytesSinceLastRateLimit += bytes;
final RateLimiter rateLimiter = rateLimiterSupplier.get();
if (rateLimiter != null) {
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
bytesSinceLastRateLimit = 0;
if (pause > 0) {
listener.onPause(pause);
}
}
}
}

/**
* Internal listener
*
* @opensearch.internal
*/
public interface Listener {
void onPause(long nanos);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.transfer.stream;

import org.apache.lucene.store.RateLimiter;
import org.opensearch.common.StreamLimiter;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Rate Limits an {@link OffsetRangeInputStream}
*
* @opensearch.internal
*/
public class RateLimitingOffsetRangeInputStream extends OffsetRangeInputStream {

private final StreamLimiter streamLimiter;

private final OffsetRangeInputStream delegate;

/**
* The ctor for RateLimitingOffsetRangeInputStream
* @param delegate the underlying {@link OffsetRangeInputStream}
* @param rateLimiterSupplier the supplier for {@link RateLimiter}
* @param listener the listener to be invoked on rate limits
*/
public RateLimitingOffsetRangeInputStream(
OffsetRangeInputStream delegate,
Supplier<RateLimiter> rateLimiterSupplier,
StreamLimiter.Listener listener
) {
this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener);
this.delegate = delegate;
}

@Override
public int read() throws IOException {
int b = delegate.read();
streamLimiter.maybePause(1);
return b;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int n = delegate.read(b, off, len);
if (n > 0) {
streamLimiter.maybePause(n);
}
return n;
}

@Override
public synchronized void mark(int readlimit) {
delegate.mark(readlimit);
}

@Override
public boolean markSupported() {
return delegate.markSupported();
}

@Override
public long getFilePointer() throws IOException {
return delegate.getFilePointer();
}

@Override
public synchronized void reset() throws IOException {
delegate.reset();
}

@Override
public void close() throws IOException {
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.snapshots.blobstore;

import org.apache.lucene.store.RateLimiter;
import org.opensearch.common.StreamLimiter;

import java.io.FilterInputStream;
import java.io.IOException;
Expand All @@ -46,53 +47,25 @@
*/
public class RateLimitingInputStream extends FilterInputStream {

private final Supplier<RateLimiter> rateLimiterSupplier;
private final StreamLimiter streamLimiter;

private final Listener listener;

private long bytesSinceLastRateLimit;

/**
* Internal listener
*
* @opensearch.internal
*/
public interface Listener {
void onPause(long nanos);
}

public RateLimitingInputStream(InputStream delegate, Supplier<RateLimiter> rateLimiterSupplier, Listener listener) {
public RateLimitingInputStream(InputStream delegate, Supplier<RateLimiter> rateLimiterSupplier, StreamLimiter.Listener listener) {
super(delegate);
this.rateLimiterSupplier = rateLimiterSupplier;
this.listener = listener;
}

private void maybePause(int bytes) throws IOException {
bytesSinceLastRateLimit += bytes;
final RateLimiter rateLimiter = rateLimiterSupplier.get();
if (rateLimiter != null) {
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
bytesSinceLastRateLimit = 0;
if (pause > 0) {
listener.onPause(pause);
}
}
}
this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener);
}

@Override
public int read() throws IOException {
int b = super.read();
maybePause(1);
streamLimiter.maybePause(1);
return b;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int n = super.read(b, off, len);
if (n > 0) {
maybePause(n);
streamLimiter.maybePause(n);
}
return n;
}
Expand Down
Loading

0 comments on commit c0aba88

Please sign in to comment.