diff --git a/CHANGELOG.md b/CHANGELOG.md index 374dd4ab57ee6..9874e7c431b7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) - Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) - Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) +- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamTestCase.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamTestCase.java index 50ff76c6b62f3..82ab5b0118c0e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamTestCase.java @@ -37,6 +37,7 @@ public AcknowledgedResponse createDataStream(String name) throws Exception { CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(name); AcknowledgedResponse response = client().admin().indices().createDataStream(request).get(); assertThat(response.isAcknowledged(), is(true)); + performRemoteStoreTestAction(); return response; } @@ -67,6 +68,7 @@ public RolloverResponse rolloverDataStream(String name) throws Exception { RolloverResponse response = client().admin().indices().rolloverIndex(request).get(); assertThat(response.isAcknowledged(), is(true)); assertThat(response.isRolledOver(), is(true)); + performRemoteStoreTestAction(); return response; } @@ -109,5 +111,4 @@ public AcknowledgedResponse deleteIndexTemplate(String name) throws Exception { assertThat(response.isAcknowledged(), is(true)); return response; } - } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java index 29786158bc73c..e9afd6d36bb87 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java @@ -9,6 +9,7 @@ package org.opensearch.remotestore; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.datastream.DataStreamRolloverIT; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.opensearch.cluster.ClusterState; @@ -111,7 +112,7 @@ public void testFullClusterRestore() throws Exception { * 8. Add data nodes to recover index data * 9. Verify Metadata and index data is restored. */ - public void testFullClusterStateRestore() throws Exception { + public void testFullClusterRestoreDoesntFailWithConflictingLocalState() throws Exception { int shardCount = randomIntBetween(1, 2); int replicaCount = 1; int dataNodeCount = shardCount * (replicaCount + 1); @@ -159,7 +160,7 @@ public Settings onNodeStopped(String nodeName) { // start data nodes to trigger index data recovery internalCluster().startDataOnlyNodes(dataNodeCount); - verifyRestoredData(indexStats, INDEX_NAME); + verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true); } public void testFullClusterRestoreMultipleIndices() throws Exception { @@ -291,6 +292,14 @@ private void validateCurrentMetadata() throws Exception { }); } + public void testDataStreamPostRemoteStateRestore() throws Exception { + new DataStreamRolloverIT() { + protected boolean triggerRemoteStateRestore() { + return true; + } + }.testDataStreamRollover(); + } + public void testFullClusterRestoreGlobalMetadata() throws Exception { int shardCount = randomIntBetween(1, 2); int replicaCount = 1; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 8ae25c6758195..5e91176ed0473 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -15,6 +15,8 @@ import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.FollowersChecker; +import org.opensearch.cluster.coordination.LeaderChecker; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; @@ -23,15 +25,20 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.Before; +import org.opensearch.test.disruption.NetworkDisruption; +import org.opensearch.test.transport.MockTransportService; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -44,12 +51,17 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAME = "remote-store-test-idx-1"; - @Before + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + public void setup() { internalCluster().startNodes(3); } public void testStatsResponseFromAllNodes() { + setup(); // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. @@ -118,6 +130,7 @@ public void testStatsResponseFromAllNodes() { } public void testStatsResponseAllShards() { + setup(); // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. @@ -175,6 +188,7 @@ public void testStatsResponseAllShards() { } public void testStatsResponseFromLocalNode() { + setup(); // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. @@ -236,6 +250,7 @@ public void testStatsResponseFromLocalNode() { } public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception { + setup(); // Scenario: // - Create index with single primary and single replica shard // - Disable Refresh Interval for the index @@ -325,6 +340,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce } public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception { + setup(); // Scenario: // - Create index with single primary and N-1 replica shards (N = no of data nodes) // - Disable Refresh Interval for the index @@ -416,6 +432,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr } public void testStatsOnShardRelocation() { + setup(); // Scenario: // - Create index with single primary and single replica shard // - Index documents @@ -471,6 +488,7 @@ public void testStatsOnShardRelocation() { } public void testStatsOnShardUnassigned() throws IOException { + setup(); // Scenario: // - Create index with single primary and two replica shard // - Index documents @@ -497,6 +515,7 @@ public void testStatsOnShardUnassigned() throws IOException { } public void testStatsOnRemoteStoreRestore() throws IOException { + setup(); // Creating an index with primary shard count == total nodes in cluster and 0 replicas int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount)); @@ -544,6 +563,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException { } public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception { + setup(); // Create an index with one primary and one replica shard createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1)); ensureGreen(INDEX_NAME); @@ -581,6 +601,58 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce }, 5, TimeUnit.SECONDS); } + public void testStatsCorrectnessOnFailover() { + Settings clusterSettings = Settings.builder() + .put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "100ms") + .put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "500ms") + .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "100ms") + .put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "500ms") + .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .put(nodeSettings(0)) + .build(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(clusterSettings); + internalCluster().startDataOnlyNodes(2, clusterSettings); + + // Create an index with one primary and one replica shard + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1)); + ensureGreen(INDEX_NAME); + + // Index some docs and refresh + indexDocs(); + refresh(INDEX_NAME); + + String primaryNode = primaryNodeName(INDEX_NAME); + String replicaNode = replicaNodeName(INDEX_NAME); + + // Start network disruption - primary node will be isolated + Set nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new)); + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.DISCONNECT + ); + internalCluster().setDisruptionScheme(networkDisruption); + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + ensureStableCluster(2, clusterManagerNode); + + RemoteStoreStatsResponse response = client(clusterManagerNode).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, "0"); + List matches = Arrays.stream(response.getRemoteStoreStats()) + .filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString())) + .collect(Collectors.toList()); + assertEquals(1, matches.size()); + RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats(); + assertEquals(0, segmentStats.refreshTimeLagMs); + + networkDisruption.stopDisrupting(); + internalCluster().clearDisruptionScheme(); + ensureStableCluster(3, clusterManagerNode); + ensureGreen(INDEX_NAME); + logger.info("Test completed"); + } + private void indexDocs() { for (int i = 0; i < randomIntBetween(5, 10); i++) { if (randomBoolean()) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 3d37056956c69..874713b51d627 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -215,8 +215,7 @@ public NodeStats(StreamInput in) throws IOException { } else { resourceUsageStats = null; } - // TODO: change to V_2_12_0 on main after backport to 2.x - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { segmentReplicationRejectionStats = in.readOptionalWriteable(SegmentReplicationRejectionStats::new); } else { segmentReplicationRejectionStats = null; @@ -431,6 +430,7 @@ public SegmentReplicationRejectionStats getSegmentReplicationRejectionStats() { return segmentReplicationRejectionStats; } + @Nullable public RepositoriesStats getRepositoriesStats() { return repositoriesStats; } @@ -481,8 +481,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(resourceUsageStats); } - // TODO: change to V_2_12_0 on main after backport to 2.x - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(segmentReplicationRejectionStats); } if (out.getVersion().onOrAfter(Version.V_2_12_0)) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7ac7da819b215..a0fca4f0a2ff0 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -682,6 +682,8 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, + RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, + RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 83bf8c82ee3dd..62e8faf33e1fa 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -221,6 +221,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for remote translog IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, + IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, // Settings for remote store enablement IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index b11015c099c25..025ad075d83b6 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -83,9 +83,23 @@ public class RemoteClusterStateService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - // TODO make this two variable as dynamic setting [issue: #10688] - public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000; - public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000; + public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final Setting INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.index_metadata.upload_timeout", + INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.global_metadata.upload_timeout", + GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", @@ -141,6 +155,9 @@ public class RemoteClusterStateService implements Closeable { private BlobStoreTransferService blobStoreTransferService; private volatile TimeValue slowWriteLoggingThreshold; + private volatile TimeValue indexMetadataUploadTimeout; + private volatile TimeValue globalMetadataUploadTimeout; + private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; @@ -171,7 +188,11 @@ public RemoteClusterStateService( this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; this.threadpool = threadPool; this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); + this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); + this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); + clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); } private BlobStoreTransferService getBlobStoreTransferService() { @@ -367,7 +388,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException ); try { - if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { + if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { // TODO: We should add metrics where transfer is timing out. [Issue: #10687] GlobalMetadataTransferException ex = new GlobalMetadataTransferException( String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete") @@ -422,7 +443,7 @@ private List writeIndexMetadataParallel(ClusterState clus } try { - if (latch.await(INDEX_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { + if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { IndexMetadataTransferException ex = new IndexMetadataTransferException( String.format( Locale.ROOT, @@ -621,6 +642,22 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } + private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) { + this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; + } + + private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTimeout) { + this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout; + } + + public TimeValue getIndexMetadataUploadTimeout() { + return this.indexMetadataUploadTimeout; + } + + public TimeValue getGlobalMetadataUploadTimeout() { + return this.globalMetadataUploadTimeout; + } + static String getManifestFileName(long term, long version, boolean committed) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest______C/P____ return String.join( @@ -732,7 +769,10 @@ public Metadata getLatestMetadata(String clusterName, String clusterUUID) { // Fetch Index Metadata Map indices = getIndexMetadataMap(clusterName, clusterUUID, clusterMetadataManifest.get()); - return Metadata.builder(globalMetadata).indices(indices).build(); + Map indexMetadataMap = new HashMap<>(); + indices.values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); }); + + return Metadata.builder(globalMetadata).indices(indexMetadataMap).build(); } private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 99d2b5a74c406..00e765d73f77f 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -668,6 +668,14 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + public static final Setting INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING = Setting.intSetting( + "index.remote_store.translog.keep_extra_gen", + 100, + 0, + Property.Dynamic, + Property.IndexScope + ); + private final Index index; private final Version version; private final Logger logger; @@ -680,6 +688,7 @@ public static IndexMergePolicy fromString(String text) { private final String remoteStoreTranslogRepository; private final String remoteStoreRepository; private final boolean isRemoteSnapshot; + private int remoteTranslogKeepExtraGen; private Version extendedCompatibilitySnapshotVersion; // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock @@ -850,6 +859,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY); + this.remoteTranslogKeepExtraGen = INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.get(settings); isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { @@ -1021,6 +1031,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setRemoteTranslogUploadBufferInterval ); + scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { @@ -1300,6 +1311,10 @@ public TimeValue getRemoteTranslogUploadBufferInterval() { return remoteTranslogUploadBufferInterval; } + public int getRemoteTranslogExtraKeep() { + return remoteTranslogKeepExtraGen; + } + /** * Returns true iff the remote translog buffer interval setting exists or in other words is explicitly set. */ @@ -1311,6 +1326,10 @@ public void setRemoteTranslogUploadBufferInterval(TimeValue remoteTranslogUpload this.remoteTranslogUploadBufferInterval = remoteTranslogUploadBufferInterval; } + public void setRemoteTranslogKeepExtraGen(int extraGen) { + this.remoteTranslogKeepExtraGen = extraGen; + } + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java index 9f9f150ebe2d7..492f253bbcb7c 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java @@ -34,8 +34,7 @@ public SegmentReplicationRejectionStats(final long totalRejectionCount) { } public SegmentReplicationRejectionStats(StreamInput in) throws IOException { - // TODO: change to V_2_12_0 on main after backport to 2.x - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { this.totalRejectionCount = in.readVLong(); } } @@ -53,8 +52,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { - // TODO: change to V_2_12_0 on main after backport to 2.x - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeVLong(totalRejectionCount); } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java index 1a9896540212e..4214a87049350 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java @@ -232,6 +232,63 @@ public RemoteTranslogTransferTracker.Stats stats() { ); } + @Override + public String toString() { + return "RemoteTranslogTransferStats{" + + "lastSuccessfulUploadTimestamp=" + + lastSuccessfulUploadTimestamp.get() + + "," + + "totalUploadsStarted=" + + totalUploadsStarted.get() + + "," + + "totalUploadsSucceeded=" + + totalUploadsSucceeded.get() + + "," + + "totalUploadsFailed=" + + totalUploadsFailed.get() + + "," + + "uploadBytesStarted=" + + uploadBytesStarted.get() + + "," + + "uploadBytesFailed=" + + uploadBytesFailed.get() + + "," + + "totalUploadTimeInMillis=" + + totalUploadTimeInMillis.get() + + "," + + "uploadBytesMovingAverage=" + + uploadBytesMovingAverageReference.get().getAverage() + + "," + + "uploadBytesPerSecMovingAverage=" + + uploadBytesPerSecMovingAverageReference.get().getAverage() + + "," + + "uploadTimeMovingAverage=" + + uploadTimeMsMovingAverageReference.get().getAverage() + + "," + + "lastSuccessfulDownloadTimestamp=" + + lastSuccessfulDownloadTimestamp.get() + + "," + + "totalDownloadsSucceeded=" + + totalDownloadsSucceeded.get() + + "," + + "downloadBytesSucceeded=" + + downloadBytesSucceeded.get() + + "," + + "totalDownloadTimeInMillis=" + + totalDownloadTimeInMillis.get() + + "," + + "downloadBytesMovingAverage=" + + downloadBytesMovingAverageReference.get().getAverage() + + "," + + "downloadBytesPerSecMovingAverage=" + + downloadBytesPerSecMovingAverageReference.get().getAverage() + + "," + + "downloadTimeMovingAverage=" + + downloadTimeMsMovingAverageReference.get().getAverage() + + "," + + "}"; + } + /** * Represents the tracker's state as seen in the stats API. * diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f990a3b56e856..fb4e9056153aa 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4774,6 +4774,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE * @throws IOException if exception occurs while reading segments from remote store. */ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException { + boolean syncSegmentSuccess = false; + long startTimeMs = System.currentTimeMillis(); assert indexSettings.isRemoteStoreEnabled(); logger.trace("Downloading segments from remote segment store"); RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory(); @@ -4823,9 +4825,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn : "There should not be any segments file in the dir"; store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } + syncSegmentSuccess = true; } catch (IOException e) { throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); } finally { + logger.trace( + "syncSegmentsFromRemoteSegmentStore success={} elapsedTime={}", + syncSegmentSuccess, + (System.currentTimeMillis() - startTimeMs) + ); store.decRef(); remoteStore.decRef(); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index c650edc31da8d..3e97b07abfb5d 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -123,14 +123,13 @@ public void beforeRefresh() throws IOException {} @Override protected void runAfterRefreshExactlyOnce(boolean didRefresh) { - if (shouldSync(didRefresh)) { + // We have 2 separate methods to check if sync needs to be done or not. This is required since we use the return boolean + // from isReadyForUpload to schedule refresh retries as the index shard or the primary mode are not in complete + // ready state. + if (shouldSync(didRefresh) && isReadyForUpload()) { segmentTracker.updateLocalRefreshTimeAndSeqNo(); try { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm()); - this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); - } + initializeRemoteDirectoryOnTermUpdate(); try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { Collection localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true); updateLocalSizeMapAndTracker(localSegmentsPostRefresh); @@ -160,20 +159,20 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) { } private boolean shouldSync(boolean didRefresh) { - // The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it - // is important to upload the zero state segments so that the restore does not break. return this.primaryTerm != indexShard.getOperationPrimaryTerm() + // If the readers change, didRefresh is always true. || didRefresh - || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty(); + // The third condition exists for uploading the zero state segments where the refresh has not changed the reader + // reference, but it is important to upload the zero state segments so that the restore does not break. + || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty() + // When the shouldSync is called the first time, then 1st condition on primary term is true. But after that + // we update the primary term and the same condition would not evaluate to true again in syncSegments. + // Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call. + || isRefreshAfterCommitSafe(); } private boolean syncSegments() { - if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) { - logger.debug( - "Skipped syncing segments with primaryMode={} indexShardState={}", - indexShard.getReplicationTracker().isPrimaryMode(), - indexShard.state() - ); + if (isReadyForUpload() == false) { // Following check is required to enable retry and make sure that we do not lose this refresh event // When primary shard is restored from remote store, the recovery happens first followed by changing // primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through @@ -323,6 +322,19 @@ private boolean isRefreshAfterCommit() throws IOException { && !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName))); } + /** + * Returns if the current refresh has happened after a commit. + * @return true if this refresh has happened on account of a commit. If otherwise or exception, returns false. + */ + private boolean isRefreshAfterCommitSafe() { + try { + return isRefreshAfterCommit(); + } catch (Exception e) { + logger.info("Exception occurred in isRefreshAfterCommitSafe", e); + } + return false; + } + void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint) throws IOException { final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint(); @@ -439,6 +451,48 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB } } + /** + * On primary term update, we (re)initialise the remote segment directory to reflect the latest metadata file that + * has been uploaded to remote store successfully. This method also updates the segment tracker about the latest + * uploaded segment files onto remote store. + */ + private void initializeRemoteDirectoryOnTermUpdate() throws IOException { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + logger.trace("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm()); + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + RemoteSegmentMetadata uploadedMetadata = this.remoteDirectory.init(); + + // During failover, the uploaded metadata would have names of files that have been uploaded to remote store. + // Here we update the tracker with latest remote uploaded files. + if (uploadedMetadata != null) { + segmentTracker.setLatestUploadedFiles(uploadedMetadata.getMetadata().keySet()); + } + } + } + + /** + * This checks for readiness of the index shard and primary mode. This has separated from shouldSync since we use the + * returned value of this method for scheduling retries in syncSegments method. + * @return true iff primaryMode is true and index shard is not in closed state. + */ + private boolean isReadyForUpload() { + boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED; + if (isReady == false) { + StringBuilder sb = new StringBuilder("Skipped syncing segments with"); + if (indexShard.getReplicationTracker() != null) { + sb.append(" primaryMode=").append(indexShard.getReplicationTracker().isPrimaryMode()); + } + if (indexShard.state() != null) { + sb.append(" indexShardState=").append(indexShard.state()); + } + if (indexShard.getEngineOrNull() != null) { + sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName()); + } + logger.trace(sb.toString()); + } + return isReady; + } + /** * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events */ diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 2dd9b1a545d4a..a305a774f5854 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -161,6 +161,7 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t remoteTranslogTransferTracker ); RemoteFsTranslog.download(translogTransferManager, location, logger); + logger.trace(remoteTranslogTransferTracker.toString()); } static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { @@ -173,15 +174,20 @@ static void download(TranslogTransferManager translogTransferManager, Path locat */ IOException ex = null; for (int i = 0; i <= DOWNLOAD_RETRIES; i++) { + boolean success = false; + long startTimeMs = System.currentTimeMillis(); try { downloadOnce(translogTransferManager, location, logger); + success = true; return; } catch (FileNotFoundException | NoSuchFileException e) { // continue till download retries ex = e; + } finally { + logger.trace("downloadOnce success={} timeElapsed={}", success, (System.currentTimeMillis() - startTimeMs)); } } - logger.debug("Exhausted all download retries during translog/checkpoint file download"); + logger.info("Exhausted all download retries during translog/checkpoint file download"); throw ex; } @@ -425,7 +431,7 @@ public void trimUnreferencedReaders() throws IOException { // cleans up remote translog files not referenced in latest uploaded metadata. // This enables us to restore translog from the metadata in case of failover or relocation. Set generationsToDelete = new HashSet<>(); - for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) { + for (long generation = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); generation >= 0; generation--) { if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) { break; } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 8f6185c0b88fd..173e15b8eca37 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -102,6 +102,7 @@ public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private RemoteClusterStateService remoteClusterStateService; + private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; @@ -132,6 +133,7 @@ public void setup() { .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); NamedXContentRegistry xContentRegistry = new NamedXContentRegistry( Stream.of( NetworkModule.getNamedXContents().stream(), @@ -149,7 +151,7 @@ public void setup() { "test-node-id", repositoriesServiceSupplier, settings, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + clusterSettings, () -> 0L, threadPool ); @@ -826,9 +828,9 @@ public void testReadLatestIndexMetadataSuccess() throws IOException { ).getIndices(); assertEquals(indexMetadataMap.size(), 1); - assertEquals(indexMetadataMap.get(index.getUUID()).getIndex().getName(), index.getName()); - assertEquals(indexMetadataMap.get(index.getUUID()).getNumberOfShards(), indexMetadata.getNumberOfShards()); - assertEquals(indexMetadataMap.get(index.getUUID()).getNumberOfReplicas(), indexMetadata.getNumberOfReplicas()); + assertEquals(indexMetadataMap.get(index.getName()).getIndex().getName(), index.getName()); + assertEquals(indexMetadataMap.get(index.getName()).getNumberOfShards(), indexMetadata.getNumberOfShards()); + assertEquals(indexMetadataMap.get(index.getName()).getNumberOfReplicas(), indexMetadata.getNumberOfReplicas()); } public void testMarkLastStateAsCommittedSuccess() throws IOException { @@ -1053,6 +1055,38 @@ public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Excepti assertBusy(() -> assertEquals(1, callCount.get())); } + public void testIndexMetadataUploadWaitTimeSetting() { + // verify default value + assertEquals( + RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT, + remoteClusterStateService.getIndexMetadataUploadTimeout() + ); + + // verify update index metadata upload timeout + int indexMetadataUploadTimeout = randomIntBetween(1, 10); + Settings newSettings = Settings.builder() + .put("cluster.remote_store.state.index_metadata.upload_timeout", indexMetadataUploadTimeout + "s") + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds()); + } + + public void testGlobalMetadataUploadWaitTimeSetting() { + // verify default value + assertEquals( + RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT, + remoteClusterStateService.getGlobalMetadataUploadTimeout() + ); + + // verify update global metadata upload timeout + int globalMetadataUploadTimeout = randomIntBetween(1, 10); + Settings newSettings = Settings.builder() + .put("cluster.remote_store.state.global_metadata.upload_timeout", globalMetadataUploadTimeout + "s") + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(globalMetadataUploadTimeout, remoteClusterStateService.getGlobalMetadataUploadTimeout().seconds()); + } + private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers, false); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 42e0df2dc90c1..3cb65610fab58 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -97,6 +97,7 @@ import java.util.zip.CheckedInputStream; import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG; import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; @@ -124,6 +125,8 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { private ThreadPool threadPool; private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; + + AtomicInteger writeCalls = new AtomicInteger(); BlobStoreRepository repository; BlobStoreTransferService blobStoreTransferService; @@ -163,13 +166,13 @@ public void tearDown() throws Exception { private RemoteFsTranslog create(Path path) throws IOException { final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); - return create(path, createRepository(), translogUUID); + return create(path, createRepository(), translogUUID, 0); } - private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { + private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID, int extraGenToKeep) throws IOException { this.repository = repository; globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogConfig translogConfig = getTranslogConfig(path, extraGenToKeep); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); threadPool = new TestThreadPool(getClass().getName()); blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool); @@ -185,10 +188,17 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin primaryMode::get, new RemoteTranslogTransferTracker(shardId, 10) ); + } + private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { + return create(path, repository, translogUUID, 0); } private TranslogConfig getTranslogConfig(final Path path) { + return getTranslogConfig(path, 0); + } + + private TranslogConfig getTranslogConfig(final Path path, int gensToKeep) { final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) // only randomize between nog age retention and a long one, so failures will have a chance of reproducing @@ -196,6 +206,7 @@ private TranslogConfig getTranslogConfig(final Path path) { .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b") .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), gensToKeep) .build(); return getTranslogConfig(path, settings); } @@ -372,6 +383,111 @@ public void testSimpleOperations() throws IOException { } + private TranslogConfig getConfig(int gensToKeep) { + Path tempDir = createTempDir(); + final TranslogConfig temp = getTranslogConfig(tempDir, gensToKeep); + final TranslogConfig config = new TranslogConfig( + temp.getShardId(), + temp.getTranslogPath(), + temp.getIndexSettings(), + temp.getBigArrays(), + new ByteSizeValue(1, ByteSizeUnit.KB), + "" + ); + return config; + } + + private ChannelFactory getChannelFactory() { + writeCalls = new AtomicInteger(); + final ChannelFactory channelFactory = (file, openOption) -> { + FileChannel delegate = FileChannel.open(file, openOption); + boolean success = false; + try { + // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation + final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); + + final FileChannel channel; + if (isCkpFile) { + channel = delegate; + } else { + channel = new FilterFileChannel(delegate) { + + @Override + public int write(ByteBuffer src) throws IOException { + writeCalls.incrementAndGet(); + return super.write(src); + } + }; + } + success = true; + return channel; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(delegate); + } + } + }; + return channelFactory; + } + + public void testExtraGenToKeep() throws Exception { + TranslogConfig config = getConfig(1); + ChannelFactory channelFactory = getChannelFactory(); + final Set persistedSeqNos = new HashSet<>(); + String translogUUID = Translog.createEmptyTranslog( + config.getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + channelFactory, + primaryTerm.get() + ); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); + ArrayList ops = new ArrayList<>(); + try ( + RemoteFsTranslog translog = new RemoteFsTranslog( + config, + translogUUID, + deletionPolicy, + () -> SequenceNumbers.NO_OPS_PERFORMED, + primaryTerm::get, + persistedSeqNos::add, + repository, + threadPool, + () -> Boolean.TRUE, + new RemoteTranslogTransferTracker(shardId, 10) + ) { + @Override + ChannelFactory getChannelFactory() { + return channelFactory; + } + } + ) { + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 1 })); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 1 })); + + // expose the new checkpoint (simulating a commit), before we trim the translog + translog.setMinSeqNoToKeep(2); + + // Trims from local + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 3, primaryTerm.get(), new byte[] { 1 })); + + // Trims from remote now + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + assertEquals( + 6, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + + } + } + public void testReadLocation() throws IOException { ArrayList ops = new ArrayList<>(); ArrayList locs = new ArrayList<>(); @@ -619,14 +735,22 @@ public void testSimpleOperationsUpload() throws Exception { // this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4 addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); assertEquals(2, translog.stats().estimatedNumberOfOperations()); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); translog.setMinSeqNoToKeep(2); - - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + // this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4 translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); assertEquals(1, translog.stats().estimatedNumberOfOperations()); - assertBusy(() -> assertEquals(4, translog.allUploaded().size())); + assertBusy(() -> { + assertEquals(4, translog.allUploaded().size()); + assertEquals( + 4, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + } public void testMetadataFileDeletion() throws Exception { @@ -1273,49 +1397,10 @@ public void testTranslogWriter() throws IOException { } public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { - Path tempDir = createTempDir(); - final TranslogConfig temp = getTranslogConfig(tempDir); - final TranslogConfig config = new TranslogConfig( - temp.getShardId(), - temp.getTranslogPath(), - temp.getIndexSettings(), - temp.getBigArrays(), - new ByteSizeValue(1, ByteSizeUnit.KB), - "" - ); - + final TranslogConfig config = getConfig(1); final Set persistedSeqNos = new HashSet<>(); - final AtomicInteger writeCalls = new AtomicInteger(); - - final ChannelFactory channelFactory = (file, openOption) -> { - FileChannel delegate = FileChannel.open(file, openOption); - boolean success = false; - try { - // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation - final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); - - final FileChannel channel; - if (isCkpFile) { - channel = delegate; - } else { - channel = new FilterFileChannel(delegate) { - - @Override - public int write(ByteBuffer src) throws IOException { - writeCalls.incrementAndGet(); - return super.write(src); - } - }; - } - success = true; - return channel; - } finally { - if (success == false) { - IOUtils.closeWhileHandlingException(delegate); - } - } - }; - + writeCalls = new AtomicInteger(); + final ChannelFactory channelFactory = getChannelFactory(); String translogUUID = Translog.createEmptyTranslog( config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 63d8f069bebea..952cd6c085966 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -1871,6 +1871,18 @@ public void stopAllNodes() { } } + /** + * Replace all nodes by stopping all current node and starting new node. + * Used for remote store test cases, where remote state is restored. + */ + public void resetCluster() { + int totalClusterManagerNodes = numClusterManagerNodes(); + int totalDataNodes = numDataNodes(); + stopAllNodes(); + startClusterManagerOnlyNodes(totalClusterManagerNodes); + startDataOnlyNodes(totalDataNodes); + } + private synchronized void startAndPublishNodesAndClients(List nodeAndClients) { if (nodeAndClients.size() > 0) { final int newClusterManagers = (int) nodeAndClients.stream() diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index c16cc1d2a5fba..ad27d9834f159 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -789,6 +789,30 @@ protected Settings featureFlagSettings() { return featureSettings.build(); } + /** + * Represent if it needs to trigger remote state restore or not. + * For tests with remote store enabled domain, it will be overridden to true. + * + * @return if needs to perform remote state restore or not + */ + protected boolean triggerRemoteStateRestore() { + return false; + } + + /** + * For tests with remote cluster state, it will reset the cluster and cluster state will be + * restored from remote. + */ + protected void performRemoteStoreTestAction() { + if (triggerRemoteStateRestore()) { + String clusterUUIDBefore = clusterService().state().metadata().clusterUUID(); + internalCluster().resetCluster(); + String clusterUUIDAfter = clusterService().state().metadata().clusterUUID(); + // assertion that UUID is changed post restore. + assertFalse(clusterUUIDBefore.equals(clusterUUIDAfter)); + } + } + /** * Creates one or more indices and asserts that the indices are acknowledged. If one of the indices * already exists this method will fail and wipe all the indices created so far.