Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Adding PrimaryMode check before publishing checkpoint and processing a received checkpoint. #4157

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard);
}
Comment on lines -43 to 45
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also reject checkpoints if the replica copy(where the checkpoint was sent to) is operating in primary mode.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also on that note - we will want to cancel any ongoing replication events on both sides. I've added #4136 to cover this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it also worth checking shardRouting here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Logic to reject checkpoints if shard is in PrimaryMode in latest commit.

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3522,7 +3522,7 @@ public void testCheckpointRefreshListenerWithNull() throws IOException {
}

/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* creates a new initializing shard. The shard will be put in its proper path under the
* current node id the shard is assigned to.
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

private static final Settings settings = Settings.builder()
Expand Down Expand Up @@ -80,4 +86,41 @@ public void testIgnoreShardIdle() throws Exception {
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
}
}

/**
* here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh.
*/
public void testPublishCheckpointOnPrimaryMode() throws IOException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(true);
CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock);
refreshListener.afterRefresh(true);

// verify checkpoint is published
verify(mock, times(1)).publish(any());
closeShards(shard);
}

/**
* here we are starting a new primary shard in PrimaryMode initially and starting relocation handoff. Later we complete relocation handoff then shard is no longer
* in PrimaryMode, and we test if the shard does not publish checkpoint after refresh.
*/
public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(true);
CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock);
String id = shard.routingEntry().allocationId().getId();

// Starting relocation handoff
shard.getReplicationTracker().startRelocationHandoff(id);

// Completing relocation handoff
shard.getReplicationTracker().completeRelocationHandoff();
refreshListener.afterRefresh(true);

// verify checkpoint is not published
verify(mock, times(0)).publish(any());
closeShards(shard);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,23 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc
closeShard(indexShard, false);
}

/**
* here we are starting a new shard in PrimaryMode and testing that we don't process a checkpoint on shard when it is in PrimaryMode.
*/
public void testRejectCheckpointOnShardPrimaryMode() throws IOException {
SegmentReplicationTargetService spy = spy(sut);

// Starting a new shard in PrimaryMode.
IndexShard primaryShard = newStartedShard(true);
IndexShard spyShard = spy(primaryShard);
doNothing().when(spy).startReplication(any(), any(), any());
spy.onNewCheckpoint(aheadCheckpoint, spyShard);

// Verify that checkpoint is not processed as shard is in PrimaryMode.
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(primaryShard);
}

public void testReplicationOnDone() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(indexShard);
Expand Down