diff --git a/backend/controller/leader/leader.go b/backend/controller/leader/leader.go
index 3d322a9f5c..7940dfa9bf 100644
--- a/backend/controller/leader/leader.go
+++ b/backend/controller/leader/leader.go
@@ -34,7 +34,7 @@ type LeaderFactory[P any] func(ctx context.Context) (P, error)
 // FollowerFactory is a function that is called whenever we follow a new leader.
 //
 // If the new leader has the same url as the previous leader, the existing follower will be used.
-type FollowerFactory[P any] func(ctx context.Context, url *url.URL) (P, error)
+type FollowerFactory[P any] func(ctx context.Context, leaderURL *url.URL) (P, error)
 
 type leader[P any] struct {
 	value P
diff --git a/backend/controller/leader/leader_test.go b/backend/controller/leader/leader_test.go
new file mode 100644
index 0000000000..f3194990a5
--- /dev/null
+++ b/backend/controller/leader/leader_test.go
@@ -0,0 +1,123 @@
+package leader
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/TBD54566975/ftl/backend/controller/leases"
+	"github.com/TBD54566975/ftl/internal/log"
+	"github.com/alecthomas/assert/v2"
+	"github.com/alecthomas/types/optional"
+)
+
+func TestExistingLeaseForURL(t *testing.T) {
+	// Test that if a lease exists with the coordinator's URL but the coordinator does not hold the lease, neither a leader nor a follower is created.
+	// This can occur when a leader fails to renew a lease, and we then try and coordinate while the db still has the lease active.
+	// If we create a new follower then we can end up in an infinite loop with the follower calling the leader's service which is really a follower.
+	ctx := log.ContextWithNewDefaultLogger(context.Background())
+	key := leases.SystemKey("leader-test")
+	endpoint, err := url.Parse("http://localhost:1234")
+	assert.NoError(t, err)
+	leaser := leases.NewFakeLeaser()
+	_, _, err = leaser.AcquireLease(ctx, key, time.Second*5, optional.Some[any](endpoint.String()))
+	if err != nil {
+		t.Fatal(err)
+	}
+	coordinator := NewCoordinator[interface{}](ctx,
+		endpoint,
+		key,
+		leaser,
+		time.Second*10,
+		func(ctx context.Context) (interface{}, error) {
+			t.Fatal("shouldn't be called")
+			return nil, nil
+		},
+		func(ctx context.Context, endpoint *url.URL) (interface{}, error) {
+			t.Fatal("shouldn't be called")
+			return nil, nil
+		},
+	)
+	_, err = coordinator.Get()
+	assert.Error(t, err)
+}
+
+// TestSingleLeader checks that one coordinator becomes leader while the rest follow it, and
+// that a different leader value is reported once the leader's lease has been released.
+func TestSingleLeader(t *testing.T) {
+	ctx := log.ContextWithNewDefaultLogger(context.Background())
+	leaser := leases.NewFakeLeaser()
+	leaseTTL := time.Second * 5
+	leaderFactory := func(ctx context.Context) (string, error) {
+		return fmt.Sprintf("leader:%v", time.Now()), nil
+	}
+	followerFactory := func(ctx context.Context, leaderURL *url.URL) (string, error) {
+		fmt.Printf("creating follower with leader url: %s\n", leaderURL.String())
+		return fmt.Sprintf("following:%s", leaderURL.String()), nil
+	}
+
+	// create coordinators
+	coordinators := []*Coordinator[string]{}
+	for i := range 5 {
+		advertise, err := url.Parse(fmt.Sprintf("http://localhost:%d", i))
+		assert.NoError(t, err)
+		coordinators = append(coordinators, NewCoordinator[string](ctx,
+			advertise,
+			leases.SystemKey("leader-test"),
+			leaser,
+			leaseTTL,
+			leaderFactory,
+			followerFactory))
+	}
+
+	// find leader
+	leaderIdx, initialLeaderStr := leaderFromCoordinators(t, coordinators)
+	validateAllFollowTheLeader(t, coordinators, leaderIdx)
+
+	// release the lease the leader is using, to simulate the lease not being able to be renewed by the leader
+	// a new leader should be elected
+	err := coordinators[leaderIdx].leader.MustGet().lease.Release()
+	assert.NoError(t, err)
+	time.Sleep(leaseTTL + time.Millisecond*500)
+
+	leaderIdx, finalLeaderStr := leaderFromCoordinators(t, coordinators)
+	assert.NotEqual(t, finalLeaderStr, initialLeaderStr, "leader should have been changed when the lease broke")
+	validateAllFollowTheLeader(t, coordinators, leaderIdx)
+}
+
+// leaderFromCoordinators calls Get on every coordinator and returns the index and value of the
+// one whose result has the "leader:" prefix (the last such, if several); fails if none does.
+func leaderFromCoordinators(t *testing.T, coordinators []*Coordinator[string]) (idx int, leaderStr string) {
+	t.Helper()
+
+	leaderIdx := -1
+	for i := range coordinators {
+		result, err := coordinators[i].Get()
+		assert.NoError(t, err)
+		if strings.HasPrefix(result, "leader:") {
+			leaderIdx = i
+			leaderStr = result
+		}
+	}
+	assert.NotEqual(t, -1, leaderIdx)
+	return leaderIdx, leaderStr
+}
+
+// validateAllFollowTheLeader asserts that every coordinator other than leaderIdx returns
+// "following:" followed by the advertise URL of the coordinator at leaderIdx.
+func validateAllFollowTheLeader(t *testing.T, coordinators []*Coordinator[string], leaderIdx int) {
+	t.Helper()
+
+	for i := range coordinators {
+		if leaderIdx == i {
+			// known leader
+			continue
+		}
+		result, err := coordinators[i].Get()
+		assert.NoError(t, err)
+		assert.Equal(t, fmt.Sprintf("following:%s", coordinators[leaderIdx].advertise), result)
+	}
+}
diff --git a/backend/controller/leases/fake_lease.go b/backend/controller/leases/fake_lease.go
index 4660675e7c..4672229e66 100644
--- a/backend/controller/leases/fake_lease.go
+++ b/backend/controller/leases/fake_lease.go
@@ -3,6 +3,7 @@ package leases
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"time"
 
 	"github.com/alecthomas/types/optional"
