Skip to content

Commit

Permalink
clean up coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 19, 2024
1 parent b4fdae3 commit b4bb735
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 46 deletions.
102 changes: 71 additions & 31 deletions backend/controller/leader/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,63 +10,74 @@ import (

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/internal/log"
"github.com/alecthomas/types/optional"
)

// LeaderFactory is a function that is called whenever a new leader is acquired.
//
// The context provided is tied to the lease and will be cancelled when the leader is no longer leading.
type LeaderFactory[P any] func(ctx context.Context) (P, error)

// FollowerFactory is a function that is called whenever we follow a new leader.
//
// If the new leader has the same url as the previous leader, the existing follower will be used.
type FollowerFactory[P any] func(ctx context.Context, url *url.URL) (P, error)

type leader[P any] struct {
value P
lease leases.Lease
}

type follower[P any] struct {
value P
deadline time.Time
url *url.URL
}

// ActivatedLeader is an optional interface that can be implemented by the leader to be notified when it becomes the leader.
type ActivatedLeader interface {
ActivateLeader(ctx context.Context)
}

// Coordinator assigns a single leader for the rest to follow.
// It uses a lease to ensure that only one leader is active at a time.
type Coordinator[P any] struct {
// context used for leases and follower factory
// ctx is passed into the follower factory and is the parent context of leader's lease context
// it is captured at the time of Coordinator creation as the context when getting may be short lived
ctx context.Context

leaser leases.Leaser
advertise *url.URL
key leases.Key
leaser leases.Leaser
leaseTTL time.Duration

// leader is active if lease is present
lease optional.Option[leases.Lease]
leader P
// leader is active leader value is set
leaderFactory LeaderFactory[P]
leader optional.Option[leader[P]]

followerFactory FollowerFactory[P]
follower optional.Option[*follower[P]]

// mutex protects leader and follower coordination
mutex sync.Mutex
}

