[Segment Replication] Adding PrimaryMode check before publishing checkpoint and processing a received checkpoint. #4157

Merged
@@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

     @Override
     public void afterRefresh(boolean didRefresh) throws IOException {
-        if (didRefresh) {
+        if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) {
             publisher.publish(shard);
         }
Comment on lines -43 to 45
Collaborator

Should we also reject checkpoints if the replica copy (the one the checkpoint was sent to) is operating in primary mode?

Member

+1

Member

Also on that note - we will want to cancel any ongoing replication events on both sides. I've added #4136 to cover this.

Member

Is it also worth checking shardRouting here?

Member Author

Added logic in the latest commit to reject checkpoints if the shard is in PrimaryMode.
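
To illustrate the receiving-side guard discussed in this thread, here is a minimal, self-contained sketch. It is not the code from the commit: `ShardView`, `CheckpointReceiver`, and `onNewCheckpoint` are hypothetical stand-ins invented for this example. Only the idea of consulting `isPrimaryMode()` before acting on a checkpoint comes from the discussion above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the small part of a shard that the guard needs.
interface ShardView {
    boolean isPrimaryMode();
}

// Hypothetical receiver; it models only the accept/reject decision.
final class CheckpointReceiver {
    private final List<Long> processed = new ArrayList<>();

    /** Returns true if the checkpoint was accepted for processing. */
    boolean onNewCheckpoint(long checkpointVersion, ShardView shard) {
        if (shard.isPrimaryMode()) {
            // A copy operating in primary mode is a source of checkpoints,
            // so an incoming checkpoint is ignored rather than processed.
            return false;
        }
        processed.add(checkpointVersion);
        return true;
    }

    List<Long> processed() {
        return processed;
    }
}
```

With this shape, a copy that has been promoted (or has not yet handed off) simply drops the checkpoint, while a regular replica records it for processing.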

}
@@ -3522,7 +3522,7 @@ public void testCheckpointRefreshListenerWithNull() throws IOException {
}

     /**
-     * creates a new initializing shard. The shard will will be put in its proper path under the
+     * creates a new initializing shard. The shard will be put in its proper path under the
      * current node id the shard is assigned to.
      * @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
      */
@@ -14,8 +14,16 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

private static final Settings settings = Settings.builder()
@@ -56,4 +64,40 @@ public void testIgnoreShardIdle() throws Exception {
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
}
}
+
+    /**
+     * Starts a new primary shard in PrimaryMode and verifies that the shard publishes a checkpoint after refresh.
+     */
+    public void testPublishCheckpointOnPrimaryMode() throws IOException {
+        final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
+        IndexShard shard = newStartedShard(true);
+        CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock);
+        refreshListener.afterRefresh(true);
+
+        // verify checkpoint is published
+        verify(mock, times(1)).publish(any());
+        closeShards(shard);
+    }
+
+    /**
+     * Starts a new primary shard in PrimaryMode, then starts and completes a relocation handoff. Once the handoff
+     * completes, the shard is no longer in PrimaryMode, so it must not publish a checkpoint after refresh.
+     */
+    public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
+        final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
+        IndexShard shard = newStartedShard(true);
+        CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock);
+        String id = shard.routingEntry().allocationId().getId();
+
+        // Starting relocation handoff
+        shard.getReplicationTracker().startRelocationHandoff(id);
+
+        // Completing relocation handoff
+        shard.getReplicationTracker().completeRelocationHandoff();
+        refreshListener.afterRefresh(true);
+
+        // verify checkpoint is not published
+        verify(mock, times(0)).publish(any());
+        closeShards(shard);
+    }
}
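
The two tests above rely on the primary-mode flag flipping once a relocation handoff completes. The following self-contained sketch models that behaviour without any OpenSearch dependency. `PrimaryModeModel` and `GuardedRefreshListener` are hypothetical classes written for this illustration, not the real `ReplicationTracker` or `CheckpointRefreshListener`. The sketch also assumes that primary mode stays on while a handoff is merely in progress and turns off only on completion.

```java
// Hypothetical, simplified model of the primary-mode flag; not the real ReplicationTracker.
final class PrimaryModeModel {
    private boolean primaryMode = true;
    private boolean handoffInProgress = false;

    boolean isPrimaryMode() {
        return primaryMode;
    }

    void startRelocationHandoff() {
        if (!primaryMode || handoffInProgress) {
            throw new IllegalStateException("handoff can only start once, from primary mode");
        }
        handoffInProgress = true;
    }

    void completeRelocationHandoff() {
        if (!handoffInProgress) {
            throw new IllegalStateException("no handoff in progress");
        }
        // Assumption of this sketch: completing the handoff ends primary mode.
        primaryMode = false;
        handoffInProgress = false;
    }
}

// Mirrors the guarded condition from the diff: publish only after an actual
// refresh on a copy that is still in primary mode.
final class GuardedRefreshListener {
    private final PrimaryModeModel tracker;
    private int published = 0;

    GuardedRefreshListener(PrimaryModeModel tracker) {
        this.tracker = tracker;
    }

    void afterRefresh(boolean didRefresh) {
        if (didRefresh && tracker.isPrimaryMode()) {
            published++; // stands in for publisher.publish(shard)
        }
    }

    int publishedCount() {
        return published;
    }
}
```

Counting publishes in the listener plays the role that `verify(mock, times(n))` plays in the real tests: one publish before the handoff completes, none after.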