diff --git a/.golangci.yml b/.golangci.yml index d46adb0ba2..7ddeb6d1ec 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -136,3 +136,4 @@ issues: - "fmt.Errorf can be replaced with errors.New" - "fmt.Sprintf can be replaced with string concatenation" - "strings.Title has been deprecated" + - "error returned from external package is unwrapped.*TranslatePGError" \ No newline at end of file diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 367321deec..16dd374a1c 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -130,6 +130,7 @@ type Config struct { ControllerTimeout time.Duration `help:"Controller heartbeat timeout." default:"10s"` DeploymentReservationTimeout time.Duration `help:"Deployment reservation timeout." default:"120s"` ModuleUpdateFrequency time.Duration `help:"Frequency to send module updates." default:"30s"` + EventLogRetention *time.Duration `help:"Delete call logs after this time period. 0 to disable" env:"FTL_EVENT_LOG_RETENTION" default:"24h"` ArtefactChunkSize int `help:"Size of each chunk streamed to the client." default:"1048576"` EncryptionKeys CommonConfig @@ -326,6 +327,7 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s // Singleton tasks use leases to only run on a single controller. svc.tasks.Singleton(maybeDevelTask(svc.reapStaleControllers, time.Second*2, time.Second*20, time.Second*20)) svc.tasks.Singleton(maybeDevelTask(svc.reapStaleRunners, time.Second*2, time.Second, time.Second*10)) + svc.tasks.Singleton(maybeDevelTask(svc.reapCallEvents, time.Second*2, time.Second, time.Second*10)) svc.tasks.Singleton(maybeDevelTask(svc.releaseExpiredReservations, time.Second*2, time.Second, time.Second*20)) svc.tasks.Singleton(maybeDevelTask(svc.reconcileDeployments, time.Second*2, time.Second, time.Second*5)) svc.tasks.Singleton(maybeDevelTask(svc.reconcileRunners, time.Second*2, time.Second, time.Second*5)) @@ -1794,6 +1796,26 @@ func (s *Service) syncSchema(ctx context.Context) { } } +func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) { + logger := log.FromContext(ctx) + + if s.config.EventLogRetention == nil { + logger.Tracef("Event log retention is disabled, will not prune.") + return time.Hour, nil + } + + removed, err := s.dal.DeleteOldEvents(ctx, dal.EventTypeCall, *s.config.EventLogRetention) + if err != nil { + return 0, fmt.Errorf("failed to prune call events: %w", err) + } + if removed > 0 { + logger.Debugf("Pruned %d call events older than %s", removed, s.config.EventLogRetention) + } + + // Prune every 5% of the retention period. + return *s.config.EventLogRetention / 20, nil +} + func extractIngressRoutingEntries(req *ftlv1.CreateDeploymentRequest) []dal.IngressRoutingEntry { var ingressRoutes []dal.IngressRoutingEntry for _, decl := range req.Schema.Decls { diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 2b5f95d9db..83dd3c80c4 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -1164,6 +1164,11 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { })) } +func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) { + count, err := d.db.DeleteOldEvents(ctx, age, eventType) + return count, dalerrs.TranslatePGError(err) +} + func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error) { rows, err := d.db.GetActiveRunners(ctx) if err != nil { diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index 31fcaf7d7c..0f3be9a3a9 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -403,3 +403,98 @@ func assertEventsEqual(t *testing.T, expected, actual []Event) { t.Helper() assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual)) } + +func TestDeleteOldEvents(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + conn := sqltest.OpenForTesting(ctx, t) + dal, err := New(ctx, conn, NoOpEncryptors()) + assert.NoError(t, err) + + var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) + var testSha sha256.SHA256 + + t.Run("CreateArtefact", func(t *testing.T) { + testSha, err = dal.CreateArtefact(ctx, testContent) + assert.NoError(t, err) + }) + + module := &schema.Module{Name: "test"} + var deploymentKey model.DeploymentKey + t.Run("CreateDeployment", func(t *testing.T) { + deploymentKey, err = dal.CreateDeployment(ctx, "go", module, []DeploymentArtefact{{ + Digest: testSha, + Executable: true, + Path: "dir/filename", + }}, nil, nil) + assert.NoError(t, err) + }) + + requestKey := model.NewRequestKey(model.OriginIngress, "GET /test") + // week old event + callEvent := &CallEvent{ + Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Request: []byte("{}"), + Response: []byte(`{"time": "now"}`), + DestVerb: schema.Ref{Module: "time", Name: "time"}, + } + t.Run("InsertCallEvent", func(t *testing.T) { + err = dal.InsertCallEvent(ctx, callEvent) + assert.NoError(t, err) + }) + // hour old event + callEvent = &CallEvent{ + Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Request: []byte("{}"), + Response: []byte(`{"time": "now"}`), + DestVerb: schema.Ref{Module: "time", Name: "time"}, + } + t.Run("InsertCallEvent", func(t *testing.T) { + err = dal.InsertCallEvent(ctx, callEvent) + assert.NoError(t, err) + }) + + // week old event + logEvent := &LogEvent{ + Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Level: int32(log.Warn), + Attributes: map[string]string{"attr": "value"}, + Message: "A log entry", + } + t.Run("InsertLogEntry", func(t *testing.T) { + err = dal.InsertLogEvent(ctx, logEvent) + assert.NoError(t, err) + }) + // hour old event + logEvent = &LogEvent{ + Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Level: int32(log.Warn), + Attributes: map[string]string{"attr": "value"}, + Message: "A log entry", + } + t.Run("InsertLogEntry", func(t *testing.T) { + err = dal.InsertLogEvent(ctx, logEvent) + assert.NoError(t, err) + }) + + t.Run("DeleteOldEvents", func(t *testing.T) { + count, err := dal.DeleteOldEvents(ctx, EventTypeCall, 2*24*time.Hour) + assert.NoError(t, err) + assert.Equal(t, int64(1), count) + + count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute) + assert.NoError(t, err) + assert.Equal(t, int64(2), count) + + count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute) + assert.NoError(t, err) + assert.Equal(t, int64(0), count) + }) +} diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 39e5260a57..1331f7e476 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -30,6 +30,7 @@ type Querier interface { CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error + DeleteOldEvents(ctx context.Context, timeout time.Duration, type_ EventType) (int64, error) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 7c18b75690..7d5051475b 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -360,6 +360,16 @@ VALUES ( sqlc.arg('payload') ); +-- name: DeleteOldEvents :one +WITH deleted AS ( + DELETE FROM events + WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL + AND type = sqlc.arg('type') + RETURNING 1 +) +SELECT COUNT(*) +FROM deleted; + -- name: CreateRequest :exec INSERT INTO requests (origin, "key", source_addr) VALUES ($1, $2, $3); diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index dadcc77bca..b5096964e6 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -277,6 +277,24 @@ func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.Re return err } +const deleteOldEvents = `-- name: DeleteOldEvents :one +WITH deleted AS ( + DELETE FROM events + WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL + AND type = $2 + RETURNING 1 +) +SELECT COUNT(*) +FROM deleted +` + +func (q *Queries) DeleteOldEvents(ctx context.Context, timeout time.Duration, type_ EventType) (int64, error) { + row := q.db.QueryRow(ctx, deleteOldEvents, timeout, type_) + var count int64 + err := row.Scan(&count) + return count, err +} + const deleteSubscribers = `-- name: DeleteSubscribers :many DELETE FROM topic_subscribers WHERE deployment_id IN (