Merge pull request #383 from liftbridge-io/fix_change_leader
Fix partition leader failover
tylertreat authored Jan 20, 2022
2 parents 87a61c1 + 65892d3 commit 80a935e
Showing 4 changed files with 21 additions and 17 deletions.
5 changes: 3 additions & 2 deletions server/metadata.go
@@ -1191,8 +1191,9 @@ func (m *metadataAPI) electNewPartitionLeader(ctx context.Context, partition *pa
op := &proto.RaftLog{
Op: proto.Op_CHANGE_LEADER,
ChangeLeaderOp: &proto.ChangeLeaderOp{
- Stream: partition.Stream,
- Leader: leader,
+ Stream: partition.Stream,
+ Partition: partition.Id,
+ Leader: leader,
},
}
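
The op now carries the partition ID alongside the stream name, so when the Raft entry is applied the leader change can be resolved to the exact partition rather than being ambiguous for multi-partition streams. Below is a minimal, self-contained sketch of that idea; the types are local stand-ins for the generated proto messages, not Liftbridge's actual internals.

package main

import "fmt"

// partitionKey uniquely identifies a partition. Before this fix the
// change-leader op only carried the stream name, which is ambiguous
// once a stream has more than one partition.
type partitionKey struct {
    Stream    string
    Partition int32
}

// changeLeaderOp mirrors the fields of proto.ChangeLeaderOp after this
// commit (Stream, Partition, Leader); it is an illustrative stand-in,
// not the generated protobuf type.
type changeLeaderOp struct {
    Stream    string
    Partition int32
    Leader    string
}

func main() {
    // Leaders indexed by (stream, partition) rather than stream alone.
    leaders := map[partitionKey]string{
        {Stream: "foo", Partition: 0}: "a",
        {Stream: "foo", Partition: 1}: "a",
    }

    // Apply a leader change for partition 1 only; partition 0 is untouched.
    op := changeLeaderOp{Stream: "foo", Partition: 1, Leader: "b"}
    leaders[partitionKey{Stream: op.Stream, Partition: op.Partition}] = op.Leader

    fmt.Println(leaders) // map[{foo 0}:a {foo 1}:b]
}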

1 change: 1 addition & 0 deletions server/partition.go
@@ -1170,6 +1170,7 @@ func (p *partition) checkLeaderHealth(leader string, epoch uint64, leaderLastSee
leader, p, lastSeenElapsed)
req := &proto.ReportLeaderOp{
Stream: p.Stream,
+ Partition: p.Id,
Replica: p.srv.config.Clustering.ServerID,
Leader: leader,
LeaderEpoch: epoch,
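
ReportLeaderOp, which a replica sends when its partition leader appears unresponsive, likewise gains the partition ID. A self-contained sketch of the kind of check checkLeaderHealth performs, assuming a simple last-seen timeout; the names and structure below are illustrative, not the server's actual implementation.

package main

import (
    "fmt"
    "time"
)

// reportLeaderOp mirrors the fields a replica reports for a suspect
// partition leader (an illustrative stand-in for proto.ReportLeaderOp).
type reportLeaderOp struct {
    Stream      string
    Partition   int32
    Replica     string
    Leader      string
    LeaderEpoch uint64
}

// checkLeaderHealth is a sketch of the follower-side check: if the leader
// hasn't been seen within maxLeaderTimeout, build a report that includes
// the partition ID so the controller knows which partition's leader to
// replace.
func checkLeaderHealth(stream string, partition int32, replica, leader string,
    epoch uint64, leaderLastSeen time.Time, maxLeaderTimeout time.Duration) *reportLeaderOp {

    if time.Since(leaderLastSeen) <= maxLeaderTimeout {
        return nil // Leader is healthy; nothing to report.
    }
    return &reportLeaderOp{
        Stream:      stream,
        Partition:   partition,
        Replica:     replica,
        Leader:      leader,
        LeaderEpoch: epoch,
    }
}

func main() {
    op := checkLeaderHealth("foo", 1, "b", "a", 3,
        time.Now().Add(-2*time.Second), time.Second)
    fmt.Printf("%+v\n", op) // non-nil: leader "a" reported for foo, partition 1
}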
28 changes: 15 additions & 13 deletions server/replicator_test.go
@@ -81,9 +81,9 @@ func stopFollowing(t *testing.T, p *partition) {
require.NoError(t, p.stopFollowing())
}

- // Ensure messages are replicated and the stream leader fails over when the
+ // Ensure messages are replicated and the partition leader fails over when the
// leader dies.
- func TestStreamLeaderFailover(t *testing.T) {
+ func TestPartitionLeaderFailover(t *testing.T) {
defer cleanupStorage(t)

// Use an external NATS server.
@@ -101,6 +101,7 @@ func TestStreamLeaderFailover(t *testing.T) {

// Configure second server.
s2Config := getTestConfig("b", false, 5051)
+ s2Config.EmbeddedNATS = false
s2Config.Clustering.ReplicaMaxLeaderTimeout = time.Second
s2Config.Clustering.ReplicaMaxIdleWait = 500 * time.Millisecond
s2Config.Clustering.ReplicaFetchTimeout = 500 * time.Millisecond
@@ -109,6 +110,7 @@

// Configure second server.
s3Config := getTestConfig("c", false, 5052)
+ s3Config.EmbeddedNATS = false
s3Config.Clustering.ReplicaMaxLeaderTimeout = time.Second
s3Config.Clustering.ReplicaMaxIdleWait = 500 * time.Millisecond
s3Config.Clustering.ReplicaFetchTimeout = 500 * time.Millisecond
@@ -125,21 +127,21 @@ func TestStreamLeaderFailover(t *testing.T) {
subject := "foo"
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
- err = client.CreateStream(ctx, subject, name, lift.ReplicationFactor(3))
+ err = client.CreateStream(ctx, subject, name, lift.ReplicationFactor(3), lift.Partitions(2))
require.NoError(t, err)

- leader := getPartitionLeader(t, 10*time.Second, name, 0, servers...)
+ leader := getPartitionLeader(t, 10*time.Second, name, 1, servers...)

// Check partition load counts.
for _, server := range servers {
partitionCounts := server.metadata.BrokerPartitionCounts()
require.Len(t, partitionCounts, 3)
for _, s := range servers {
- require.Equal(t, 1, partitionCounts[s.config.Clustering.ServerID])
+ require.Equal(t, 2, partitionCounts[s.config.Clustering.ServerID])
}
leaderCounts := server.metadata.BrokerLeaderCounts()
require.Len(t, leaderCounts, 1)
- require.Equal(t, 1, leaderCounts[leader.config.Clustering.ServerID])
+ require.Equal(t, 2, leaderCounts[leader.config.Clustering.ServerID])
}

num := 100
@@ -155,7 +157,7 @@ func TestStreamLeaderFailover(t *testing.T) {
// Publish messages.
for i := 0; i < num; i++ {
_, err := client.Publish(context.Background(), name, expected[i].Value,
- lift.Key(expected[i].Key), lift.AckPolicyAll())
+ lift.Key(expected[i].Key), lift.AckPolicyAll(), lift.ToPartition(1))
require.NoError(t, err)
}

@@ -174,7 +176,7 @@ func TestStreamLeaderFailover(t *testing.T) {
if i == num {
close(ch)
}
- }, lift.StartAtEarliestReceived())
+ }, lift.StartAtEarliestReceived(), lift.Partition(1))
require.NoError(t, err)

select {
@@ -184,9 +186,9 @@ func TestStreamLeaderFailover(t *testing.T) {
}

// Wait for HW to update on followers.
- waitForHW(t, 5*time.Second, name, 0, int64(num-1), servers...)
+ waitForHW(t, 5*time.Second, name, 1, int64(num-1), servers...)

- // Kill the stream leader.
+ // Kill the partition leader.
leader.Stop()
followers := []*Server{}
for _, s := range servers {
@@ -197,7 +199,7 @@ func TestStreamLeaderFailover(t *testing.T) {
}

// Wait for new leader to be elected.
- leader = getPartitionLeader(t, 10*time.Second, name, 0, followers...)
+ leader = getPartitionLeader(t, 10*time.Second, name, 1, followers...)

// Make sure the new leader's log is consistent.
i = 0
@@ -214,7 +216,7 @@ func TestStreamLeaderFailover(t *testing.T) {
if i == num {
close(ch)
}
- }, lift.StartAtEarliestReceived())
+ }, lift.StartAtEarliestReceived(), lift.Partition(1))
require.NoError(t, err)

select {
@@ -226,7 +228,7 @@ func TestStreamLeaderFailover(t *testing.T) {
// Check partition load counts.
partitionCounts := leader.metadata.BrokerPartitionCounts()
require.Len(t, partitionCounts, 3)
- require.Equal(t, 1, partitionCounts[leader.config.Clustering.ServerID])
+ require.Equal(t, 2, partitionCounts[leader.config.Clustering.ServerID])
leaderCounts := leader.metadata.BrokerLeaderCounts()
require.Equal(t, 1, leaderCounts[leader.config.Clustering.ServerID])
}
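
The test now exercises failover on partition 1 of a two-partition stream, using the same client options that appear in the diff (lift.Partitions, lift.ToPartition, lift.Partition). Below is a standalone sketch of that client flow against a running cluster; the broker address and the three-broker replication factor are assumptions matching the test setup, and error handling is simplified.

package main

import (
    "context"
    "fmt"
    "time"

    lift "github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
    // Address is an assumption; point this at your own cluster.
    client, err := lift.Connect([]string{"localhost:9292"})
    if err != nil {
        panic(err)
    }
    defer client.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // A replicated, two-partition stream, as in the updated test.
    if err := client.CreateStream(ctx, "foo", "foo-stream",
        lift.ReplicationFactor(3), lift.Partitions(2)); err != nil && err != lift.ErrStreamExists {
        panic(err)
    }

    // Publish to partition 1 and wait for all replicas to ack.
    if _, err := client.Publish(ctx, "foo-stream", []byte("hello"),
        lift.Key([]byte("k")), lift.AckPolicyAll(), lift.ToPartition(1)); err != nil {
        panic(err)
    }

    // Read partition 1 back from the beginning.
    err = client.Subscribe(ctx, "foo-stream", func(msg *lift.Message, err error) {
        if err != nil {
            return
        }
        fmt.Printf("offset=%d key=%s value=%s\n", msg.Offset(), msg.Key(), msg.Value())
    }, lift.StartAtEarliestReceived(), lift.Partition(1))
    if err != nil {
        panic(err)
    }

    // Keep the subscription alive until the context times out.
    <-ctx.Done()
}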
4 changes: 2 additions & 2 deletions server/server_test.go
@@ -175,7 +175,7 @@ func getPartitionLeader(t *testing.T, timeout time.Duration, name string, partit
streamLeader, _ := partition.GetLeader()
if streamLeader == s.config.Clustering.ServerID {
if leader != nil {
stackFatalf(t, "Found more than one stream leader")
stackFatalf(t, "Found more than one partition leader")
}
leader = s
}
@@ -186,7 +186,7 @@ func getPartitionLeader(t *testing.T, timeout time.Duration, name string, partit
time.Sleep(15 * time.Millisecond)
}
if leader == nil {
stackFatalf(t, "No stream leader found")
stackFatalf(t, "No partition leader found")
}
return leader
}