Fix test testDropPrimaryDuringReplication and clean up ReplicationCheckpoint validation.

This test occasionally fails with replicas ending up with 0 documents. This happens in two ways:
1. After dropping the old primary, the new primary does not publish a checkpoint to replicas unless it indexes docs from the translog after flipping to primary mode.
If there is nothing to index, it will not publish a checkpoint, and a replica that never synced with the original primary is left out of date.
- This PR fixes this by force-publishing a checkpoint after the new primary flips to primary mode.
2. The replica receives a checkpoint post failover and cancels its sync with the former primary, which is still active, after recognizing a primary term bump.
However, this cancellation is async, and immediately starting a new replication event can fail while the previous one is still running.
- This PR fixes this by attempting to process the latest received checkpoint on failure, if the shard is not failed and is still behind. A sketch of both fixes follows this list.
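
To make the control flow of both fixes concrete, here is a minimal sketch. Every type in it is a hypothetical placeholder, simplified from the real classes (SegmentReplicationCheckpointPublisher, ReplicationCheckpoint, SegmentReplicationTargetService); it illustrates only the decision logic, not the actual APIs.

// All types below are hypothetical stand-ins, not the real OpenSearch APIs.
final class Checkpoint {} // stands in for ReplicationCheckpoint

interface CheckpointPublisher {
    void publish(Checkpoint checkpoint); // stands in for the real checkpoint publisher
}

final class FailoverFixesSketch {

    // Fix 1: once a shard flips to primary mode, force-publish its current checkpoint
    // even if no new docs were indexed, so a replica that never synced with the old
    // primary still learns where the new primary is.
    static void onPromotedToPrimary(boolean segRepEnabled, CheckpointPublisher publisher, Checkpoint latest) {
        if (segRepEnabled) {
            publisher.publish(latest);
        }
    }

    // Fix 2: a replication round that raced with the async cancellation of a prior
    // round may fail without the shard itself failing; in that case reprocess the
    // latest received checkpoint instead of waiting for the next publication.
    static void onReplicationFailure(boolean sendShardFailure, Runnable failShard, Runnable processLatestReceivedCheckpoint) {
        if (sendShardFailure) {
            failShard.run();
        } else {
            processLatestReceivedCheckpoint.run();
        }
    }
}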

This PR also introduces a few changes to ensure the accuracy of the ReplicationCheckpoint tracked on the primary & replicas.
- Ensure the checkpoint stored in SegmentReplicationTarget is the checkpoint passed from the primary and not locally computed. This ensures primary term checks are accurate and do not rely on a locally computed operationPrimaryTerm.
- Introduces a refresh listener for both primary & replica to update the ReplicationCheckpoint and store it in the replicationTracker post refresh, rather than redundantly computing it when accessed; a condensed sketch of this listener follows the list.
- Removes the unnecessary onCheckpointPublished method that was used to start replication timers manually; this now happens automatically on primaries once the local checkpoint is updated.
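
The refresh listener from the second bullet is small enough to condense; a self-contained sketch of the pattern follows, built on Lucene's ReferenceManager.RefreshListener. The Runnable stands in for the shard's updateReplicationCheckpoint() call (which recomputes the checkpoint from the latest SegmentInfos and stores it in the ReplicationTracker); the wiring is a simplification, not the exact IndexShard code.

import java.io.IOException;
import org.apache.lucene.search.ReferenceManager;

// Recompute the ReplicationCheckpoint once per refresh and cache it, instead of
// recomputing it every time the checkpoint is read.
final class ReplicationCheckpointUpdaterSketch implements ReferenceManager.RefreshListener {
    private final Runnable updateReplicationCheckpoint;

    ReplicationCheckpointUpdaterSketch(Runnable updateReplicationCheckpoint) {
        this.updateReplicationCheckpoint = updateReplicationCheckpoint;
    }

    @Override
    public void beforeRefresh() throws IOException {}

    @Override
    public void afterRefresh(boolean didRefresh) throws IOException {
        if (didRefresh) { // only when the refresh actually opened new segments
            updateReplicationCheckpoint.run();
        }
    }
}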

Signed-off-by: Marc Handalian <[email protected]>
mch2 committed Jul 27, 2023
1 parent c25c175 commit fe6a88e
Showing 13 changed files with 314 additions and 131 deletions.
@@ -44,6 +44,8 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
@@ -60,6 +62,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.search.SearchService;
import org.opensearch.search.builder.PointInTimeBuilder;
@@ -982,8 +985,11 @@ public void testScrollCreatedOnReplica() throws Exception {
)
);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(false);
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> tuple = replicaShard.getLatestSegmentInfosAndCheckpoint();
final Collection<String> snapshottedSegments;
try (final GatedCloseable<SegmentInfos> closeable = tuple.v1()) {
snapshottedSegments = closeable.get().files(false);
}
// opens a scrolled query before a flush is called.
// this is for testing scroll segment consistency between refresh and flush
SearchResponse searchResponse = client(replica).prepareSearch()
@@ -34,6 +34,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -445,6 +446,20 @@ protected SegmentInfos getLatestSegmentInfos() {
return readerManager.getSegmentInfos();
}

@Override
public synchronized GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// get reference to latest infos
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
// incref all files
try {
final Collection<String> files = latestSegmentInfos.files(true);
store.incRefFileDeleter(files);
return new GatedCloseable<>(latestSegmentInfos, () -> store.decRefFileDeleter(files));
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
}

protected LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}
64 changes: 42 additions & 22 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -681,6 +681,12 @@ public void updateShardState(
resetEngineToGlobalCheckpoint();
}
replicationTracker.activatePrimaryMode(getLocalCheckpoint());

if (indexSettings.isSegRepEnabled()) {
// force publish a checkpoint now that shard is in primary mode.
checkpointPublisher.publish(this, getLatestReplicationCheckpoint());
}

ensurePeerRecoveryRetentionLeasesExist();
/*
* If this shard was serving as a replica shard when another shard was promoted to primary then
@@ -1553,15 +1559,7 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> infosAndCheckpoint = getLatestSegmentInfosAndCheckpoint();
if (infosAndCheckpoint == null) {
return null;
}
try (final GatedCloseable<SegmentInfos> ignored = infosAndCheckpoint.v1()) {
return infosAndCheckpoint.v2();
} catch (IOException e) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
return replicationTracker.getLatestReplicationCheckpoint();
}

/**
@@ -1575,13 +1573,11 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
*
*/
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
if (indexSettings.isSegRepEnabled() == false) {
return null;
}
assert indexSettings.isSegRepEnabled();

Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
new GatedCloseable<>(null, () -> {}),
ReplicationCheckpoint.empty(shardId, getDefaultCodecName())
getLatestReplicationCheckpoint()
);

if (getEngineOrNull() == null) {
@@ -1600,11 +1596,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
// TODO: Update replicas to compute length from SegmentInfos. Replicas do not yet incref segments with
// getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues.
shardRouting.primary()
? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(),
store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName()
)
);
@@ -1860,10 +1852,6 @@ public void resetToWriteableEngine() throws IOException, InterruptedException, T
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}

public void onCheckpointPublished(ReplicationCheckpoint checkpoint) {
replicationTracker.setLatestReplicationCheckpoint(checkpoint);
}

/**
* Wrapper for a non-closing reader
*
@@ -2344,6 +2332,11 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);

if (indexSettings.isSegRepEnabled()) {
// set initial replication checkpoints into tracker.
updateReplicationCheckpoint();
}
// We set active because we are now writing operations to the engine; this way,
// we can flush if we go idle after some time and become inactive.
active.set(true);
@@ -3669,6 +3662,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

internalRefreshListener.clear();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (indexSettings.isSegRepEnabled()) {
internalRefreshListener.add(new ReplicationCheckpointUpdater());
}
if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
@@ -4473,6 +4469,30 @@ public void afterRefresh(boolean didRefresh) throws IOException {
}
}

/**
* Refresh listener to update the Shard's ReplicationCheckpoint post refresh.
*/
private class ReplicationCheckpointUpdater implements ReferenceManager.RefreshListener {
@Override
public void beforeRefresh() throws IOException {}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
updateReplicationCheckpoint();
}
}
}

private void updateReplicationCheckpoint() {
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> tuple = getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> ignored = tuple.v1()) {
replicationTracker.setLatestReplicationCheckpoint(tuple.v2());
} catch (IOException e) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
}

private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
final RootObjectMapper.Builder noopRootMapper = new RootObjectMapper.Builder("__noop");
final DocumentMapper noopDocumentMapper = mapperService != null
@@ -185,7 +185,6 @@ private synchronized boolean syncSegments() {
return true;
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
indexShard.onCheckpointPublished(checkpoint);
beforeSegmentsSync();
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
@@ -55,13 +55,14 @@ public class SegmentReplicationTarget extends ReplicationTarget {

public final static String REPLICATION_PREFIX = "replication.";

public ReplicationCheckpoint getCheckpoint() {
return this.checkpoint;
}

public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) {
public SegmentReplicationTarget(
IndexShard indexShard,
ReplicationCheckpoint checkpoint,
SegmentReplicationSource source,
ReplicationListener listener
) {
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = indexShard.getLatestReplicationCheckpoint();
this.checkpoint = checkpoint;
this.source = source;
this.state = new SegmentReplicationState(
indexShard.routingEntry(),
@@ -73,6 +74,10 @@ public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount);
}

public ReplicationCheckpoint getCheckpoint() {
return this.checkpoint;
}

@Override
protected void closeInternal() {
try {
@@ -98,12 +103,19 @@ public SegmentReplicationState state() {
}

public SegmentReplicationTarget retryCopy() {
return new SegmentReplicationTarget(indexShard, source, listener);
return new SegmentReplicationTarget(indexShard, checkpoint, source, listener);
}

@Override
public String description() {
return String.format(Locale.ROOT, "Id:[%d] Shard:[%s] Source:[%s]", getId(), shardId(), source.getDescription());
return String.format(
Locale.ROOT,
"Id:[%d] Checkpoint [%s] Shard:[%s] Source:[%s]",
getId(),
getCheckpoint(),
shardId(),
source.getDescription()
);
}

@Override
@@ -234,15 +234,15 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
logger.trace(
() -> new ParameterizedMessage(
"Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}",
replicaShard.getLatestReplicationCheckpoint()
ongoingReplicationTarget.getCheckpoint()
)
);
return;
}
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(replicaShard, new SegmentReplicationListener() {
startReplication(replicaShard, receivedCheckpoint, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
@@ -280,6 +280,8 @@ public void onReplicationFailure(
);
if (sendShardFailure == true) {
failShard(e, replicaShard);
} else {
processLatestReceivedCheckpoint(replicaShard, thread);
}
}
});
@@ -396,8 +398,24 @@ protected void updateLatestReceivedCheckpoint(ReplicationCheckpoint receivedChec
}
}

public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, sourceFactory.get(indexShard), listener);
/**
* Start a round of replication and sync to at least the given checkpoint.
* @param indexShard - {@link IndexShard} replica shard
* @param checkpoint - {@link ReplicationCheckpoint} checkpoint to sync to
* @param listener - {@link ReplicationListener}
* @return {@link SegmentReplicationTarget} the target orchestrating the replication event.
*/
public SegmentReplicationTarget startReplication(
final IndexShard indexShard,
final ReplicationCheckpoint checkpoint,
final SegmentReplicationListener listener
) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
indexShard,
checkpoint,
sourceFactory.get(indexShard),
listener
);
startReplication(target);
return target;
}
@@ -529,50 +547,59 @@ private void forceReplication(ForceSyncRequest request, ActionListener<Transport
if (indexShard == null || indexShard.getReplicationEngine().isEmpty()) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
} else {
startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
try {
logger.trace(
// We skip any validation of an incoming checkpoint for a forced sync and use the shard's latest checkpoint in the target.
startReplication(
indexShard,
indexShard.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
try {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Force replication Sync complete to {}, timing data: {}",
shardId,
state.getReplicationId(),
indexShard.getLatestReplicationCheckpoint(),
state.getTimingData()
)
);
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
} else {
// Update the replica's checkpoint on primary's replication tracker.
updateVisibleCheckpoint(state.getReplicationId(), indexShard);
}
listener.onResponse(TransportResponse.Empty.INSTANCE);
} catch (Exception e) {
logger.error("Error while marking replication completed", e);
listener.onFailure(e);
}
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.error(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Force replication Sync complete to {}, timing data: {}",
shardId,
"[shardId {}] [replication id {}] Force replication Sync failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
indexShard.getLatestReplicationCheckpoint(),
state.getTimingData()
)
),
e
);
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
} else {
// Update the replica's checkpoint on primary's replication tracker.
updateVisibleCheckpoint(state.getReplicationId(), indexShard);
if (sendShardFailure) {
failShard(e, indexShard);
}
listener.onResponse(TransportResponse.Empty.INSTANCE);
} catch (Exception e) {
logger.error("Error while marking replication completed", e);
listener.onFailure(e);
}
}

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.error(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
),
e
);
if (sendShardFailure) {
failShard(e, indexShard);
}
listener.onFailure(e);
}
});
);
}
}