@@ -11,7 +12,7 @@ import (
 
 func NewFakeLeaser() *FakeLeaser {
 	return &FakeLeaser{
-		leases: xsync.NewMapOf[string, struct{}](),
+		leases: xsync.NewMapOf[string, *FakeLease](),
 	}
 }
 
@@ -19,32 +20,64 @@ var _ Leaser = (*FakeLeaser)(nil)
 
 // FakeLeaser is a fake implementation of the [Leaser] interface.
 type FakeLeaser struct {
-	leases *xsync.MapOf[string, struct{}]
+	leases *xsync.MapOf[string, *FakeLease]
 }
 
+// AcquireLease stores a new lease under key, or returns ErrConflict if the key already has one.
+//
+// The lease context is created with context.WithCancel(ctx) and is cancelled before returning on conflict.
 func (f *FakeLeaser) AcquireLease(ctx context.Context, key Key, ttl time.Duration, metadata optional.Option[any]) (Lease, context.Context, error) {
-	if _, loaded := f.leases.LoadOrStore(key.String(), struct{}{}); loaded {
-		return nil, nil, ErrConflict
-	}
 	leaseCtx, cancelCtx := context.WithCancel(ctx)
-	return &FakeLease{
+	newLease := &FakeLease{
 		leaser:    f,
 		key:       key,
+		metadata:  metadata,
 		cancelCtx: cancelCtx,
-	}, leaseCtx, nil
+		ttl:       ttl,
+	}
+	if _, loaded := f.leases.LoadOrStore(key.String(), newLease); loaded {
+		cancelCtx()
+		return nil, nil, ErrConflict
+	}
+	return newLease, leaseCtx, nil
 }
 
+// GetLeaseInfo returns now plus the lease's TTL as the expiry for key, or a "not found" error.
+//
+// If the lease has metadata and metadata is non-nil, metadata must be a settable pointer whose
+// element type equals the stored value's type; the stored value is then copied into it.
 func (f *FakeLeaser) GetLeaseInfo(ctx context.Context, key Key, metadata any) (expiry time.Time, err error) {
-	if _, ok := f.leases.Load(key.String()); ok {
-		return time.Now().Add(time.Minute), nil
+	lease, ok := f.leases.Load(key.String())
+	if !ok {
+		return time.Time{}, fmt.Errorf("not found")
+	}
+	expiry = time.Now().Add(lease.ttl)
+	md, ok := lease.metadata.Get()
+	if !ok || metadata == nil {
+		// no need to parse metadata
+		return
+	}
+	metaValue := reflect.ValueOf(metadata)
+	if metaValue.Kind() != reflect.Ptr || metaValue.IsNil() {
+		return time.Time{}, fmt.Errorf("metadata must be a non-nil pointer")
+	}
+	if !metaValue.Elem().CanSet() {
+		return time.Time{}, fmt.Errorf("cannot set metadata value")
+	}
+	mdValue := reflect.ValueOf(md)
+	if !mdValue.IsValid() || mdValue.Type() != metaValue.Elem().Type() {
+		return time.Time{}, fmt.Errorf("type mismatch between metadata and md")
 	}
-	return time.Time{}, fmt.Errorf("not found")
+	metaValue.Elem().Set(mdValue)
+	return
 }
 
 type FakeLease struct {
 	leaser    *FakeLeaser
 	key       Key
 	cancelCtx context.CancelFunc
+	metadata  optional.Option[any]
+	ttl       time.Duration
 }
 
 func (f *FakeLease) Release() error {