Skip to content

Commit

Permalink
Fix test testDropPrimaryDuringReplication and clean up ReplicationChe…
Browse files Browse the repository at this point in the history
…ckpoint validation (#8889)

* Fix test testDropPrimaryDuringReplication and clean up ReplicationCheckpoint validation.

This test is now occasionally failing with replicas having 0 documents. This occurs in a couple of ways:
1. After dropping the old primary the new primary is not publishing a checkpoint to replicas unless it indexes docs from translog after flipping to primary mode.
If there is nothing to index, it will not publish a checkpoint, but the other replica could have never sync'd with the original primary and be left out of date.
- This PR fixes this by force publishing a checkpoint after the new primary flips to primary mode.
2. The replica receives a checkpoint post failover and cancels its sync with the former primary that is still active, recognizing a primary term bump.
However this cancellation is async and immediately starting a new replication event could fail as its still replicating.
- This PR fixes this by attempting to process the latest received checkpoint on failure, if the shard is not failed and still behind.

This PR also introduces a few changes to ensure the accuracy of the ReplicationCheckpoint tracked on primary & replicas.
- Ensure the checkpoint stored in SegmentReplicationTarget is the checkpoint passed from the primary and not locally computed.  This ensures checks for primary term are accurate and not using a locally compued operationPrimaryTerm.
- Introduces a refresh listener for both primary & replica to update the ReplicationCheckpoint and store it in replicationTracker post refresh rather than redundantly computing when accessed.
- Removes unnecessary onCheckpointPublished method used to start replication timers manually.  This will happen automatically on primaries once its local cp is updated.

Signed-off-by: Marc Handalian <[email protected]>

* Handle NoSuchFileException when attempting to delete decref'd files.

To avoid divergent logic with remote store, we always incref/decref the segmentinfos.files(true) which includes the segments_n file.
Decref to 0 will attempt to delete the file from the store and its possible this _n file does not yet exist. This change will ignore if we get a noSuchFile while attempting to delete.

Signed-off-by: Marc Handalian <[email protected]>

* Add more unit tests.

Signed-off-by: Marc Handalian <[email protected]>

* Clean up IndexShardTests.testCheckpointReffreshListenerWithNull

Signed-off-by: Marc Handalian <[email protected]>

* Remove unnecessary catch for NoSuchFileException.

Signed-off-by: Marc Handalian <[email protected]>

* Add another test for non segrep.

Signed-off-by: Marc Handalian <[email protected]>

* PR Feedback.

Signed-off-by: Marc Handalian <[email protected]>

* re-compute replication checkpoint on primary promotion.

Signed-off-by: Marc Handalian <[email protected]>

---------

Signed-off-by: Marc Handalian <[email protected]>
(cherry picked from commit c3acf47)
  • Loading branch information
mch2 committed Aug 3, 2023
1 parent ae9373a commit e714442
Show file tree
Hide file tree
Showing 15 changed files with 435 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
Expand All @@ -60,6 +62,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.search.SearchService;
import org.opensearch.search.builder.PointInTimeBuilder;
Expand Down Expand Up @@ -984,8 +987,11 @@ public void testScrollCreatedOnReplica() throws Exception {
)
);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(false);
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> tuple = replicaShard.getLatestSegmentInfosAndCheckpoint();
final Collection<String> snapshottedSegments;
try (final GatedCloseable<SegmentInfos> closeable = tuple.v1()) {
snapshottedSegments = closeable.get().files(false);
}
// opens a scrolled query before a flush is called.
// this is for testing scroll segment consistency between refresh and flush
SearchResponse searchResponse = client(replica).prepareSearch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -516,6 +517,20 @@ protected SegmentInfos getLatestSegmentInfos() {
return readerManager.getSegmentInfos();
}

@Override
public synchronized GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// get reference to latest infos
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
// incref all files
try {
final Collection<String> files = latestSegmentInfos.files(false);
store.incRefFileDeleter(files);
return new GatedCloseable<>(latestSegmentInfos, () -> store.decRefFileDeleter(files));
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
}

protected LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}
Expand Down
69 changes: 47 additions & 22 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,19 @@ public void updateShardState(
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
assert newRouting.primary() && currentRouting.primary() == false;
resetEngineToGlobalCheckpoint();
// It is possible an engine can open with a SegmentInfos on a higher gen but the reader does not refresh to
// trigger our refresh listener.
// Force update the checkpoint post engine reset.
updateReplicationCheckpoint();
}

replicationTracker.activatePrimaryMode(getLocalCheckpoint());
if (indexSettings.isSegRepEnabled()) {
// force publish a checkpoint once in primary mode so that replicas not caught up to previous primary
// are brought up to date.
checkpointPublisher.publish(this, getLatestReplicationCheckpoint());
}

