Skip to content

Commit

Permalink
[segment replication] decouple the rateLimiter of segrep and recovery…
Browse files Browse the repository at this point in the history
… (12939)

use setting "indices.replication.max_bytes_per_sec" if enable "indices.replication.use_individual_rate_limiter"

Signed-off-by: maxliu <[email protected]>
  • Loading branch information
Ferrari248 committed Apr 2, 2024
1 parent 88e5f5c commit 7f138c3
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.SEGREP_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING,
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.recoveryRateLimiter(), listener);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,16 @@ public class RecoverySettings {
Property.NodeScope
);

public static final Setting<ByteSizeValue> SEGREP_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
"segrep.max_bytes_per_sec",
new ByteSizeValue(0),
public static final Setting<Boolean> INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING = Setting.boolSetting(
"indices.replication.use_individual_rate_limiter",
false,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<ByteSizeValue> INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
"indices.replication.max_bytes_per_sec",
new ByteSizeValue(200, ByteSizeUnit.MB),
Property.Dynamic,
Property.NodeScope
);
Expand Down Expand Up @@ -176,13 +183,14 @@ public class RecoverySettings {
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

private volatile ByteSizeValue maxBytesPerSec;
private volatile ByteSizeValue segrepMaxBytesPerSec;
private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile boolean useReplicationIndividualRateLimiter;
private volatile ByteSizeValue replicationMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
private volatile int maxConcurrentRemoteStoreStreams;
private volatile SimpleRateLimiter rateLimiter;
private volatile SimpleRateLimiter segrepRateLimiter;
private volatile SimpleRateLimiter recoveryRateLimiter;
private volatile SimpleRateLimiter replicationRateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
private volatile TimeValue activityTimeout;
Expand All @@ -207,24 +215,28 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);

this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
this.segrepMaxBytesPerSec = SEGREP_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (segrepMaxBytesPerSec.getBytes() <= 0) {
segrepRateLimiter = null;
this.useReplicationIndividualRateLimiter = INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.get(settings);
this.replicationMaxBytesPerSec = useReplicationIndividualRateLimiter == true ?
INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings):
INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings)
;
if (replicationMaxBytesPerSec.getBytes() <= 0) {
replicationRateLimiter = null;
} else {
segrepRateLimiter = new SimpleRateLimiter(segrepMaxBytesPerSec.getMbFrac());
replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac());
}

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(SEGREP_MAX_BYTES_PER_SEC_SETTING, this::setSegrepMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(
Expand All @@ -243,12 +255,12 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {

}

public RateLimiter rateLimiter() {
return rateLimiter;
public RateLimiter recoveryRateLimiter() {
return recoveryRateLimiter;
}

public RateLimiter segrepRateLimiter() {
return segrepRateLimiter;
return replicationRateLimiter;
}

public TimeValue retryDelayNetwork() {
Expand Down Expand Up @@ -314,25 +326,26 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout
this.internalRemoteUploadTimeout = internalRemoteUploadTimeout;
}

private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
this.maxBytesPerSec = maxBytesPerSec;
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
} else if (rateLimiter != null) {
rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) {
this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec;
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;
} else if (recoveryRateLimiter != null) {
recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
}

private void setSegrepMaxBytesPerSec(ByteSizeValue segrepMaxBytesPerSec) {
this.segrepMaxBytesPerSec = segrepMaxBytesPerSec;
if (segrepMaxBytesPerSec.getBytes() <= 0) {
segrepRateLimiter = null;
} else if (segrepRateLimiter != null) {
segrepRateLimiter.setMBPerSec(segrepMaxBytesPerSec.getMbFrac());
private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) {
this.replicationMaxBytesPerSec = replicationMaxBytesPerSec;
if (useReplicationIndividualRateLimiter == false) return;
if (replicationMaxBytesPerSec.getBytes() <= 0) {
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac());
} else {
segrepRateLimiter = new SimpleRateLimiter(segrepMaxBytesPerSec.getMbFrac());
replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void writeFileChunk(
if (SegmentReplicationTargetService.Actions.FILE_CHUNK.equals(action)) {
rl = recoverySettings.segrepRateLimiter();
} else {
rl = recoverySettings.rateLimiter();
rl = recoverySettings.recoveryRateLimiter();
}
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3159,7 +3159,7 @@ private static OffsetRangeInputStream maybeRateLimitRemoteTransfers(
public InputStream maybeRateLimitRestores(InputStream stream) {
return maybeRateLimit(
maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,
restoreRateLimitingTimeInNanos,
BlobStoreTransferContext.SNAPSHOT_RESTORE
);
Expand All @@ -3182,7 +3182,7 @@ public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.rateLimiter());
assertEquals(null, recoverySettings.recoveryRateLimiter());
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.SEGREP_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.segrepRateLimiter());
}
Expand Down

0 comments on commit 7f138c3

Please sign in to comment.