From 55881a19a46b42f7ff4b948b7faaceba34e09d2e Mon Sep 17 00:00:00 2001 From: Nicholas Walter Knize Date: Wed, 9 Feb 2022 08:42:02 -0600 Subject: [PATCH 1/3] Always use Lucene index in peer recovery With soft deletes no longer optional, peer recovery is switched to always use the lucene index instead of replaying operations from the translog. Signed-off-by: Nicholas Walter Knize --- .../index/seqno/RetentionLeaseIT.java | 7 +- .../org/opensearch/index/engine/Engine.java | 56 +-------- .../index/engine/InternalEngine.java | 88 +-------------- .../index/engine/ReadOnlyEngine.java | 30 +---- .../opensearch/index/shard/IndexShard.java | 34 ++---- .../index/shard/PrimaryReplicaSyncer.java | 8 +- .../recovery/RecoverySourceHandler.java | 39 +++---- .../indices/recovery/RecoveryTarget.java | 10 +- .../index/engine/InternalEngineTests.java | 6 +- .../IndexLevelReplicationTests.java | 18 +-- .../shard/PrimaryReplicaSyncerTests.java | 46 -------- .../indices/recovery/RecoveryTests.java | 106 +----------------- .../index/engine/EngineTestCase.java | 8 +- 13 files changed, 51 insertions(+), 405 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/seqno/RetentionLeaseIT.java b/server/src/internalClusterTest/java/org/opensearch/index/seqno/RetentionLeaseIT.java index e1c56129c9f4b..fbacea3670ddb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/seqno/RetentionLeaseIT.java @@ -43,7 +43,6 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; @@ -122,7 +121,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = countDownLatchListener(latch); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); @@ -175,7 +174,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = countDownLatchListener(latch); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); @@ -186,7 +185,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final CountDownLatch latch = new CountDownLatch(1); primary.removeRetentionLease(id, countDownLatchListener(latch)); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock() : () -> {}; currentRetentionLeases.remove(id); latch.await(); retentionLock.close(); diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 1f330990348dc..811a2c10dfda7 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -730,7 +730,7 @@ public enum SearcherScope { /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource); + public abstract Closeable acquireHistoryRetentionLock(); /** * Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive). @@ -744,51 +744,7 @@ public abstract Translog.Snapshot newChangesSnapshot( boolean requiredFullRange ) throws IOException; - /** - * Creates a new history snapshot from either Lucene/Translog for reading operations whose seqno in the requesting - * seqno range (both inclusive). - */ - public Translog.Snapshot newChangesSnapshot( - String source, - HistorySource historySource, - MapperService mapperService, - long fromSeqNo, - long toSeqNo, - boolean requiredFullRange - ) throws IOException { - return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange); - } - - /** - * Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive). - * The returned snapshot can be retrieved from either Lucene index or translog files. - */ - public abstract Translog.Snapshot readHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException; - - /** - * Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine. - */ - public abstract int estimateNumberOfHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException; - - /** - * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog) - */ - public abstract boolean hasCompleteOperationHistory( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException; + public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo); /** * Gets the minimum retained sequence number for this engine. @@ -2040,12 +1996,4 @@ public interface TranslogRecoveryRunner { * to advance this marker to at least the given sequence number. */ public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary); - - /** - * Whether we should read history operations from translog or Lucene index - */ - public enum HistorySource { - TRANSLOG, - INDEX - } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index ae508d627a00a..e2742e8f2b48b 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -608,45 +608,6 @@ public void syncTranslog() throws IOException { revisitIndexDeletionPolicyOnTranslogSynced(); } - /** - * Creates a new history snapshot for reading operations since the provided seqno. - * The returned snapshot can be retrieved from either Lucene index or translog files. - */ - @Override - public Translog.Snapshot readHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException { - if (historySource == HistorySource.INDEX) { - return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); - } else { - return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE); - } - } - - /** - * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine. - */ - @Override - public int estimateNumberOfHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException { - if (historySource == HistorySource.INDEX) { - try ( - Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false) - ) { - return snapshot.totalOperations(); - } - } else { - return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo); - } - } - @Override public TranslogStats getTranslogStats() { return getTranslog().stats(); @@ -2817,22 +2778,6 @@ long getNumDocUpdates() { return numDocUpdates.count(); } - @Override - public Translog.Snapshot newChangesSnapshot( - String source, - HistorySource historySource, - MapperService mapperService, - long fromSeqNo, - long toSeqNo, - boolean requiredFullRange - ) throws IOException { - if (historySource == HistorySource.INDEX) { - return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange); - } else { - return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange); - } - } - @Override public Translog.Snapshot newChangesSnapshot( String source, @@ -2867,28 +2812,8 @@ public Translog.Snapshot newChangesSnapshot( } } - @Override - public boolean hasCompleteOperationHistory(String reason, HistorySource historySource, MapperService mapperService, long startingSeqNo) - throws IOException { - if (historySource == HistorySource.INDEX) { - return getMinRetainedSeqNo() <= startingSeqNo; - } else { - final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - // avoid scanning translog if not necessary - if (startingSeqNo > currentLocalCheckpoint) { - return true; - } - final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE)) { - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - tracker.markSeqNoAsProcessed(operation.seqNo()); - } - } - } - return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint; - } + public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) { + return getMinRetainedSeqNo() <= startingSeqNo; } /** @@ -2899,13 +2824,8 @@ public final long getMinRetainedSeqNo() { return softDeletesPolicy.getMinRetainedSeqNo(); } - @Override - public Closeable acquireHistoryRetentionLock(HistorySource historySource) { - if (historySource == HistorySource.INDEX) { - return softDeletesPolicy.acquireRetentionLock(); - } else { - return translog.acquireRetentionLock(); - } + public Closeable acquireHistoryRetentionLock() { + return softDeletesPolicy.acquireRetentionLock(); } /** diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 0491eb0db94cd..d9cf8e2cd65fe 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -320,7 +320,7 @@ public boolean ensureTranslogSynced(Stream locations) { public void syncTranslog() {} @Override - public Closeable acquireHistoryRetentionLock(HistorySource historySource) { + public Closeable acquireHistoryRetentionLock() { return () -> {}; } @@ -335,33 +335,7 @@ public Translog.Snapshot newChangesSnapshot( return newEmptySnapshot(); } - @Override - public Translog.Snapshot readHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) { - return newEmptySnapshot(); - } - - @Override - public int estimateNumberOfHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) { - return 0; - } - - @Override - public boolean hasCompleteOperationHistory( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) { + public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) { // we can do operation-based recovery if we don't have to replay any operation. return startingSeqNo > seqNoStats.getMaxSeqNo(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b2f94f3d398ef..af2dc4993e0a8 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2311,23 +2311,8 @@ protected void doRun() { /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public Closeable acquireHistoryRetentionLock(Engine.HistorySource source) { - return getEngine().acquireHistoryRetentionLock(source); - } - - /** - * Returns the estimated number of history operations whose seq# at least the provided seq# in this shard. - */ - public int estimateNumberOfHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { - return getEngine().estimateNumberOfHistoryOperations(reason, source, mapperService, startingSeqNo); - } - - /** - * Creates a new history snapshot for reading operations since the provided starting seqno (inclusive). - * The returned snapshot can be retrieved from either Lucene index or translog files. - */ - public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { - return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo); + public Closeable acquireHistoryRetentionLock() { + return getEngine().acquireHistoryRetentionLock(); } /** @@ -2336,17 +2321,16 @@ public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySourc * the provided starting seqno (inclusive) and ending seqno (inclusive) * The returned snapshot can be retrieved from either Lucene index or translog files. */ - public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo, long endSeqNo) - throws IOException { - return getEngine().newChangesSnapshot(reason, source, mapperService, startingSeqNo, endSeqNo, true); + public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException { + return getEngine().newChangesSnapshot(reason, mapperService, startingSeqNo, endSeqNo, true); } /** * Checks if we have a completed history of operations since the given starting seqno (inclusive). - * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)} + * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()} */ - public boolean hasCompleteHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { - return getEngine().hasCompleteOperationHistory(reason, source, mapperService, startingSeqNo); + public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) { + return getEngine().hasCompleteOperationHistory(reason, startingSeqNo); } /** @@ -2536,7 +2520,7 @@ public RetentionLease addRetentionLease( assert assertPrimaryMode(); verifyNotClosed(); ensureSoftDeletesEnabled("retention leases"); - try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { + try (Closeable ignore = acquireHistoryRetentionLock()) { final long actualRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; @@ -2559,7 +2543,7 @@ public RetentionLease renewRetentionLease(final String id, final long retainingS assert assertPrimaryMode(); verifyNotClosed(); ensureSoftDeletesEnabled("retention leases"); - try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { + try (Closeable ignore = acquireHistoryRetentionLock()) { final long actualRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; diff --git a/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java index b5e40881cfd43..bbdf948af5c32 100644 --- a/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java @@ -49,7 +49,6 @@ import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.core.internal.io.IOUtils; -import org.opensearch.index.engine.Engine; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.Translog; import org.opensearch.tasks.Task; @@ -99,16 +98,13 @@ public void resync(final IndexShard indexShard, final ActionListener Translog.Snapshot snapshot = null; try { final long startingSeqNo = indexShard.getLastKnownGlobalCheckpoint() + 1; + assert startingSeqNo >= 0 : "startingSeqNo must be non-negative; got [" + startingSeqNo + "]"; final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); final ShardId shardId = indexShard.shardId(); // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - snapshot = indexShard.getHistoryOperations( - "resync", - indexShard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, - startingSeqNo - ); + snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false); final Translog.Snapshot originalSnapshot = snapshot; final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 1bd659853e10e..cb74b751a2cc7 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -187,7 +187,6 @@ public void recoverToTarget(ActionListener listener) { IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); }; - final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled(); final SetOnce retentionLeaseRef = new SetOnce<>(); runUnderPrimaryPermit(() -> { @@ -211,19 +210,13 @@ public void recoverToTarget(ActionListener listener) { cancellableThreads, logger ); - final Engine.HistorySource historySource; - if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) { - historySource = Engine.HistorySource.INDEX; - } else { - historySource = Engine.HistorySource.TRANSLOG; - } - final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource); + final Closeable retentionLock = shard.acquireHistoryRetentionLock(); resources.add(retentionLock); final long startingSeqNo; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() - && shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo()) - && (historySource == Engine.HistorySource.TRANSLOG + && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) + && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's @@ -231,7 +224,7 @@ && isTargetSameHistory() // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // without having a complete history. - if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) { + if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); @@ -274,13 +267,11 @@ && isTargetSameHistory() // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled // down. - startingSeqNo = softDeletesEnabled - ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L - : 0; + startingSeqNo = Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L; logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); try { - final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo); + final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo); final Releasable releaseStore = acquireStore(shard.store()); resources.add(releaseStore); sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> { @@ -327,10 +318,7 @@ && isTargetSameHistory() sendFileStep.whenComplete(r -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog( - shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo), - prepareEngineStep - ); + prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); }, onFailure); prepareEngineStep.whenComplete(prepareEngineTime -> { @@ -350,11 +338,8 @@ && isTargetSameHistory() ); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - logger.trace( - "snapshot translog for recovery; current size is [{}]", - shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo) - ); - final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo); + logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo)); + final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot("peer-recovery", startingSeqNo, Long.MAX_VALUE, false); resources.add(phase2Snapshot); retentionLock.close(); @@ -415,6 +400,12 @@ private boolean isTargetSameHistory() { return targetHistoryUUID.equals(shard.getHistoryUUID()); } + private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot("peer-recover", startingSeqNo, Long.MAX_VALUE, false)) { + return snapshot.totalOperations(); + } + } + static void runUnderPrimaryPermit( CancellableThreads.Interruptible runnable, String reason, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index b4bcec3273379..3ea7cad528e82 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -344,11 +344,11 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe private boolean hasUncommittedOperations() throws IOException { long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - return indexShard.estimateNumberOfHistoryOperations( - "peer-recovery", - indexShard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, - localCheckpointOfCommit + 1 - ) > 0; + try ( + Translog.Snapshot snapshot = indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false) + ) { + return snapshot.totalOperations() > 0; + } } @Override diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 928dad8685cfe..8e21b8aadc4b5 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -6174,8 +6174,8 @@ public void testHistoryBasedOnSource() throws Exception { } } MapperService mapperService = createMapperService("test"); - List luceneOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.INDEX, mapperService); - List translogOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.TRANSLOG, mapperService); + List luceneOps = readAllOperationsBasedOnSource(engine, mapperService); + List translogOps = readAllOperationsBasedOnSource(engine, mapperService); assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray())); assertThat(translogOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray())); } @@ -6328,7 +6328,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (rarely()) { engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID()); } - try (Closeable ignored = engine.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { + try (Closeable ignored = engine.acquireHistoryRetentionLock()) { long minRetainSeqNos = engine.getMinRetainedSeqNo(); assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1)); Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new); diff --git a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java index 5a366574fd397..2167bc7c2c024 100644 --- a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java @@ -508,18 +508,12 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { assertThat(snapshot.totalOperations(), equalTo(0)); } } - try ( - Translog.Snapshot snapshot = shard.getHistoryOperations( - "test", - shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, - 0 - ) - ) { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } // the failure replicated directly from the replication channel. - indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON)); + indexResp = shards.index(new IndexRequest(index.getName()).id("any").source("{}", XContentType.JSON)); assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); Translog.NoOp noop2 = new Translog.NoOp(1, primaryTerm, indexException.toString()); expectedTranslogOps.add(noop2); @@ -532,13 +526,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2))); } } - try ( - Translog.Snapshot snapshot = shard.getHistoryOperations( - "test", - shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, - 0 - ) - ) { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } diff --git a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java index 4a9b445c12f80..1c3fa908f11da 100644 --- a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java @@ -53,34 +53,23 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.VersionType; -import org.opensearch.index.engine.Engine; import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.index.translog.TestTranslog; -import org.opensearch.index.translog.Translog; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; public class PrimaryReplicaSyncerTests extends IndexShardTestCase { @@ -238,41 +227,6 @@ public void onResponse(PrimaryReplicaSyncer.ResyncTask result) { } } - public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception { - IndexShard shard = spy(newStartedShard(true)); - when(shard.getLastKnownGlobalCheckpoint()).thenReturn(SequenceNumbers.UNASSIGNED_SEQ_NO); - int numOps = between(0, 20); - List operations = new ArrayList<>(); - for (int i = 0; i < numOps; i++) { - operations.add( - new Translog.Index( - "_doc", - Integer.toString(i), - randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, - primaryTerm, - new byte[] { 1 } - ) - ); - } - Engine.HistorySource source = shard.indexSettings.isSoftDeleteEnabled() - ? Engine.HistorySource.INDEX - : Engine.HistorySource.TRANSLOG; - doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), eq(source), anyLong()); - TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); - List sentOperations = new ArrayList<>(); - PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { - sentOperations.addAll(Arrays.asList(request.getOperations())); - listener.onResponse(new ResyncReplicationResponse()); - }; - PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction); - syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10))); - PlainActionFuture fut = new PlainActionFuture<>(); - syncer.resync(shard, fut); - fut.actionGet(); - assertThat(sentOperations, equalTo(operations.stream().filter(op -> op.seqNo() >= 0).collect(Collectors.toList()))); - closeShards(shard); - } - public void testStatusSerialization() throws IOException { PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status( randomAlphaOfLength(10), diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index d105da6565897..0e6b959ee46f9 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -41,7 +41,6 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; -import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.bulk.BulkShardRequest; @@ -69,7 +68,6 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.Translog; -import org.opensearch.test.VersionUtils; import java.io.IOException; import java.util.HashMap; @@ -146,108 +144,6 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception { } } - public void testRecoveryWithOutOfOrderDeleteWithTranslog() throws Exception { - /* - * The flow of this test: - * - delete #1 - * - roll generation (to create gen 2) - * - index #0 - * - index #3 - * - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained) - * - index #2 - * - index #5 - * - If flush and the translog retention disabled, delete #1 will be removed while index #0 is still retained and replayed. - */ - Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomPreviousCompatibleVersion(random(), Version.V_2_0_0)) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) - .build(); - try (ReplicationGroup shards = createGroup(1, settings)) { - shards.startAll(); - // create out of order delete and index op on replica - final IndexShard orgReplica = shards.getReplicas().get(0); - final String indexName = orgReplica.shardId().getIndexName(); - final long primaryTerm = orgReplica.getOperationPrimaryTerm(); - - // delete #1 - orgReplica.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete - orgReplica.applyDeleteOperationOnReplica(1, primaryTerm, 2, "type", "id"); - getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation - // index #0 - orgReplica.applyIndexOperationOnReplica( - 0, - primaryTerm, - 1, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, - false, - new SourceToParse(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON) - ); - // index #3 - orgReplica.applyIndexOperationOnReplica( - 3, - primaryTerm, - 1, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, - false, - new SourceToParse(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON) - ); - // Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1. - orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true)); - // index #2 - orgReplica.applyIndexOperationOnReplica( - 2, - primaryTerm, - 1, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, - false, - new SourceToParse(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON) - ); - orgReplica.sync(); // advance local checkpoint - orgReplica.updateGlobalCheckpointOnReplica(3L, "test"); - // index #5 -> force NoOp #4. - orgReplica.applyIndexOperationOnReplica( - 5, - primaryTerm, - 1, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, - false, - new SourceToParse(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON) - ); - - final int translogOps; - if (randomBoolean()) { - if (randomBoolean()) { - logger.info("--> flushing shard (translog will be trimmed)"); - IndexMetadata.Builder builder = IndexMetadata.builder(orgReplica.indexSettings().getIndexMetadata()); - builder.settings( - Settings.builder() - .put(orgReplica.indexSettings().getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") - ); - orgReplica.indexSettings().updateIndexMetadata(builder.build()); - orgReplica.onSettingsChanged(); - translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed). - } else { - logger.info("--> flushing shard (translog will be retained)"); - translogOps = 6; // 5 ops + seqno gaps - } - flushShard(orgReplica); - } else { - translogOps = 6; // 5 ops + seqno gaps - } - - final IndexShard orgPrimary = shards.getPrimary(); - shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed. - - IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId()); - shards.recoverReplica(newReplica); - shards.assertAllEqual(3); - - assertThat(getTranslog(newReplica).totalOperations(), equalTo(translogOps)); - } - } - public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { Settings settings = Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) @@ -329,7 +225,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); shards.assertAllEqual(3); - try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", Engine.HistorySource.INDEX, 0)) { + try (Translog.Snapshot snapshot = newReplica.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { assertThat(snapshot, SnapshotMatchers.size(6)); } } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 7654606767e43..8da71288a24c3 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -1350,13 +1350,9 @@ public static List readAllOperationsInLucene(Engine engine, /** * Reads all engine operations that have been processed by the engine from Lucene index/Translog based on source. */ - public static List readAllOperationsBasedOnSource( - Engine engine, - Engine.HistorySource historySource, - MapperService mapper - ) throws IOException { + public static List readAllOperationsBasedOnSource(Engine engine, MapperService mapper) throws IOException { final List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", historySource, mapper, 0, Long.MAX_VALUE, false)) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) { Translog.Operation op; while ((op = snapshot.next()) != null) { operations.add(op); From 6f20e7279a8139c1f93f74b90ca7b3e30898fd10 Mon Sep 17 00:00:00 2001 From: Nicholas Walter Knize Date: Tue, 22 Feb 2022 23:41:14 -0600 Subject: [PATCH 2/3] changes from PR feedback Signed-off-by: Nicholas Walter Knize --- .../opensearch/indices/recovery/RecoverySourceHandler.java | 7 ++++--- .../org/opensearch/index/engine/InternalEngineTests.java | 2 -- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index cb74b751a2cc7..56f95d16a6e83 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -132,6 +132,7 @@ public class RecoverySourceHandler { private final CancellableThreads cancellableThreads = new CancellableThreads(); private final List resources = new CopyOnWriteArrayList<>(); private final ListenableFuture future = new ListenableFuture<>(); + private static final String PEER_RECOVERY_NAME = "peer-recovery"; public RecoverySourceHandler( IndexShard shard, @@ -215,7 +216,7 @@ public void recoverToTarget(ActionListener listener) { final long startingSeqNo; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() - && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) + && shard.hasCompleteHistoryOperations(PEER_RECOVERY_NAME, request.startingSeqNo()) && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, @@ -339,7 +340,7 @@ && isTargetSameHistory() final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo)); - final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot("peer-recovery", startingSeqNo, Long.MAX_VALUE, false); + final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false); resources.add(phase2Snapshot); retentionLock.close(); @@ -401,7 +402,7 @@ private boolean isTargetSameHistory() { } private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException { - try (Translog.Snapshot snapshot = shard.newChangesSnapshot("peer-recover", startingSeqNo, Long.MAX_VALUE, false)) { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false)) { return snapshot.totalOperations(); } } diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 8e21b8aadc4b5..3f155a56736ca 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -6175,9 +6175,7 @@ public void testHistoryBasedOnSource() throws Exception { } MapperService mapperService = createMapperService("test"); List luceneOps = readAllOperationsBasedOnSource(engine, mapperService); - List translogOps = readAllOperationsBasedOnSource(engine, mapperService); assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray())); - assertThat(translogOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray())); } } From 8a21554bf0bb2c665913017bcaee01d40181a8fd Mon Sep 17 00:00:00 2001 From: Nicholas Walter Knize Date: Wed, 23 Feb 2022 10:17:48 -0600 Subject: [PATCH 3/3] wrap trace log in tracelogger.enabled Signed-off-by: Nicholas Walter Knize --- .../opensearch/indices/recovery/RecoverySourceHandler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 56f95d16a6e83..b7735d171e58c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -339,7 +339,9 @@ && isTargetSameHistory() ); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo)); + if (logger.isTraceEnabled()) { + logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo)); + } final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false); resources.add(phase2Snapshot); retentionLock.close();