Skip to content

Commit

Permalink
fix: don't log at error level in lease timeout (#2151)
Browse files Browse the repository at this point in the history
We don't want error alerts if a controller is just failing over, this PR
adds some infrasturcture to allow for a lower level of error reporting
when the errors all occur within the lease TTL.

fixes #2133
  • Loading branch information
stuartwdouglas authored Aug 12, 2024
1 parent d6d432d commit 1fa898f
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 18 deletions.
68 changes: 68 additions & 0 deletions backend/controller/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,71 @@ func (c *Coordinator[P]) retireFollower() {
f.cancelCtx()
c.follower = optional.None[*follower[P]]()
}

// ErrorFilter allows uses of leases to decide if an error might be due to the master falling over,
// or is something else that will not resolve itself after the TTL
type ErrorFilter struct {
leaseTTL time.Duration
// Error reporting utilities
// If a controller has failed over we don't want error logs while we are waiting for the lease to expire.
// This records error state and allows us to filter errors until we are past lease timeout
// and only reports the error if it persists
// firstErrorTime is the time of the first error, used to lower log levels if the errors all occur within a lease window
firstErrorTime optional.Option[time.Time]
recordedSuccessTime optional.Option[time.Time]

// errorMutex protects firstErrorTime
errorMutex sync.Mutex
}

func NewErrorFilter(leaseTTL time.Duration) *ErrorFilter {
return &ErrorFilter{
errorMutex: sync.Mutex{},
leaseTTL: leaseTTL,
}
}

// ReportLeaseError reports that an operation that relies on the leader being up has failed
// If this is either the first report or the error is within the lease timeout duration from
// the time of the first report it will return false, indicating that this may be a transient error
// If it returns true then the error has persisted over the length of a lease, and is probably serious
// this will also return true if some operations are succeeding and some are failing, indicating a non-lease
// related transient error
func (c *ErrorFilter) ReportLeaseError() bool {
c.errorMutex.Lock()
defer c.errorMutex.Unlock()
errorTime, ok := c.firstErrorTime.Get()
if !ok {
c.firstErrorTime = optional.Some(time.Now())
return false
}
// We have seen a success recorded, and a previous error
// within the lease timeout, this indicates transient errors are happening
if c.recordedSuccessTime.Ok() {
return true
}
if errorTime.Add(c.leaseTTL).After(time.Now()) {
// Within the lease window, it will probably be resolved when a new leader is elected
return false
}
return true
}

// ReportOperationSuccess reports that an operation that relies on the leader being up has succeeded
// it is used to decide if an error is transient and will be fixed with a new leader, or if the error is persistent
func (c *ErrorFilter) ReportOperationSuccess() {
c.errorMutex.Lock()
defer c.errorMutex.Unlock()
errorTime, ok := c.firstErrorTime.Get()
if !ok {
// Normal operation, no errors
return
}
if errorTime.Add(c.leaseTTL).After(time.Now()) {
c.recordedSuccessTime = optional.Some(time.Now())
} else {
// Outside the lease window, clear our state
c.recordedSuccessTime = optional.None[time.Time]()
c.firstErrorTime = optional.None[time.Time]()
}
}
27 changes: 27 additions & 0 deletions backend/controller/leader/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,33 @@ func TestSingleLeader(t *testing.T) {
ctxSlicesLock.Unlock()
}

func TestLeaseErrorFilteringPersistentError(t *testing.T) {
filter := NewErrorFilter(time.Millisecond * 200)
assert.False(t, filter.ReportLeaseError(), "first error should be filtered")
assert.False(t, filter.ReportLeaseError(), "second error should be filtered")
assert.False(t, filter.ReportLeaseError(), "third error should be filtered")
time.Sleep(time.Millisecond * 201)
assert.True(t, filter.ReportLeaseError(), "third error should be reported")
}

func TestLeaseErrorFilteringTransientError(t *testing.T) {
filter := NewErrorFilter(time.Millisecond * 200)
assert.False(t, filter.ReportLeaseError(), "first error should be filtered")
assert.False(t, filter.ReportLeaseError(), "second error should be filtered")
assert.False(t, filter.ReportLeaseError(), "third error should be filtered")
time.Sleep(time.Millisecond * 201)
filter.ReportOperationSuccess()
assert.False(t, filter.ReportLeaseError(), "first error again")
}

