From 012c4fa4ec8b719cecd4e163c5fdc0e4f42679d3 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 28 Aug 2023 16:02:18 -0700 Subject: [PATCH] [Segment Replication] Fix bug where replica shows stale doc count during engine reset. (#9495) * Fix bug where replica shows stale doc count during engine reset. This change fixes an issue where replica shards can temporarily return stale results while converting to a RO engine during an engine reset. This is possible because NRTReplicationEngine did not previously implement flush and the freshest data is only active on the reader. Fixed by implementing flush and also honoring acquireLatestCommit's flushFirst parameter. Signed-off-by: Marc Handalian * Add changelog entry. Signed-off-by: Marc Handalian * Add unit test for search during engine reset. Signed-off-by: Marc Handalian * Remove useless test. Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 5 +-- .../index/engine/NRTReplicationEngine.java | 32 +++++++++++++++++-- .../engine/NRTReplicationEngineTests.java | 10 ++++-- .../index/shard/RemoteIndexShardTests.java | 10 +++--- .../SegmentReplicationIndexShardTests.java | 31 ++++++++++++++++++ 6 files changed, 76 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index db57363ab605f..8dee8390cbc5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -174,6 +174,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Handle null partSize in OnDemandBlockSnapshotIndexInput ([#9291](https://github.com/opensearch-project/OpenSearch/issues/9291)) - Fix condition to remove index create block ([#9437](https://github.com/opensearch-project/OpenSearch/pull/9437)) - Add support to clear archived index setting ([#9019](https://github.com/opensearch-project/OpenSearch/pull/9019)) +- [Segment Replication] Fixed bug where replica shard temporarily serves stale data during an engine reset ([#9495](https://github.com/opensearch-project/OpenSearch/pull/9495)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 69cdd80bb5085..5855ed7470559 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -143,8 +143,9 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); assertNotNull(replicaShardRouting); assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); - refresh(INDEX_NAME); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + final SearchResponse response = client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); + // new primary should have at least the doc count from the first set of segments. + assertTrue(response.getHits().getTotalHits().value >= 1); // assert we can index into the new primary. client().prepareIndex(INDEX_NAME).setId("3").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 48556cc6b9709..b529dfbe13bf4 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -39,6 +39,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; @@ -57,6 +59,7 @@ public class NRTReplicationEngine extends Engine { private final CompletionStatsCache completionStatsCache; private final LocalCheckpointTracker localCheckpointTracker; private final WriteOnlyTranslogManager translogManager; + private final Lock flushLock = new ReentrantLock(); protected final ReplicaFileTracker replicaFileTracker; private volatile long lastReceivedPrimaryGen = SequenceNumbers.NO_OPS_PERFORMED; @@ -156,7 +159,7 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep // a lower gen from a newly elected primary shard that is behind this shard's last commit gen. // In that case we still commit into the next local generation. if (incomingGeneration != this.lastReceivedPrimaryGen) { - commitSegmentInfos(); + flush(false, true); translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } @@ -184,7 +187,7 @@ private void commitSegmentInfos(SegmentInfos infos) throws IOException { translogManager.syncTranslog(); } - protected void commitSegmentInfos() throws IOException { + private void commitSegmentInfos() throws IOException { commitSegmentInfos(getLatestSegmentInfos()); } @@ -351,7 +354,27 @@ public boolean shouldPeriodicallyFlush() { } @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException {} + public void flush(boolean force, boolean waitIfOngoing) throws EngineException { + ensureOpen(); + // readLock is held here to wait/block any concurrent close that acquires the writeLock. + try (final ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + if (flushLock.tryLock() == false) { + if (waitIfOngoing == false) { + return; + } + flushLock.lock(); + } + // we are now locked. + try { + commitSegmentInfos(); + } catch (IOException e) { + throw new FlushFailedEngineException(shardId, e); + } finally { + flushLock.unlock(); + } + } + } @Override public void forceMerge( @@ -365,6 +388,9 @@ public void forceMerge( @Override public GatedCloseable acquireLastIndexCommit(boolean flushFirst) throws EngineException { + if (flushFirst) { + flush(false, true); + } try { final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory()); return new GatedCloseable<>(indexCommit, () -> {}); diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index cb93d3a8db20e..22eb5195af507 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -152,7 +152,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); // commit the infos to push us to segments_3. - nrtEngine.commitSegmentInfos(); + nrtEngine.flush(); assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -283,7 +283,7 @@ public void testTrimTranslogOps() throws Exception { } } - public void testCommitSegmentInfos() throws Exception { + public void testFlush() throws Exception { // This test asserts that NRTReplication#commitSegmentInfos creates a new commit point with the latest checkpoints // stored in user data. final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -304,7 +304,7 @@ public void testCommitSegmentInfos() throws Exception { LocalCheckpointTracker localCheckpointTracker = nrtEngine.getLocalCheckpointTracker(); final long maxSeqNo = localCheckpointTracker.getMaxSeqNo(); final long processedCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - nrtEngine.commitSegmentInfos(); + nrtEngine.flush(); // ensure getLatestSegmentInfos returns an updated infos ref with correct userdata. final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos(); @@ -322,6 +322,10 @@ public void testCommitSegmentInfos() throws Exception { userData = committedInfos.getUserData(); assertEquals(processedCheckpoint, Long.parseLong(userData.get(LOCAL_CHECKPOINT_KEY))); assertEquals(maxSeqNo, Long.parseLong(userData.get(MAX_SEQ_NO))); + + try (final GatedCloseable indexCommit = nrtEngine.acquireLastIndexCommit(true)) { + assertEquals(committedInfos.getGeneration() + 1, indexCommit.get().getGeneration()); + } } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 8e27c9ff9ae1a..ead9c1c22c931 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -236,7 +236,7 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { MatcherAssert.assertThat( "Replica commits infos bytes referencing latest refresh point", latestReplicaCommit.files(true), - containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_5") + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_6") ); MatcherAssert.assertThat( "Segments are referenced in memory", @@ -294,20 +294,20 @@ public void testRepicaCleansUpOldCommitsWhenReceivingNew() throws Exception { replicateSegments(primary, shards.getReplicas()); assertDocCount(primary, 1); assertDocCount(replica, 1); - assertEquals("segments_4", replica.store().readLastCommittedSegmentsInfo().getSegmentsFileName()); - assertSingleSegmentFile(replica, "segments_4"); + assertEquals("segments_5", replica.store().readLastCommittedSegmentsInfo().getSegmentsFileName()); + assertSingleSegmentFile(replica, "segments_5"); shards.indexDocs(1); primary.refresh("test"); replicateSegments(primary, shards.getReplicas()); assertDocCount(replica, 2); - assertSingleSegmentFile(replica, "segments_4"); + assertSingleSegmentFile(replica, "segments_5"); shards.indexDocs(1); flushShard(primary); replicateSegments(primary, shards.getReplicas()); assertDocCount(replica, 3); - assertSingleSegmentFile(replica, "segments_5"); + assertSingleSegmentFile(replica, "segments_6"); final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap()); assertTrue(diff.missing.isEmpty()); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 807b4a9cd7482..e8220830063ee 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -34,6 +34,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.engine.ReadOnlyEngine; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.replication.TestReplicationSource; @@ -71,6 +72,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; @@ -773,6 +775,35 @@ public void testNoDuplicateSeqNo() throws Exception { } } + public void testQueryDuringEngineResetShowsDocs() throws Exception { + final NRTReplicationEngineFactory engineFactory = new NRTReplicationEngineFactory(); + final NRTReplicationEngineFactory spy = spy(engineFactory); + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, spy, createTempDir())) { + final IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + shards.startAll(); + shards.indexDocs(10); + shards.refresh("test"); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(10); + + final AtomicReference failed = new AtomicReference<>(); + doAnswer(ans -> { + try { + final Engine engineOrNull = replicaShard.getEngineOrNull(); + assertNotNull(engineOrNull); + assertTrue(engineOrNull instanceof ReadOnlyEngine); + shards.assertAllEqual(10); + } catch (Throwable e) { + failed.set(e); + } + return ans.callRealMethod(); + }).when(spy).newReadWriteEngine(any()); + shards.promoteReplicaToPrimary(replicaShard).get(); + assertNull("Expected correct doc count during engine reset", failed.get()); + } + } + /** * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because * it asserts point in time seqNos are relative to the doc counts.