Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Jul 14, 2023
1 parent 1379c65 commit 951392c
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class NRTReplicationEngine extends Engine {
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;
private final boolean shouldCommit;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

Expand Down Expand Up @@ -113,6 +114,7 @@ public void onAfterTranslogSync() {
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
this.shouldCommit = !engineConfig.getIndexSettings().isRemoteStoreEnabled();
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
Expand All @@ -138,16 +140,16 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
ensureOpen();
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
final long incomingGeneration = infos.getGeneration();
boolean remoteStoreEnabled = engineConfig.getIndexSettings().isRemoteStoreEnabled();
readerManager.updateSegments(infos, remoteStoreEnabled);
readerManager.updateSegments(infos);

// Commit and roll the translog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
if (remoteStoreEnabled == false) {
if (shouldCommit) {
commitSegmentInfos();
} else {
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translogManager.syncTranslog();
}
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
Expand Down Expand Up @@ -389,7 +391,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
// if remote store is enabled, all segments durably persisted
if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) {
if (shouldCommit) {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,12 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
* Update this reader's segments and refresh.
*
* @param infos {@link SegmentInfos} infos
* @param remoteStoreEnabled true if remote store is enabled
* @throws IOException - When Refresh fails with an IOException.
*/
public synchronized void updateSegments(SegmentInfos infos, boolean remoteStoreEnabled) throws IOException {
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
// roll over the currentInfo's generation, this ensures the on-disk gen
// is always increased (in the case remote store is not enabled)
if (remoteStoreEnabled == false) {
infos.updateGeneration(currentInfos);
}
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefresh();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, false);
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testCreateEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
Expand All @@ -85,7 +85,7 @@ public void testEngineWritesOpsToTranslog() throws Exception {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 500),
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
// assume we start at the same gen.
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
Expand Down Expand Up @@ -161,7 +161,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnable
// When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
}
}

Expand All @@ -171,7 +171,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
nrtEngine.getLatestSegmentInfos().changed();
nrtEngine.getLatestSegmentInfos().changed();
Expand All @@ -198,7 +198,7 @@ public void testSimultaneousEngineCloseAndCommit() throws IOException, Interrupt
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
CountDownLatch latch = new CountDownLatch(1);
Thread commitThread = new Thread(() -> {
Expand Down Expand Up @@ -231,7 +231,7 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
Expand All @@ -255,7 +255,7 @@ public void testRefreshOnNRTEngine() throws IOException {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
Expand All @@ -277,7 +277,7 @@ public void testTrimTranslogOps() throws Exception {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings);
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 100),
Expand Down Expand Up @@ -313,7 +313,7 @@ public void testCommitSegmentInfos() throws Exception {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())
.stream()
Expand Down Expand Up @@ -365,4 +365,8 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint,
}
return new NRTReplicationEngine(replicaConfig);
}

private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
return buildNrtReplicaEngine(globalCheckpoint, store, defaultSettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.opensearch.index.shard;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.junit.Assert;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -61,8 +58,7 @@ public void testStartSequenceForReplicaRecovery() throws Exception {
final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata())
.primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1)
.build();
Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
syncDirectory(replica);
closeShards(replica);
shards.removeReplica(replica);

Expand Down Expand Up @@ -114,11 +110,8 @@ public IndexShard indexShard() {
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs + moreDocs);

storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) newReplicaShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
syncDirectory(primary);
syncDirectory(newReplicaShard);
}
}

Expand Down Expand Up @@ -151,11 +144,9 @@ public void testNoTranslogHistoryTransferred() throws Exception {
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs + moreDocs);

Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
syncDirectory(primary);
syncDirectory(replica);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.opensearch.index.shard;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
Expand Down Expand Up @@ -42,10 +39,8 @@ public void testReplicaSyncingFromRemoteStore() throws IOException {
replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false);
assertDocs(replicaShard, "1", "2");

Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primaryShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
storeDirectory = ((FilterDirectory) ((FilterDirectory) replicaShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
syncDirectory(primaryShard);
syncDirectory(replicaShard);
closeShards(primaryShard, replicaShard);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.opensearch.indices.recovery;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -67,14 +64,9 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception {
assertEquals(1, primary.getRetentionLeases().leases().size());
assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry())));

Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) replica1.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) replica2.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
syncDirectory(primary);
syncDirectory(replica1);
syncDirectory(replica2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1204,4 +1204,13 @@ protected void performOnReplica(RetentionLeaseSyncAction.Request request, IndexS
}
}

/**
* Syncs all files in directory of shard
* @param shard
* @throws IOException
*/
protected void syncDirectory(IndexShard shard) throws IOException {
shard.store().directory().sync(List.of(shard.store().directory().listAll()));
}

}

0 comments on commit 951392c

Please sign in to comment.