Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't log at error level in lease timeout #2151

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions backend/controller/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,71 @@ func (c *Coordinator[P]) retireFollower() {
f.cancelCtx()
c.follower = optional.None[*follower[P]]()
}

// ErrorFilter allows uses of leases to decide if an error might be due to the master falling over,
// or is something else that will not resolve itself after the TTL
type ErrorFilter struct {
leaseTTL time.Duration
// Error reporting utilities
// If a controller has failed over we don't want error logs while we are waiting for the lease to expire.
// This records error state and allows us to filter errors until we are past lease timeout
// and only reports the error if it persists
// firstErrorTime is the time of the first error, used to lower log levels if the errors all occur within a lease window
firstErrorTime optional.Option[time.Time]
recordedSuccessTime optional.Option[time.Time]

// errorMutex protects firstErrorTime
errorMutex sync.Mutex
}

func NewErrorFilter(leaseTTL time.Duration) *ErrorFilter {
return &ErrorFilter{
errorMutex: sync.Mutex{},
leaseTTL: leaseTTL,
}
}

// ReportLeaseError reports that an operation that relies on the leader being up has failed
// If this is either the first report or the error is within the lease timeout duration from
// the time of the first report it will return false, indicating that this may be a transient error
// If it returns true then the error has persisted over the length of a lease, and is probably serious
// this will also return true if some operations are succeeding and some are failing, indicating a non-lease
// related transient error
func (c *ErrorFilter) ReportLeaseError() bool {
c.errorMutex.Lock()
defer c.errorMutex.Unlock()
errorTime, ok := c.firstErrorTime.Get()
if !ok {
c.firstErrorTime = optional.Some(time.Now())
return false
}
// We have seen a success recorded, and a previous error
// within the lease timeout, this indicates transient errors are happening
if c.recordedSuccessTime.Ok() {
return true
}
if errorTime.Add(c.leaseTTL).After(time.Now()) {
// Within the lease window, it will probably be resolved when a new leader is elected
return false
}
return true
}

// ReportOperationSuccess reports that an operation that relies on the leader being up has succeeded
// it is used to decide if an error is transient and will be fixed with a new leader, or if the error is persistent
func (c *ErrorFilter) ReportOperationSuccess() {
c.errorMutex.Lock()
defer c.errorMutex.Unlock()
errorTime, ok := c.firstErrorTime.Get()
if !ok {
// Normal operation, no errors
return
}
if errorTime.Add(c.leaseTTL).After(time.Now()) {
c.recordedSuccessTime = optional.Some(time.Now())
} else {
// Outside the lease window, clear our state
c.recordedSuccessTime = optional.None[time.Time]()
c.firstErrorTime = optional.None[time.Time]()
}
}
27 changes: 27 additions & 0 deletions backend/controller/leader/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,33 @@ func TestSingleLeader(t *testing.T) {
ctxSlicesLock.Unlock()
}

func TestLeaseErrorFilteringPersistentError(t *testing.T) {
filter := NewErrorFilter(time.Millisecond * 200)
assert.False(t, filter.ReportLeaseError(), "first error should be filtered")
assert.False(t, filter.ReportLeaseError(), "second error should be filtered")
assert.False(t, filter.ReportLeaseError(), "third error should be filtered")
time.Sleep(time.Millisecond * 201)
assert.True(t, filter.ReportLeaseError(), "third error should be reported")
}

func TestLeaseErrorFilteringTransientError(t *testing.T) {
filter := NewErrorFilter(time.Millisecond * 200)
assert.False(t, filter.ReportLeaseError(), "first error should be filtered")
assert.False(t, filter.ReportLeaseError(), "second error should be filtered")
assert.False(t, filter.ReportLeaseError(), "third error should be filtered")
time.Sleep(time.Millisecond * 201)
filter.ReportOperationSuccess()
assert.False(t, filter.ReportLeaseError(), "first error again")
}

func TestLeaseErrorTransientErrors(t *testing.T) {
filter := NewErrorFilter(time.Millisecond * 200)
assert.False(t, filter.ReportLeaseError(), "first error should be filtered")
assert.False(t, filter.ReportLeaseError(), "second error should be filtered")
filter.ReportOperationSuccess()
assert.True(t, filter.ReportLeaseError(), "success and failure within the TTL should be reported")
}

