Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve lease log level and include key #2163

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 54 additions & 16 deletions go-runtime/ftl/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,63 @@ import (
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/modulecontext"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/alecthomas/types/optional"
)

// ErrLeaseHeld is returned when an attempt is made to acquire a lease that is
// already held.
var ErrLeaseHeld = fmt.Errorf("lease already held")

type leaseState struct {
// mutex must be locked to access other fields.
mutex *sync.Mutex

// open is true if the lease has not been released and no error has occurred.
open bool
err optional.Option[error]
}

type LeaseHandle struct {
client modulecontext.LeaseClient
module string
key []string
errMu *sync.Mutex
err error
state *leaseState
}

// Err returns an error if the lease heartbeat fails.
func (l LeaseHandle) Err() error {
l.errMu.Lock()
defer l.errMu.Unlock()
return l.err
l.state.mutex.Lock()
defer l.state.mutex.Unlock()
if err, ok := l.state.err.Get(); ok {
return err //nolint:wrapcheck
}
return nil
}

// loggingKey mimics the full lease key for logging purposes.
// This helps us identify the lease in logs across runners and controllers.
func leaseKeyForLogs(module string, key []string) string {
components := []string{"module", module}
components = append(components, key...)
return strings.Join(components, "/")
}

// Release attempts to release the lease.
//
// Will return an error if the heartbeat failed. In this situation there are no
// guarantees that the lease was held to completion.
func (l LeaseHandle) Release() error {
l.errMu.Lock()
defer l.errMu.Unlock()
l.state.mutex.Lock()
defer l.state.mutex.Unlock()
if !l.state.open {
return l.Err()
}
err := l.client.Release(context.Background(), l.key)
if err != nil {
return err
}
return l.err
l.state.open = false
return nil
}

// Lease acquires a new exclusive [lease] on a resource uniquely identified by [key].
Expand All @@ -67,32 +92,45 @@ func Lease(ctx context.Context, ttl time.Duration, key ...string) (LeaseHandle,
client := newClient(ctx)

module := reflection.Module()
logger.Tracef("Acquiring lease")
logger.Tracef("Acquiring lease: %s", leaseKeyForLogs(module, key))
err := client.Acquire(ctx, module, key, ttl)
if err != nil {
if errors.Is(err, ErrLeaseHeld) {
return LeaseHandle{}, ErrLeaseHeld
}
logger.Warnf("Lease acquisition failed: %s", err)
logger.Warnf("Lease acquisition failed for %s: %s", leaseKeyForLogs(module, key), err)
return LeaseHandle{}, err
}
lease := LeaseHandle{
module: module,
key: key,
client: client,
state: &leaseState{
open: true,
mutex: &sync.Mutex{},
},
}

lease := LeaseHandle{key: key, errMu: &sync.Mutex{}, client: client}
// Heartbeat the lease.
go func() {
for {
logger.Tracef("Heartbeating lease")
logger.Tracef("Heartbeating lease: %s", leaseKeyForLogs(module, key))
err := client.Heartbeat(ctx, module, key, ttl)
if err == nil {
time.Sleep(ttl / 2)
continue
}
logger.Warnf("Lease heartbeat terminated: %s", err)

lease.state.mutex.Lock()
defer lease.state.mutex.Unlock()
if !lease.state.open {
logger.Tracef("Lease heartbeat terminated for %s after being released", leaseKeyForLogs(module, key))
return
}
// Notify the handle.
lease.errMu.Lock()
lease.err = err
lease.errMu.Unlock()
logger.Warnf("Lease heartbeat terminated for %s: %s", leaseKeyForLogs(module, key), err)
lease.state.open = false
lease.state.err = optional.Some(err)
return
}
}()
Expand Down
Loading