From deffcbacc92d48953528dfbbb24e4ba2c9a64320 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 25 Jul 2024 11:42:09 +1000 Subject: [PATCH 1/2] fix: improve lease log level and include key --- go-runtime/ftl/leases.go | 70 +++++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/go-runtime/ftl/leases.go b/go-runtime/ftl/leases.go index 5ebb5dc34b..70d01b6109 100644 --- a/go-runtime/ftl/leases.go +++ b/go-runtime/ftl/leases.go @@ -18,24 +18,45 @@ import ( "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/modulecontext" "github.com/TBD54566975/ftl/internal/rpc" + "github.com/alecthomas/types/optional" ) // ErrLeaseHeld is returned when an attempt is made to acquire a lease that is // already held. var ErrLeaseHeld = fmt.Errorf("lease already held") +type leaseState struct { + // mutex must be locked to access other fields. + mutex *sync.Mutex + + // open is true if the lease has not been released and no error has occurred. + open bool + err optional.Option[error] +} + type LeaseHandle struct { client modulecontext.LeaseClient + module string key []string - errMu *sync.Mutex - err error + state *leaseState } // Err returns an error if the lease heartbeat fails. func (l LeaseHandle) Err() error { - l.errMu.Lock() - defer l.errMu.Unlock() - return l.err + l.state.mutex.Lock() + defer l.state.mutex.Unlock() + if err, ok := l.state.err.Get(); ok { + return err + } + return nil +} + +// loggingKey mimics the full lease key for logging purposes. +// This helps us identify the lease in logs across runners and controllers. +func leaseKeyForLogs(module string, key []string) string { + components := []string{"module", module} + components = append(components, key...) + return strings.Join(components, "/") } // Release attempts to release the lease. @@ -43,13 +64,17 @@ func (l LeaseHandle) Err() error { // Will return an error if the heartbeat failed. In this situation there are no // guarantees that the lease was held to completion. func (l LeaseHandle) Release() error { - l.errMu.Lock() - defer l.errMu.Unlock() + l.state.mutex.Lock() + defer l.state.mutex.Unlock() + if !l.state.open { + return l.Err() + } err := l.client.Release(context.Background(), l.key) if err != nil { return err } - return l.err + l.state.open = false + return nil } // Lease acquires a new exclusive [lease] on a resource uniquely identified by [key]. @@ -67,32 +92,45 @@ func Lease(ctx context.Context, ttl time.Duration, key ...string) (LeaseHandle, client := newClient(ctx) module := reflection.Module() - logger.Tracef("Acquiring lease") + logger.Tracef("Acquiring lease: %s", leaseKeyForLogs(module, key)) err := client.Acquire(ctx, module, key, ttl) if err != nil { if errors.Is(err, ErrLeaseHeld) { return LeaseHandle{}, ErrLeaseHeld } - logger.Warnf("Lease acquisition failed: %s", err) + logger.Warnf("Lease acquisition failed for %s: %s", leaseKeyForLogs(module, key), err) return LeaseHandle{}, err } + lease := LeaseHandle{ + module: module, + key: key, + client: client, + state: &leaseState{ + open: true, + mutex: &sync.Mutex{}, + }, + } - lease := LeaseHandle{key: key, errMu: &sync.Mutex{}, client: client} // Heartbeat the lease. go func() { for { - logger.Tracef("Heartbeating lease") + logger.Tracef("Heartbeating lease: %s", leaseKeyForLogs(module, key)) err := client.Heartbeat(ctx, module, key, ttl) if err == nil { time.Sleep(ttl / 2) continue } - logger.Warnf("Lease heartbeat terminated: %s", err) + lease.state.mutex.Lock() + defer lease.state.mutex.Unlock() + if !lease.state.open { + logger.Tracef("Lease heartbeat terminated for %s after being released", leaseKeyForLogs(module, key)) + return + } // Notify the handle. - lease.errMu.Lock() - lease.err = err - lease.errMu.Unlock() + logger.Warnf("Lease heartbeat terminated for %s: %s", leaseKeyForLogs(module, key), err) + lease.state.open = false + lease.state.err = optional.Some(err) return } }() From c2937a8bc33809092f0c3ba1ebbe828d89ee49e6 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 25 Jul 2024 11:59:50 +1000 Subject: [PATCH 2/2] fix lint --- go-runtime/ftl/leases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-runtime/ftl/leases.go b/go-runtime/ftl/leases.go index 70d01b6109..6bdcf15e75 100644 --- a/go-runtime/ftl/leases.go +++ b/go-runtime/ftl/leases.go @@ -46,7 +46,7 @@ func (l LeaseHandle) Err() error { l.state.mutex.Lock() defer l.state.mutex.Unlock() if err, ok := l.state.err.Get(); ok { - return err + return err //nolint:wrapcheck } return nil }