Skip to content

Commit

Permalink
[Segment Replication][Remote Store] Remove commits when remote store …
Browse files Browse the repository at this point in the history
…is enabled (#8050) (#8753)

* remove commits + fix failing test



* fix failing tests



* fix precommit failure



* remove logs



* address review comments



---------


(cherry picked from commit 2f830be)

Signed-off-by: Poojita Raj <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent c0da9f0 commit 23ddab2
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class NRTReplicationEngine extends Engine implements LifecycleAware {
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;
private final boolean shouldCommit;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

Expand Down Expand Up @@ -116,6 +117,7 @@ public void onAfterTranslogSync() {
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false;
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
Expand Down Expand Up @@ -165,7 +167,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
if (shouldCommit) {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
}
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translogManager.syncTranslog();
}
Expand Down Expand Up @@ -426,15 +430,21 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
// if remote store is enabled, all segments durably persisted
if (shouldCommit) {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
} else {
store.directory().sync(List.of(store.directory().listAll()));
store.directory().syncMetaData();
}
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
Expand Down
47 changes: 25 additions & 22 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4670,31 +4670,34 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
indexInput,
remoteSegmentMetadata.getGeneration()
);
// Replicas never need a local commit
if (shouldCommit) {
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
// commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
- 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
if (this.shardRouting.primary()) {
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the
// latest commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
- 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else {
finalizeReplication(infosSnapshot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public class NRTReplicationEngineTests extends EngineTestCase {
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()
);

private static final IndexSettings REMOTE_STORE_INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true")
.build()
);

public void testCreateEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
Expand Down Expand Up @@ -129,6 +137,29 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
}
}

public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnabled() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS)
) {
// assume we start at the same gen.
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration());

// flush the primary engine - we don't need any segments, just force a new commit point.
engine.flush(true, true);
assertEquals(3, engine.getLatestSegmentInfos().getGeneration());

// When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
}
}

public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException {
// if the replica is already at segments_N that is received, it will commit segments_N+1.
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Expand Down Expand Up @@ -312,18 +343,11 @@ public void testCommitSegmentInfos() throws Exception {
}
}

private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings)
throws IOException {
Lucene.cleanLuceneIndex(store.directory());
final Path translogDir = createTempDir();
final EngineConfig replicaConfig = config(
defaultSettings,
store,
translogDir,
NoMergePolicy.INSTANCE,
null,
null,
globalCheckpoint::get
);
final EngineConfig replicaConfig = config(settings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
if (Lucene.indexExists(store.directory()) == false) {
store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion);
final String translogUuid = Translog.createEmptyTranslog(
Expand All @@ -336,4 +360,8 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint,
}
return new NRTReplicationEngine(replicaConfig);
}

private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
return buildNrtReplicaEngine(globalCheckpoint, store, defaultSettings);
}
}

0 comments on commit 23ddab2

Please sign in to comment.