func leaderFromCoordinators(t *testing.T, coordinators []*Coordinator[string]) (leaderIdx int, leaderStr string) {
t.Helper()

Expand Down
21 changes: 11 additions & 10 deletions common/configuration/asm.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,19 @@ func newASMForTesting(ctx context.Context, secretsClient *secretsmanager.Client,
return override, nil
}
rpcClient := rpc.Dial(ftlv1connect.NewAdminServiceClient, url.String(), log.Error)
return newASMFollower(rpcClient, url.String()), nil
return newASMFollower(rpcClient, url.String(), time.Second*10), nil
}
coordinator := leader.NewCoordinator[asmClient](
ctx,
advertise,
leases.SystemKey("asm"),
leaser,
time.Second*10,
leaderFactory,
followerFactory,
)
return &ASM{
coordinator: leader.NewCoordinator[asmClient](
ctx,
advertise,
leases.SystemKey("asm"),
leaser,
time.Second*10,
leaderFactory,
followerFactory,
),
coordinator: coordinator,
}
}

Expand Down
26 changes: 19 additions & 7 deletions common/configuration/asm_follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,35 @@ package configuration

import (
"context"
"errors"
"fmt"
"net/url"
"time"

"connectrpc.com/connect"
"github.com/puzpuzpuz/xsync/v3"

"github.com/TBD54566975/ftl/backend/controller/leader"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/log"
)

const asmFollowerSyncInterval = time.Second * 10

// asmFollower uses AdminService to get/set secrets from the leader
type asmFollower struct {
leaderName string

errorFilter *leader.ErrorFilter
leaderName string
// client requests/responses use unobfuscated values
client ftlv1connect.AdminServiceClient
}

var _ asmClient = &asmFollower{}

func newASMFollower(rpcClient ftlv1connect.AdminServiceClient, leaderName string) *asmFollower {
func newASMFollower(rpcClient ftlv1connect.AdminServiceClient, leaderName string, leaseTTL time.Duration) *asmFollower {
f := &asmFollower{
leaderName: leaderName,
client: rpcClient,
errorFilter: leader.NewErrorFilter(leaseTTL),
leaderName: leaderName,
client: rpcClient,
}
return f
}
Expand All @@ -43,6 +45,7 @@ func (f *asmFollower) syncInterval() time.Duration {

func (f *asmFollower) sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error {
// values must store obfuscated values, but f.client gives unobfuscated values
logger := log.FromContext(ctx)
obfuscator := Secrets{}.obfuscator()
module := ""
includeValues := true
Expand All @@ -51,8 +54,17 @@ func (f *asmFollower) sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedV
IncludeValues: &includeValues,
}))
if err != nil {
if connectErr := new(connect.Error); errors.As(err, &connectErr) {
if connectErr.Code() == connect.CodeInternal || connectErr.Code() == connect.CodeUnavailable {
if !f.errorFilter.ReportLeaseError() {
logger.Warnf("error getting secrets list from leader, possible leader failover %s", err.Error())
return nil
}
}
}
return fmt.Errorf("error getting secrets list from leader: %w", err)
}
f.errorFilter.ReportOperationSuccess()
visited := map[Ref]bool{}
for _, s := range resp.Msg.Secrets {
ref, err := ParseRef(s.RefPath)
Expand Down
3 changes: 2 additions & 1 deletion common/configuration/asm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path"
"sort"
"testing"
"time"

"connectrpc.com/connect"
"github.com/TBD54566975/ftl/backend/controller/leases"
Expand Down Expand Up @@ -181,7 +182,7 @@ func TestFollowerSync(t *testing.T) {

// fakeRPCClient connects the follower to the leader
fakeRPCClient := &fakeAdminClient{sm: leaderManager}
follower := newASMFollower(fakeRPCClient, "fake")
follower := newASMFollower(fakeRPCClient, "fake", time.Second)

followerASM := newASMForTesting(ctx, externalClient, URL("http://localhost:1235"), leaser, optional.Some[asmClient](follower))
asmClient, err := followerASM.coordinator.Get()
Expand Down
Loading