func New[P any](ctx context.Context,
func NewCoordinator[P any](ctx context.Context,
advertise *url.URL,
key leases.Key,
leaser leases.Leaser,
leaseTTL time.Duration,
leader P,
leaderFactory LeaderFactory[P],
followerFactory FollowerFactory[P]) *Coordinator[P] {
coordinator := &Coordinator[P]{
ctx: ctx,
leaser: leaser,
leaseTTL: leaseTTL,
key: key,
advertise: advertise,
leader: leader,
leaderFactory: leaderFactory,
followerFactory: followerFactory,
}

// Attempt to coordinate proactively without blocking
go func() {
_, _ = coordinator.Get() //nolint:errcheck
}()

return coordinator
}

Expand All @@ -76,50 +87,79 @@ func (c *Coordinator[P]) Get() (leaderOrFollower P, err error) {
c.mutex.Lock()
defer c.mutex.Unlock()

if _, ok := c.lease.Get(); ok {
logger := log.FromContext(c.ctx)
if l, ok := c.leader.Get(); ok {
// currently leading
return c.leader, nil
return l.value, nil
}
if f, ok := c.follower.Get(); ok && time.Now().Before(f.deadline) {
// currently following
return f.value, nil
}

lease, leaseErr := c.leaser.AcquireLease(c.ctx, c.key, c.leaseTTL, optional.Some[any](c.advertise.String()))
lease, leaderCtx, leaseErr := c.leaser.AcquireLease(c.ctx, c.key, c.leaseTTL, optional.Some[any](c.advertise.String()))
if leaseErr == nil {
// became leader
c.lease = optional.Some(lease)
if activatable, ok := any(c.leader).(ActivatedLeader); ok {
activatable.ActivateLeader(c.ctx)
l, err := c.leaderFactory(leaderCtx)
if err != nil {
lease.Release()

Check failure on line 105 in backend/controller/leader/coordinator.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `lease.Release` is not checked (errcheck)
return leaderOrFollower, fmt.Errorf("could not create leader for %s: %w", c.key, err)
}
return c.leader, nil
c.leader = optional.Some(leader[P]{
lease: lease,
value: l,
})
go func() {
c.watchForLeaderExpiration(leaderCtx)
}()
logger.Tracef("new leader for %s: %s", c.key, c.advertise)
return l, nil
}

if !errors.Is(leaseErr, dal.ErrConflict) {
return leaderOrFollower, fmt.Errorf("could not acquire lease for %s: %w", c.key, leaseErr)
}
// lease already held
return c.createFollower()
}

// watchForLeaderExpiration will remove the leader when the lease's context is cancelled due to failure to heartbeat the lease
func (c *Coordinator[P]) watchForLeaderExpiration(ctx context.Context) {
<-ctx.Done()

logger := log.FromContext(c.ctx)
logger.Warnf("removing leader for %s", c.key)

c.mutex.Lock()
c.leader = optional.None[leader[P]]()
c.mutex.Unlock()
}

func (c *Coordinator[P]) createFollower() (out P, err error) {
var urlString string
expiry, err := c.leaser.GetLeaseInfo(c.ctx, c.key, &urlString)
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
return leaderOrFollower, fmt.Errorf("could not acquire or find lease for %s", c.key)
return out, fmt.Errorf("could not acquire or find lease for %s", c.key)
}
return leaderOrFollower, fmt.Errorf("could not get lease for %s: %w", c.key, err)
return out, fmt.Errorf("could not get lease for %s: %w", c.key, err)
}
url, err := url.Parse(urlString)
if err != nil {
return leaderOrFollower, fmt.Errorf("could not parse leader url for %s: %w", c.key, err)
if urlString == c.advertise.String() {
// This prevents endless loops after a lease breaks.
// If we create a follower pointing locally, the receiver will likely try to then call the leader, which starts the loop again.
return out, fmt.Errorf("could not follow %s leader at own url: %s", c.key, urlString)
}
// check if url matches existing follower's url, just with newer deadline
if f, ok := c.follower.Get(); ok && f.url.String() == url.String() {
if f, ok := c.follower.Get(); ok && f.url.String() == urlString {
f.deadline = expiry
return f.value, nil
}

url, err := url.Parse(urlString)
if err != nil {
return out, fmt.Errorf("could not parse leader url for %s: %w", c.key, err)
}
f, err := c.followerFactory(c.ctx, url)
if err != nil {
return leaderOrFollower, fmt.Errorf("could not generate follower for %s: %w", c.key, err)
return out, fmt.Errorf("could not generate follower for %s: %w", c.key, err)
}
c.follower = optional.Some(&follower[P]{
value: f,
Expand Down
19 changes: 12 additions & 7 deletions common/configuration/asm.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,23 @@ var _ Resolver[Secrets] = &ASM{}
var _ Provider[Secrets] = &ASM{}

func NewASM(ctx context.Context, secretsClient *secretsmanager.Client, advertise *url.URL, leaser leases.Leaser) *ASM {
lead := newASMLeader(secretsClient)
leaderFactory := func(ctx context.Context) (asmClient, error) {
return newASMLeader(ctx, secretsClient), nil
}
followerFactory := func(ctx context.Context, url *url.URL) (client asmClient, err error) {
rpcClient := rpc.Dial(ftlv1connect.NewAdminServiceClient, url.String(), log.Error)
return &asmFollower{client: rpcClient}, nil
}
return &ASM{
coordinator: leader.New[asmClient](ctx,
coordinator: leader.NewCoordinator[asmClient](
ctx,
advertise,
leases.SystemKey("asm"),
leaser,
time.Second*10,
lead,
func(ctx context.Context, url *url.URL) (client asmClient, err error) {
rpcClient := rpc.Dial(ftlv1connect.NewAdminServiceClient, url.String(), log.Error)
return &asmFollower{client: rpcClient}, nil
}),
leaderFactory,
followerFactory,
),
}
}

Expand Down
11 changes: 3 additions & 8 deletions common/configuration/asm_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/alecthomas/atomic"
"github.com/puzpuzpuz/xsync/v3"

"github.com/TBD54566975/ftl/backend/controller/leader"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/types/optional"
Expand Down Expand Up @@ -61,21 +60,17 @@ type asmLeader struct {
}

var _ asmClient = &asmLeader{}
var _ leader.ActivatedLeader = &asmLeader{}

func newASMLeader(client *secretsmanager.Client) *asmLeader {
return &asmLeader{
func newASMLeader(ctx context.Context, client *secretsmanager.Client) *asmLeader {
l := &asmLeader{
client: client,
secrets: xsync.NewMapOf[Ref, asmCacheValue](),
topic: pubsub.New[updateSecretEvent](),
}
}

// ActivateLeader is called when this controller becomes the leader
func (l *asmLeader) ActivateLeader(ctx context.Context) {
go func() {
l.watchForUpdates(ctx)
}()
return l
}

func (l *asmLeader) watchForUpdates(ctx context.Context) {
Expand Down

0 comments on commit b4bb735

Please sign in to comment.