Skip to content

Commit

Permalink
test: leader package (#1850)
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e authored Jun 21, 2024
1 parent dab6f67 commit 3147067
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 11 deletions.
2 changes: 1 addition & 1 deletion backend/controller/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type LeaderFactory[P any] func(ctx context.Context) (P, error)
// FollowerFactory is a function that is called whenever we follow a new leader.
//
// If the new leader has the same url as the previous leader, the existing follower will be used.
type FollowerFactory[P any] func(ctx context.Context, url *url.URL) (P, error)
type FollowerFactory[P any] func(ctx context.Context, leaderURL *url.URL) (P, error)

type leader[P any] struct {
value P
Expand Down
117 changes: 117 additions & 0 deletions backend/controller/leader/leader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package leader

import (
"context"
"fmt"
"net/url"
"strings"
"testing"
"time"

"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/internal/log"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
)

func TestExistingLeaseForURL(t *testing.T) {
// Test that if the a lease exists with the coordinator's URL but the coordinator does not have the lease, neither a leader or follower is created.
// This can occur when a leader fails to renew a lease, and we then try and coordinate while the db still has the lease active.
// If we create a new follower then we can end up in an infinte loop with the follower calling the leader's service which is really a follower.
ctx := log.ContextWithNewDefaultLogger(context.Background())
key := leases.SystemKey("leader-test")
endpoint, err := url.Parse("http://localhost:1234")
assert.NoError(t, err)
leaser := leases.NewFakeLeaser()
_, _, err = leaser.AcquireLease(ctx, key, time.Second*5, optional.Some[any](endpoint.String()))
if err != nil {
t.Fatal()
}
coordinator := NewCoordinator[interface{}](ctx,
endpoint,
key,
leaser,
time.Second*10,
func(ctx context.Context) (interface{}, error) {
t.Fatal("shouldn't be called")
return nil, nil
},
func(ctx context.Context, endpoint *url.URL) (interface{}, error) {
t.Fatal("shouldn't be called")
return nil, nil
},
)
_, err = coordinator.Get()
assert.Error(t, err)
}

func TestSingleLeader(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
leaser := leases.NewFakeLeaser()
leaseTTL := time.Second * 5
leaderFactory := func(ctx context.Context) (string, error) {
return fmt.Sprintf("leader:%v", time.Now()), nil
}
followerFactory := func(ctx context.Context, leaderURL *url.URL) (string, error) {
fmt.Printf("creating follower with leader url: %s\n", leaderURL.String())
return fmt.Sprintf("following:%s", leaderURL.String()), nil
}

// create coordinators
coordinators := []*Coordinator[string]{}
for i := range 5 {
advertise, err := url.Parse(fmt.Sprintf("http://localhost:%d", i))
assert.NoError(t, err)
coordinators = append(coordinators, NewCoordinator[string](ctx,
advertise,
leases.SystemKey("leader-test"),
leaser,
leaseTTL,
leaderFactory,
followerFactory))
}

// find leader
leaderIdx, initialLeaderStr := leaderFromCoordinators(t, coordinators)
validateAllFollowTheLeader(t, coordinators, leaderIdx)

// release the lease the leader is using, to simulate the lease not being able to be renewed by the leader
// a new leader should be elected
err := coordinators[leaderIdx].leader.MustGet().lease.Release()
assert.NoError(t, err)
time.Sleep(leaseTTL + time.Millisecond*500)

leaderIdx, finalLeaderStr := leaderFromCoordinators(t, coordinators)
assert.NotEqual(t, finalLeaderStr, initialLeaderStr, "leader should have been changed when the lease broke")
validateAllFollowTheLeader(t, coordinators, leaderIdx)
}

func leaderFromCoordinators(t *testing.T, coordinators []*Coordinator[string]) (idx int, leaderStr string) {
t.Helper()

leaderIdx := -1
for i := range 5 {
result, err := coordinators[i].Get()
assert.NoError(t, err)
if strings.HasPrefix(result, "leader:") {
leaderIdx = i
leaderStr = result
}
}
assert.NotEqual(t, -1, leaderIdx)
return idx, leaderStr
}

func validateAllFollowTheLeader(t *testing.T, coordinators []*Coordinator[string], leaderIdx int) {
t.Helper()

for i := range 5 {
if leaderIdx == i {
// known leader
continue
}
result, err := coordinators[i].Get()
assert.NoError(t, err)
assert.Equal(t, fmt.Sprintf("following:%s", coordinators[leaderIdx].advertise), result)
}
}
46 changes: 36 additions & 10 deletions backend/controller/leases/fake_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package leases
import (
"context"
"fmt"
"reflect"
"time"

"github.com/alecthomas/types/optional"
Expand All @@ -11,40 +12,65 @@ import (

func NewFakeLeaser() *FakeLeaser {
return &FakeLeaser{
leases: xsync.NewMapOf[string, struct{}](),
leases: xsync.NewMapOf[string, *FakeLease](),
}
}

var _ Leaser = (*FakeLeaser)(nil)

// FakeLeaser is a fake implementation of the [Leaser] interface.
type FakeLeaser struct {
leases *xsync.MapOf[string, struct{}]
leases *xsync.MapOf[string, *FakeLease]
}

func (f *FakeLeaser) AcquireLease(ctx context.Context, key Key, ttl time.Duration, metadata optional.Option[any]) (Lease, context.Context, error) {
if _, loaded := f.leases.LoadOrStore(key.String(), struct{}{}); loaded {
return nil, nil, ErrConflict
}
leaseCtx, cancelCtx := context.WithCancel(ctx)
return &FakeLease{
newLease := &FakeLease{
leaser: f,
key: key,
metadata: metadata,
cancelCtx: cancelCtx,
}, leaseCtx, nil
ttl: ttl,
}
if _, loaded := f.leases.LoadOrStore(key.String(), newLease); loaded {
cancelCtx()
return nil, nil, ErrConflict
}
return newLease, leaseCtx, nil
}

func (f *FakeLeaser) GetLeaseInfo(ctx context.Context, key Key, metadata any) (expiry time.Time, err error) {
if _, ok := f.leases.Load(key.String()); ok {
return time.Now().Add(time.Minute), nil
lease, ok := f.leases.Load(key.String())
if !ok {
return time.Time{}, fmt.Errorf("not found")
}
expiry = time.Now().Add(lease.ttl)
md, ok := lease.metadata.Get()
if !ok || metadata == nil {
// no need to parse metadata
return
}
metaValue := reflect.ValueOf(metadata)
if metaValue.Kind() != reflect.Ptr || metaValue.IsNil() {
return time.Time{}, fmt.Errorf("metadata must be a non-nil pointer")
}
if !metaValue.Elem().CanSet() {
return time.Time{}, fmt.Errorf("cannot set metadata value")
}
mdValue := reflect.ValueOf(md)
if mdValue.Type() != metaValue.Elem().Type() {
return time.Time{}, fmt.Errorf("type mismatch between metadata and md")
}
return time.Time{}, fmt.Errorf("not found")
metaValue.Elem().Set(mdValue)
return
}

type FakeLease struct {
leaser *FakeLeaser
key Key
cancelCtx context.CancelFunc
metadata optional.Option[any]
ttl time.Duration
}

func (f *FakeLease) Release() error {
Expand Down

0 comments on commit 3147067

Please sign in to comment.