From 2c8c81890cc394d22618fdef238d8da82bd0ac5d Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 3 Apr 2024 12:02:48 +0530 Subject: [PATCH] [Remote Store] Make translog transfer timeout configurable using dynamic setting (#12704) Signed-off-by: Sachin Kale --- CHANGELOG.md | 1 + .../common/settings/ClusterSettings.java | 3 +- .../opensearch/index/shard/IndexShard.java | 3 +- ...emoteBlobStoreInternalTranslogFactory.java | 10 ++- .../index/translog/RemoteFsTranslog.java | 28 ++++-- .../transfer/TranslogTransferManager.java | 11 +-- .../opensearch/indices/IndicesService.java | 12 ++- .../indices/RemoteStoreSettings.java | 27 +++++- .../opensearch/index/IndexModuleTests.java | 3 +- .../index/translog/RemoteFsTranslogTests.java | 13 ++- .../TranslogTransferManagerTests.java | 87 +++++++------------ ...RemoteStoreSettingsDynamicUpdateTests.java | 30 +++++++ .../index/shard/IndexShardTestCase.java | 3 +- 13 files changed, 147 insertions(+), 84 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07f6d949dedc5..6987597baeaba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) +- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 6d62bd4a884fc..299a828bdd8a4 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -731,7 +731,8 @@ public void apply(Settings value, Settings current, Settings previous) { IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT, IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT, RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, - RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING + RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, + RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 113656b5b0302..994eefab7290a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4988,7 +4988,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException { TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting); assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory; Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository(); - RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy()); + RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings); } /* @@ -5011,6 +5011,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { getThreadPool(), shardPath().resolveTranslog(), indexSettings.getRemoteStorePathStrategy(), + remoteStoreSettings, logger ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index e100ffaabf13d..4599aa32325c1 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; @@ -34,11 +35,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory { private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; + private final RemoteStoreSettings remoteStoreSettings; + public RemoteBlobStoreInternalTranslogFactory( Supplier repositoriesServiceSupplier, ThreadPool threadPool, String repositoryName, - RemoteTranslogTransferTracker remoteTranslogTransferTracker + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + RemoteStoreSettings remoteStoreSettings ) { Repository repository; try { @@ -49,6 +53,7 @@ public RemoteBlobStoreInternalTranslogFactory( this.repository = repository; this.threadPool = threadPool; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; + this.remoteStoreSettings = remoteStoreSettings; } @Override @@ -74,7 +79,8 @@ public Translog newTranslog( blobStoreRepository, threadPool, startedPrimarySupplier, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + remoteStoreSettings ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 7f6e17a572880..da905b9605dfd 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -29,6 +29,7 @@ import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -101,7 +102,8 @@ public RemoteFsTranslog( BlobStoreRepository blobStoreRepository, ThreadPool threadPool, BooleanSupplier startedPrimarySupplier, - RemoteTranslogTransferTracker remoteTranslogTransferTracker + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + RemoteStoreSettings remoteStoreSettings ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); @@ -114,7 +116,8 @@ public RemoteFsTranslog( shardId, fileTransferTracker, remoteTranslogTransferTracker, - indexSettings().getRemoteStorePathStrategy() + indexSettings().getRemoteStorePathStrategy(), + remoteStoreSettings ); try { download(translogTransferManager, location, logger); @@ -164,6 +167,7 @@ public static void download( ThreadPool threadPool, Path location, RemoteStorePathStrategy pathStrategy, + RemoteStoreSettings remoteStoreSettings, Logger logger ) throws IOException { assert repository instanceof BlobStoreRepository : String.format( @@ -182,7 +186,8 @@ public static void download( shardId, fileTransferTracker, remoteTranslogTransferTracker, - pathStrategy + pathStrategy, + remoteStoreSettings ); RemoteFsTranslog.download(translogTransferManager, location, logger); logger.trace(remoteTranslogTransferTracker.toString()); @@ -282,7 +287,8 @@ public static TranslogTransferManager buildTranslogTransferManager( ShardId shardId, FileTransferTracker fileTransferTracker, RemoteTranslogTransferTracker tracker, - RemoteStorePathStrategy pathStrategy + RemoteStorePathStrategy pathStrategy, + RemoteStoreSettings remoteStoreSettings ) { assert Objects.nonNull(pathStrategy); String indexUUID = shardId.getIndex().getUUID(); @@ -304,7 +310,7 @@ public static TranslogTransferManager buildTranslogTransferManager( .build(); BlobPath mdPath = pathStrategy.generatePath(mdPathInput); BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool); - return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker); + return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings); } @Override @@ -576,8 +582,13 @@ private void deleteStaleRemotePrimaryTerms() { } } - public static void cleanup(Repository repository, ShardId shardId, ThreadPool threadPool, RemoteStorePathStrategy pathStrategy) - throws IOException { + public static void cleanup( + Repository repository, + ShardId shardId, + ThreadPool threadPool, + RemoteStorePathStrategy pathStrategy, + RemoteStoreSettings remoteStoreSettings + ) throws IOException { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; // We use a dummy stats tracker to ensure the flow doesn't break. @@ -590,7 +601,8 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th shardId, fileTransferTracker, remoteTranslogTransferTracker, - pathStrategy + pathStrategy, + remoteStoreSettings ); // clean up all remote translog files translogTransferManager.deleteTranslogFiles(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index c9e07ca3ef8c1..1087244623b87 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -28,6 +28,7 @@ import org.opensearch.index.remote.RemoteTranslogTransferTracker; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -60,9 +61,7 @@ public class TranslogTransferManager { private final BlobPath remoteMetadataTransferPath; private final FileTransferTracker fileTransferTracker; private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; - - private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; - + private final RemoteStoreSettings remoteStoreSettings; private static final int METADATA_FILES_TO_FETCH = 10; private final Logger logger; @@ -79,7 +78,8 @@ public TranslogTransferManager( BlobPath remoteDataTransferPath, BlobPath remoteMetadataTransferPath, FileTransferTracker fileTransferTracker, - RemoteTranslogTransferTracker remoteTranslogTransferTracker + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + RemoteStoreSettings remoteStoreSettings ) { this.shardId = shardId; this.transferService = transferService; @@ -88,6 +88,7 @@ public TranslogTransferManager( this.fileTransferTracker = fileTransferTracker; this.logger = Loggers.getLogger(getClass(), shardId); this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; + this.remoteStoreSettings = remoteStoreSettings; } public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() { @@ -151,7 +152,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH); try { - if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { + if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) { Exception ex = new TranslogUploadFailedException( "Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete" ); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 3bc38c8fba7ff..cc5f408b04647 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -517,7 +517,8 @@ protected void closeInternal() { repositoriesServiceSupplier, threadPool, remoteStoreStatsTrackerFactory, - settings + settings, + remoteStoreSettings ); this.searchRequestStats = searchRequestStats; this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings()); @@ -547,7 +548,8 @@ private static BiFunction getTrans Supplier repositoriesServiceSupplier, ThreadPool threadPool, RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - Settings settings + Settings settings, + RemoteStoreSettings remoteStoreSettings ) { return (indexSettings, shardRouting) -> { if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { @@ -555,14 +557,16 @@ private static BiFunction getTrans repositoriesServiceSupplier, threadPool, indexSettings.getRemoteStoreTranslogRepository(), - remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()) + remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()), + remoteStoreSettings ); } else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) { return new RemoteBlobStoreInternalTranslogFactory( repositoriesServiceSupplier, threadPool, RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(indexSettings.getNodeSettings()), - remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()) + remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()), + remoteStoreSettings ); } return new InternalTranslogFactory(); diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 5e6dba2b398db..7f2121093f8e8 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -54,8 +54,20 @@ public class RemoteStoreSettings { Property.Dynamic ); + /** + * Controls timeout value while uploading translog and checkpoint files to remote translog + */ + public static final Setting CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.translog.transfer_timeout", + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(30), + Property.NodeScope, + Property.Dynamic + ); + private volatile TimeValue clusterRemoteTranslogBufferInterval; private volatile int minRemoteSegmentMetadataFiles; + private volatile TimeValue clusterRemoteTranslogTransferTimeout; public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); @@ -69,9 +81,14 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, this::setMinRemoteSegmentMetadataFiles ); + + this.clusterRemoteTranslogTransferTimeout = CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, + this::setClusterRemoteTranslogTransferTimeout + ); } - // Exclusively for testing, please do not use it elsewhere. public TimeValue getClusterRemoteTranslogBufferInterval() { return clusterRemoteTranslogBufferInterval; } @@ -87,4 +104,12 @@ private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles) public int getMinRemoteSegmentMetadataFiles() { return this.minRemoteSegmentMetadataFiles; } + + public TimeValue getClusterRemoteTranslogTransferTimeout() { + return clusterRemoteTranslogTransferTimeout; + } + + private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) { + this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout; + } } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 885209cd63f81..4593416bfc198 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -236,7 +236,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { repositoriesServiceReference::get, threadPool, indexSettings.getRemoteStoreTranslogRepository(), - new RemoteTranslogTransferTracker(shardRouting.shardId(), 10) + new RemoteTranslogTransferTracker(shardRouting.shardId(), 10), + DefaultRemoteStoreSettings.INSTANCE ); } return new InternalTranslogFactory(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 6757287db546e..28979a3dc4f28 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -50,6 +50,7 @@ import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.index.translog.transfer.TranslogUploadFailedException; +import org.opensearch.indices.DefaultRemoteStoreSettings; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -188,7 +189,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin repository, threadPool, primaryMode::get, - new RemoteTranslogTransferTracker(shardId, 10) + new RemoteTranslogTransferTracker(shardId, 10), + DefaultRemoteStoreSettings.INSTANCE ); } @@ -459,7 +461,8 @@ public void testExtraGenToKeep() throws Exception { repository, threadPool, () -> Boolean.TRUE, - new RemoteTranslogTransferTracker(shardId, 10) + new RemoteTranslogTransferTracker(shardId, 10), + DefaultRemoteStoreSettings.INSTANCE ) { @Override ChannelFactory getChannelFactory() { @@ -1508,7 +1511,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { repository, threadPool, () -> Boolean.TRUE, - new RemoteTranslogTransferTracker(shardId, 10) + new RemoteTranslogTransferTracker(shardId, 10), + DefaultRemoteStoreSettings.INSTANCE ) { @Override ChannelFactory getChannelFactory() { @@ -1616,7 +1620,8 @@ public void force(boolean metaData) throws IOException { repository, threadPool, () -> Boolean.TRUE, - new RemoteTranslogTransferTracker(shardId, 10) + new RemoteTranslogTransferTracker(shardId, 10), + DefaultRemoteStoreSettings.INSTANCE ) { @Override ChannelFactory getChannelFactory() { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 49719017ce736..81ae479d018b0 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -18,6 +18,7 @@ import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; @@ -27,6 +28,8 @@ import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.indices.DefaultRemoteStoreSettings; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -100,7 +103,8 @@ public void setUp() throws Exception { remoteBaseTransferPath.add(TRANSLOG.getName()), remoteBaseTransferPath.add(METADATA.getName()), tracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE ); delayForBlobDownload = 1; @@ -165,7 +169,8 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { remoteBaseTransferPath.add(TRANSLOG.getName()), remoteBaseTransferPath.add(METADATA.getName()), fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -188,20 +193,36 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { public void testTransferSnapshotOnUploadTimeout() throws Exception { doAnswer(invocationOnMock -> { - Thread.sleep(31 * 1000); + Set transferFileSnapshots = invocationOnMock.getArgument(0); + ActionListener listener = invocationOnMock.getArgument(2); + Runnable runnable = () -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + for (TransferFileSnapshot transferFileSnapshot : transferFileSnapshots) { + listener.onResponse(transferFileSnapshot); + } + }; + Thread t = new Thread(runnable); + t.start(); return null; }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); FileTransferTracker fileTransferTracker = new FileTransferTracker( new ShardId("index", "indexUUid", 0), remoteTranslogTransferTracker ); + RemoteStoreSettings remoteStoreSettings = mock(RemoteStoreSettings.class); + when(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout()).thenReturn(new TimeValue(1)); TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath.add(TRANSLOG.getName()), remoteBaseTransferPath.add(METADATA.getName()), fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + remoteStoreSettings ); SetOnce exception = new SetOnce<>(); translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -243,7 +264,8 @@ public void testTransferSnapshotOnThreadInterrupt() throws Exception { remoteBaseTransferPath.add(TRANSLOG.getName()), remoteBaseTransferPath.add(METADATA.getName()), fileTransferTracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE ); SetOnce exception = new SetOnce<>(); @@ -336,14 +358,6 @@ public String toString() { } public void testReadMetadataNoFile() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - shardId, - transferService, - remoteBaseTransferPath.add(TRANSLOG.getName()), - remoteBaseTransferPath.add(METADATA.getName()), - null, - remoteTranslogTransferTracker - ); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); @@ -358,14 +372,6 @@ public void testReadMetadataNoFile() throws IOException { // This should happen most of the time - public void testReadMetadataFile() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - shardId, - transferService, - remoteBaseTransferPath.add(TRANSLOG.getName()), - remoteBaseTransferPath.add(METADATA.getName()), - null, - remoteTranslogTransferTracker - ); TranslogTransferMetadata metadata1 = new TranslogTransferMetadata(1, 1, 1, 2); String mdFilename1 = metadata1.getFileName(); @@ -395,14 +401,6 @@ public void testReadMetadataFile() throws IOException { } public void testReadMetadataReadException() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - shardId, - transferService, - remoteBaseTransferPath.add(TRANSLOG.getName()), - remoteBaseTransferPath.add(METADATA.getName()), - null, - remoteTranslogTransferTracker - ); TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); String mdFilename = tm.getFileName(); @@ -432,15 +430,6 @@ public void testMetadataFileNameOrder() throws IOException { } public void testReadMetadataListException() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - shardId, - transferService, - remoteBaseTransferPath.add(TRANSLOG.getName()), - remoteBaseTransferPath.add(METADATA.getName()), - null, - remoteTranslogTransferTracker - ); - doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); latchedActionListener.onFailure(new IOException("Issue while listing")); @@ -512,7 +501,8 @@ public void testDeleteTranslogSuccess() throws Exception { remoteBaseTransferPath.add(TRANSLOG.getName()), remoteBaseTransferPath.add(METADATA.getName()), tracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); @@ -526,14 +516,6 @@ public void testDeleteTranslogSuccess() throws Exception { } public void testDeleteStaleTranslogMetadata() { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - shardId, - transferService, - remoteBaseTransferPath.add(TRANSLOG.getName()), - remoteBaseTransferPath.add(METADATA.getName()), - null, - remoteTranslogTransferTracker - ); String tm1 = new TranslogTransferMetadata(1, 1, 1, 2).getFileName(); String tm2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); String tm3 = new TranslogTransferMetadata(2, 3, 1, 2).getFileName(); @@ -584,7 +566,8 @@ public void testDeleteTranslogFailure() throws Exception { remoteBaseTransferPath.add(TRANSLOG.getName()), remoteBaseTransferPath.add(METADATA.getName()), tracker, - remoteTranslogTransferTracker + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); @@ -622,14 +605,6 @@ public void testGetPrimaryTermAndGeneration() { } public void testMetadataConflict() throws InterruptedException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - shardId, - transferService, - remoteBaseTransferPath.add(TRANSLOG.getName()), - remoteBaseTransferPath.add(METADATA.getName()), - null, - remoteTranslogTransferTracker - ); TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2, "node--1"); String mdFilename = tm.getFileName(); long count = mdFilename.chars().filter(ch -> ch == METADATA_SEPARATOR.charAt(0)).count(); diff --git a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java index 3809a44e55901..8a77d97f88d67 100644 --- a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java @@ -10,8 +10,11 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING; + public class RemoteStoreSettingsDynamicUpdateTests extends OpenSearchTestCase { private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); private final RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(Settings.EMPTY, clusterSettings); @@ -66,4 +69,31 @@ public void testSegmentMetadataRetention() { ); assertEquals(15, remoteStoreSettings.getMinRemoteSegmentMetadataFiles()); } + + public void testClusterRemoteTranslogTransferTimeout() { + // Test default value + assertEquals(TimeValue.timeValueSeconds(30), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout()); + + // Test override with valid value + clusterSettings.applySettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.getKey(), "40s").build()); + assertEquals(TimeValue.timeValueSeconds(40), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout()); + + // Test override with value less than minimum + assertThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings( + Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.getKey(), "10s").build() + ) + ); + assertEquals(TimeValue.timeValueSeconds(40), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout()); + + // Test override with invalid time value + assertThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings( + Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.getKey(), "123").build() + ) + ); + assertEquals(TimeValue.timeValueSeconds(40), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout()); + } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 45c0f1eb69f49..6b609d8af62a1 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -682,7 +682,8 @@ protected IndexShard newShard( () -> mockRepoSvc, threadPool, settings.getRemoteStoreTranslogRepository(), - new RemoteTranslogTransferTracker(shardRouting.shardId(), 20) + new RemoteTranslogTransferTracker(shardRouting.shardId(), 20), + DefaultRemoteStoreSettings.INSTANCE ); } return new InternalTranslogFactory();