func TestLeaseErrorTransientErrors(t *testing.T) {
filter := NewErrorFilter(time.Millisecond * 200)
assert.False(t, filter.ReportLeaseError(), "first error should be filtered")
assert.False(t, filter.ReportLeaseError(), "second error should be filtered")
filter.ReportOperationSuccess()
assert.True(t, filter.ReportLeaseError(), "success and failure within the TTL should be reported")
}

func leaderFromCoordinators(t *testing.T, coordinators []*Coordinator[string]) (leaderIdx int, leaderStr string) {
t.Helper()

Expand Down
21 changes: 11 additions & 10 deletions common/configuration/asm.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,19 @@ func newASMForTesting(ctx context.Context, secretsClient *secretsmanager.Client,
return override, nil
}
rpcClient := rpc.Dial(ftlv1connect.NewAdminServiceClient, url.String(), log.Error)
return newASMFollower(rpcClient, url.String()), nil
return newASMFollower(rpcClient, url.String(), time.Second*10), nil
}
coordinator := leader.NewCoordinator[asmClient](
ctx,
advertise,
leases.SystemKey("asm"),
leaser,
time.Second*10,
leaderFactory,
followerFactory,
)
return &ASM{
coordinator: leader.NewCoordinator[asmClient](
ctx,
advertise,
leases.SystemKey("asm"),
leaser,
time.Second*10,
leaderFactory,
followerFactory,
),
coordinator: coordinator,
}
}

Expand Down
26 changes: 19 additions & 7 deletions common/configuration/asm_follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,35 @@ package configuration

import (
"context"
"errors"
"fmt"
"net/url"
"time"

"connectrpc.com/connect"
"github.com/puzpuzpuz/xsync/v3"

"github.com/TBD54566975/ftl/backend/controller/leader"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/log"
)

const asmFollowerSyncInterval = time.Second * 10

// asmFollower uses AdminService to get/set secrets from the leader
type asmFollower struct {
leaderName string

errorFilter *leader.ErrorFilter
leaderName string
// client requests/responses use unobfuscated values
client ftlv1connect.AdminServiceClient
}

var _ asmClient = &asmFollower{}

func newASMFollower(rpcClient ftlv1connect.AdminServiceClient, leaderName string) *asmFollower {
func newASMFollower(rpcClient ftlv1connect.AdminServiceClient, leaderName string, leaseTTL time.Duration) *asmFollower {
f := &asmFollower{
leaderName: leaderName,
client: rpcClient,
errorFilter: leader.NewErrorFilter(leaseTTL),
leaderName: leaderName,
client: rpcClient,
}
return f
}
Expand All @@ -43,6 +45,7 @@ func (f *asmFollower) syncInterval() time.Duration {

func (f *asmFollower) sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error {
// values must store obfuscated values, but f.client gives unobfuscated values
logger := log.FromContext(ctx)
obfuscator := Secrets{}.obfuscator()
module := ""
includeValues := true
Expand All @@ -51,8 +54,17 @@ func (f *asmFollower) sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedV
IncludeValues: &includeValues,
}))
if err != nil {
if connectErr := new(connect.Error); errors.As(err, &connectErr) {
if connectErr.Code() == connect.CodeInternal || connectErr.Code() == connect.CodeUnavailable {
if !f.errorFilter.ReportLeaseError() {
logger.Warnf("error getting secrets list from leader, possible leader failover %s", err.Error())
return nil
}
}
}
return fmt.Errorf("error getting secrets list from leader: %w", err)
}
f.errorFilter.ReportOperationSuccess()
visited := map[Ref]bool{}
for _, s := range resp.Msg.Secrets {
ref, err := ParseRef(s.RefPath)
Expand Down
3 changes: 2 additions & 1 deletion common/configuration/asm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path"
"sort"
"testing"
"time"

"github.com/TBD54566975/ftl/backend/controller/leases"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
Expand Down Expand Up @@ -177,7 +178,7 @@ func TestFollowerSync(t *testing.T) {

// fakeRPCClient connects the follower to the leader
fakeRPCClient := &fakeAdminClient{sm: leaderManager}
follower := newASMFollower(fakeRPCClient, "fake")
follower := newASMFollower(fakeRPCClient, "fake", time.Second)

followerASM := newASMForTesting(ctx, externalClient, URL("http://localhost:1235"), leaser, optional.Some[asmClient](follower))
asmClient, err := followerASM.coordinator.Get()
Expand Down

0 comments on commit 1fa898f

Please sign in to comment.