ensurePeerRecoveryRetentionLeasesExist();
/*
* If this shard was serving as a replica shard when another shard was promoted to primary then
Expand Down Expand Up @@ -1556,15 +1567,7 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> infosAndCheckpoint = getLatestSegmentInfosAndCheckpoint();
if (infosAndCheckpoint == null) {
return null;
}
try (final GatedCloseable<SegmentInfos> ignored = infosAndCheckpoint.v1()) {
return infosAndCheckpoint.v2();
} catch (IOException e) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
return replicationTracker.getLatestReplicationCheckpoint();
}

/**
Expand All @@ -1578,13 +1581,11 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
*
*/
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
if (indexSettings.isSegRepEnabled() == false) {
return null;
}
assert indexSettings.isSegRepEnabled();

Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
new GatedCloseable<>(null, () -> {}),
ReplicationCheckpoint.empty(shardId, getDefaultCodecName())
getLatestReplicationCheckpoint()
);

if (getEngineOrNull() == null) {
Expand All @@ -1603,11 +1604,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
// TODO: Update replicas to compute length from SegmentInfos. Replicas do not yet incref segments with
// getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues.
shardRouting.primary()
? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(),
store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName()
)
);
Expand Down Expand Up @@ -1863,10 +1860,6 @@ public void resetToWriteableEngine() throws IOException, InterruptedException, T
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}

public void onCheckpointPublished(ReplicationCheckpoint checkpoint) {
replicationTracker.setLatestReplicationCheckpoint(checkpoint);
}

/**
* Wrapper for a non-closing reader
*
Expand Down Expand Up @@ -2347,6 +2340,11 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);

if (indexSettings.isSegRepEnabled()) {
// set initial replication checkpoints into tracker.
updateReplicationCheckpoint();
}
// We set active because we are now writing operations to the engine; this way,
// we can flush if we go idle after some time and become inactive.
active.set(true);
Expand Down Expand Up @@ -3678,6 +3676,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

internalRefreshListener.clear();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (indexSettings.isSegRepEnabled()) {
internalRefreshListener.add(new ReplicationCheckpointUpdater());
}
if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
Expand Down Expand Up @@ -4482,6 +4483,30 @@ public void afterRefresh(boolean didRefresh) throws IOException {
}
}

/**
* Refresh listener to update the Shard's ReplicationCheckpoint post refresh.
*/
private class ReplicationCheckpointUpdater implements ReferenceManager.RefreshListener {
@Override
public void beforeRefresh() throws IOException {}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
updateReplicationCheckpoint();
}
}
}

private void updateReplicationCheckpoint() {
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> tuple = getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> ignored = tuple.v1()) {
replicationTracker.setLatestReplicationCheckpoint(tuple.v2());
} catch (IOException e) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
}

private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
final RootObjectMapper.Builder noopRootMapper = new RootObjectMapper.Builder("__noop");
final DocumentMapper noopDocumentMapper = mapperService != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ private synchronized boolean syncSegments() {
return true;
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
indexShard.onCheckpointPublished(checkpoint);
beforeSegmentsSync();
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -207,6 +206,10 @@ private synchronized boolean syncSegments() {

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: "
+ segmentInfos.getGeneration()
+ " does not match metadata generation: "
+ checkpoint.getSegmentsGen();
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,14 @@ public class SegmentReplicationTarget extends ReplicationTarget {

public final static String REPLICATION_PREFIX = "replication.";

public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) {
public SegmentReplicationTarget(
IndexShard indexShard,
ReplicationCheckpoint checkpoint,
SegmentReplicationSource source,
ReplicationListener listener
) {
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = indexShard.getLatestReplicationCheckpoint();
this.checkpoint = checkpoint;
this.source = source;
this.state = new SegmentReplicationState(
indexShard.routingEntry(),
Expand Down Expand Up @@ -90,12 +95,19 @@ public SegmentReplicationState state() {
}

public SegmentReplicationTarget retryCopy() {
return new SegmentReplicationTarget(indexShard, source, listener);
return new SegmentReplicationTarget(indexShard, checkpoint, source, listener);
}

@Override
public String description() {
return String.format(Locale.ROOT, "Id:[%d] Shard:[%s] Source:[%s]", getId(), shardId(), source.getDescription());
return String.format(
Locale.ROOT,
"Id:[%d] Checkpoint [%s] Shard:[%s] Source:[%s]",
getId(),
getCheckpoint(),
shardId(),
source.getDescription()
);
}

@Override
Expand Down
Loading

0 comments on commit e714442

Please sign in to comment.