diff --git a/backend/controller/leader/leader.go b/backend/controller/leader/leader.go index a12b6f018a..643adc327e 100644 --- a/backend/controller/leader/leader.go +++ b/backend/controller/leader/leader.go @@ -220,3 +220,71 @@ func (c *Coordinator[P]) retireFollower() { f.cancelCtx() c.follower = optional.None[*follower[P]]() } + +// ErrorFilter allows uses of leases to decide if an error might be due to the master falling over, +// or is something else that will not resolve itself after the TTL +type ErrorFilter struct { + leaseTTL time.Duration + // Error reporting utilities + // If a controller has failed over we don't want error logs while we are waiting for the lease to expire. + // This records error state and allows us to filter errors until we are past lease timeout + // and only reports the error if it persists + // firstErrorTime is the time of the first error, used to lower log levels if the errors all occur within a lease window + firstErrorTime optional.Option[time.Time] + recordedSuccessTime optional.Option[time.Time] + + // errorMutex protects firstErrorTime + errorMutex sync.Mutex +} + +func NewErrorFilter(leaseTTL time.Duration) *ErrorFilter { + return &ErrorFilter{ + errorMutex: sync.Mutex{}, + leaseTTL: leaseTTL, + } +} + +// ReportLeaseError reports that an operation that relies on the leader being up has failed +// If this is either the first report or the error is within the lease timeout duration from +// the time of the first report it will return false, indicating that this may be a transient error +// If it returns true then the error has persisted over the length of a lease, and is probably serious +// this will also return true if some operations are succeeding and some are failing, indicating a non-lease +// related transient error +func (c *ErrorFilter) ReportLeaseError() bool { + c.errorMutex.Lock() + defer c.errorMutex.Unlock() + errorTime, ok := c.firstErrorTime.Get() + if !ok { + c.firstErrorTime = optional.Some(time.Now()) + return false + } + // We have seen a success recorded, and a previous error + // within the lease timeout, this indicates transient errors are happening + if c.recordedSuccessTime.Ok() { + return true + } + if errorTime.Add(c.leaseTTL).After(time.Now()) { + // Within the lease window, it will probably be resolved when a new leader is elected + return false + } + return true +} + +// ReportOperationSuccess reports that an operation that relies on the leader being up has succeeded +// it is used to decide if an error is transient and will be fixed with a new leader, or if the error is persistent +func (c *ErrorFilter) ReportOperationSuccess() { + c.errorMutex.Lock() + defer c.errorMutex.Unlock() + errorTime, ok := c.firstErrorTime.Get() + if !ok { + // Normal operation, no errors + return + } + if errorTime.Add(c.leaseTTL).After(time.Now()) { + c.recordedSuccessTime = optional.Some(time.Now()) + } else { + // Outside the lease window, clear our state + c.recordedSuccessTime = optional.None[time.Time]() + c.firstErrorTime = optional.None[time.Time]() + } +} diff --git a/backend/controller/leader/leader_test.go b/backend/controller/leader/leader_test.go index 7d239b8a41..dd6f9ef295 100644 --- a/backend/controller/leader/leader_test.go +++ b/backend/controller/leader/leader_test.go @@ -125,6 +125,33 @@ func TestSingleLeader(t *testing.T) { ctxSlicesLock.Unlock() } +func TestLeaseErrorFilteringPersistentError(t *testing.T) { + filter := NewErrorFilter(time.Millisecond * 200) + assert.False(t, filter.ReportLeaseError(), "first error should be filtered") + assert.False(t, filter.ReportLeaseError(), "second error should be filtered") + assert.False(t, filter.ReportLeaseError(), "third error should be filtered") + time.Sleep(time.Millisecond * 201) + assert.True(t, filter.ReportLeaseError(), "third error should be reported") +} + +func TestLeaseErrorFilteringTransientError(t *testing.T) { + filter := NewErrorFilter(time.Millisecond * 200) + assert.False(t, filter.ReportLeaseError(), "first error should be filtered") + assert.False(t, filter.ReportLeaseError(), "second error should be filtered") + assert.False(t, filter.ReportLeaseError(), "third error should be filtered") + time.Sleep(time.Millisecond * 201) + filter.ReportOperationSuccess() + assert.False(t, filter.ReportLeaseError(), "first error again") +} + +func TestLeaseErrorTransientErrors(t *testing.T) { + filter := NewErrorFilter(time.Millisecond * 200) + assert.False(t, filter.ReportLeaseError(), "first error should be filtered") + assert.False(t, filter.ReportLeaseError(), "second error should be filtered") + filter.ReportOperationSuccess() + assert.True(t, filter.ReportLeaseError(), "success and failure within the TTL should be reported") +} + func leaderFromCoordinators(t *testing.T, coordinators []*Coordinator[string]) (leaderIdx int, leaderStr string) { t.Helper() diff --git a/common/configuration/asm.go b/common/configuration/asm.go index 4136409a2a..bcab586a00 100644 --- a/common/configuration/asm.go +++ b/common/configuration/asm.go @@ -52,18 +52,19 @@ func newASMForTesting(ctx context.Context, secretsClient *secretsmanager.Client, return override, nil } rpcClient := rpc.Dial(ftlv1connect.NewAdminServiceClient, url.String(), log.Error) - return newASMFollower(rpcClient, url.String()), nil + return newASMFollower(rpcClient, url.String(), time.Second*10), nil } + coordinator := leader.NewCoordinator[asmClient]( + ctx, + advertise, + leases.SystemKey("asm"), + leaser, + time.Second*10, + leaderFactory, + followerFactory, + ) return &ASM{ - coordinator: leader.NewCoordinator[asmClient]( - ctx, - advertise, - leases.SystemKey("asm"), - leaser, - time.Second*10, - leaderFactory, - followerFactory, - ), + coordinator: coordinator, } } diff --git a/common/configuration/asm_follower.go b/common/configuration/asm_follower.go index 60346483b7..de4710624a 100644 --- a/common/configuration/asm_follower.go +++ b/common/configuration/asm_follower.go @@ -2,6 +2,7 @@ package configuration import ( "context" + "errors" "fmt" "net/url" "time" @@ -9,26 +10,27 @@ import ( "connectrpc.com/connect" "github.com/puzpuzpuz/xsync/v3" + "github.com/TBD54566975/ftl/backend/controller/leader" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/internal/log" ) const asmFollowerSyncInterval = time.Second * 10 // asmFollower uses AdminService to get/set secrets from the leader type asmFollower struct { - leaderName string - + errorFilter *leader.ErrorFilter + leaderName string // client requests/responses use unobfuscated values client ftlv1connect.AdminServiceClient } -var _ asmClient = &asmFollower{} - -func newASMFollower(rpcClient ftlv1connect.AdminServiceClient, leaderName string) *asmFollower { +func newASMFollower(rpcClient ftlv1connect.AdminServiceClient, leaderName string, leaseTTL time.Duration) *asmFollower { f := &asmFollower{ - leaderName: leaderName, - client: rpcClient, + errorFilter: leader.NewErrorFilter(leaseTTL), + leaderName: leaderName, + client: rpcClient, } return f } @@ -43,6 +45,7 @@ func (f *asmFollower) syncInterval() time.Duration { func (f *asmFollower) sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error { // values must store obfuscated values, but f.client gives unobfuscated values + logger := log.FromContext(ctx) obfuscator := Secrets{}.obfuscator() module := "" includeValues := true @@ -51,8 +54,17 @@ func (f *asmFollower) sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedV IncludeValues: &includeValues, })) if err != nil { + if connectErr := new(connect.Error); errors.As(err, &connectErr) { + if connectErr.Code() == connect.CodeInternal || connectErr.Code() == connect.CodeUnavailable { + if !f.errorFilter.ReportLeaseError() { + logger.Warnf("error getting secrets list from leader, possible leader failover %s", err.Error()) + return nil + } + } + } return fmt.Errorf("error getting secrets list from leader: %w", err) } + f.errorFilter.ReportOperationSuccess() visited := map[Ref]bool{} for _, s := range resp.Msg.Secrets { ref, err := ParseRef(s.RefPath) diff --git a/common/configuration/asm_test.go b/common/configuration/asm_test.go index 489b9b0f01..e580624b00 100644 --- a/common/configuration/asm_test.go +++ b/common/configuration/asm_test.go @@ -11,6 +11,7 @@ import ( "path" "sort" "testing" + "time" "connectrpc.com/connect" "github.com/TBD54566975/ftl/backend/controller/leases" @@ -181,7 +182,7 @@ func TestFollowerSync(t *testing.T) { // fakeRPCClient connects the follower to the leader fakeRPCClient := &fakeAdminClient{sm: leaderManager} - follower := newASMFollower(fakeRPCClient, "fake") + follower := newASMFollower(fakeRPCClient, "fake", time.Second) followerASM := newASMForTesting(ctx, externalClient, URL("http://localhost:1235"), leaser, optional.Some[asmClient](follower)) asmClient, err := followerASM.coordinator.Get()