Skip to content

Commit

Permalink
Make translog transfer timeout configurable using dynamic setting
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Mar 28, 2024
1 parent 8ad0dc0 commit 26b5624
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4944,7 +4944,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathType());
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathType(), remoteStoreSettings);
}

/*
Expand All @@ -4967,6 +4967,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
getThreadPool(),
shardPath().resolveTranslog(),
indexSettings.getRemoteStorePathType(),
remoteStoreSettings,
logger
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
Expand All @@ -34,11 +35,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {

private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private final RemoteStoreSettings remoteStoreSettings;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
String repositoryName,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) {
Repository repository;
try {
Expand All @@ -49,6 +53,7 @@ public RemoteBlobStoreInternalTranslogFactory(
this.repository = repository;
this.threadPool = threadPool;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand All @@ -74,7 +79,8 @@ public Translog newTranslog(
blobStoreRepository,
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker
remoteTranslogTransferTracker,
remoteStoreSettings
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -100,7 +101,8 @@ public RemoteFsTranslog(
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
Expand All @@ -113,7 +115,8 @@ public RemoteFsTranslog(
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
indexSettings().getRemoteStorePathType()
indexSettings().getRemoteStorePathType(),
remoteStoreSettings
);
try {
download(translogTransferManager, location, logger);
Expand Down Expand Up @@ -163,6 +166,7 @@ public static void download(
ThreadPool threadPool,
Path location,
RemoteStorePathType pathType,
RemoteStoreSettings remoteStoreSettings,
Logger logger
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Expand All @@ -181,7 +185,8 @@ public static void download(
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
pathType
pathType,
remoteStoreSettings
);
RemoteFsTranslog.download(translogTransferManager, location, logger);
logger.trace(remoteTranslogTransferTracker.toString());
Expand Down Expand Up @@ -259,15 +264,16 @@ public static TranslogTransferManager buildTranslogTransferManager(
ShardId shardId,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker tracker,
RemoteStorePathType pathType
RemoteStorePathType pathType,
RemoteStoreSettings remoteStoreSettings
) {
assert Objects.nonNull(pathType);
String indexUUID = shardId.getIndex().getUUID();
String shardIdStr = String.valueOf(shardId.id());
BlobPath dataPath = pathType.path(blobStoreRepository.basePath(), indexUUID, shardIdStr, TRANSLOG, DATA);
BlobPath mdPath = pathType.path(blobStoreRepository.basePath(), indexUUID, shardIdStr, TRANSLOG, METADATA);
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings);
}

@Override
Expand Down Expand Up @@ -539,7 +545,7 @@ private void deleteStaleRemotePrimaryTerms() {
}
}

public static void cleanup(Repository repository, ShardId shardId, ThreadPool threadPool, RemoteStorePathType pathType)
public static void cleanup(Repository repository, ShardId shardId, ThreadPool threadPool, RemoteStorePathType pathType, RemoteStoreSettings remoteStoreSettings)
throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
Expand All @@ -553,7 +559,8 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
pathType
pathType,
remoteStoreSettings
);
// clean up all remote translog files
translogTransferManager.deleteTranslogFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -60,9 +61,7 @@ public class TranslogTransferManager {
private final BlobPath remoteMetadataTransferPath;
private final FileTransferTracker fileTransferTracker;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000;

private final RemoteStoreSettings remoteStoreSettings;
private static final int METADATA_FILES_TO_FETCH = 10;

private final Logger logger;
Expand All @@ -79,7 +78,8 @@ public TranslogTransferManager(
BlobPath remoteDataTransferPath,
BlobPath remoteMetadataTransferPath,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) {
this.shardId = shardId;
this.transferService = transferService;
Expand All @@ -88,6 +88,7 @@ public TranslogTransferManager(
this.fileTransferTracker = fileTransferTracker;
this.logger = Loggers.getLogger(getClass(), shardId);
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
}

public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() {
Expand Down Expand Up @@ -151,7 +152,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH);

try {
if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
Exception ex = new TranslogUploadFailedException(
"Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"
);
Expand Down
12 changes: 8 additions & 4 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,8 @@ protected void closeInternal() {
repositoriesServiceSupplier,
threadPool,
remoteStoreStatsTrackerFactory,
settings
settings,
remoteStoreSettings
);
this.searchRequestStats = searchRequestStats;
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
Expand Down Expand Up @@ -527,22 +528,25 @@ private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTrans
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
Settings settings
Settings settings,
RemoteStoreSettings remoteStoreSettings
) {
return (indexSettings, shardRouting) -> {
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings
);
} else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(indexSettings.getNodeSettings()),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,20 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls timeout value while uploading translog and checkpoint files to remote translog
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.translog.transfer_timeout",
TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30),
Property.NodeScope,
Property.Dynamic
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
Expand All @@ -69,9 +81,14 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
this::setMinRemoteSegmentMetadataFiles
);

this.clusterRemoteTranslogTransferTimeout = CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
this::setClusterRemoteTranslogTransferTimeout
);
}

// Exclusively for testing, please do not use it elsewhere.
public TimeValue getClusterRemoteTranslogBufferInterval() {
return clusterRemoteTranslogBufferInterval;
}
Expand All @@ -87,4 +104,12 @@ private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles)
public int getMinRemoteSegmentMetadataFiles() {
return this.minRemoteSegmentMetadataFiles;
}

public TimeValue getClusterRemoteTranslogTransferTimeout() {
return clusterRemoteTranslogTransferTimeout;
}

private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ private IndexService newIndexService(IndexModule module) throws IOException {
repositoriesServiceReference::get,
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10)
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10),
DefaultRemoteStoreSettings.INSTANCE
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand Down Expand Up @@ -188,7 +189,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin
repository,
threadPool,
primaryMode::get,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
);
}

Expand Down Expand Up @@ -459,7 +461,8 @@ public void testExtraGenToKeep() throws Exception {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1508,7 +1511,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1616,7 +1620,8 @@ public void force(boolean metaData) throws IOException {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down
Loading

0 comments on commit 26b5624

Please sign in to comment.