From 8dd18e78e9404832f378101e3cee6f1a8b514f66 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 1 Aug 2024 15:50:53 +1000 Subject: [PATCH] fix: add logs for subscription and subscriber changes (#2232) We should get better visibility when we create/delete/update subscriber and subscription registrations --- backend/controller/dal/pubsub.go | 40 ++++++++++++--- backend/controller/sql/querier.go | 6 +-- backend/controller/sql/queries.sql | 21 +++++--- backend/controller/sql/queries.sql.go | 70 ++++++++++++++++++++++----- 4 files changed, 107 insertions(+), 30 deletions(-) diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 2813b7e91f..16d818b246 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -3,6 +3,7 @@ package dal import ( "context" "fmt" + "strings" "time" "github.com/alecthomas/types/optional" @@ -169,6 +170,8 @@ func (d *DAL) ResetSubscription(ctx context.Context, module, name string) (err e } func (d *DAL) createSubscriptions(ctx context.Context, tx *sql.Tx, key model.DeploymentKey, module *schema.Module) error { + logger := log.FromContext(ctx) + for _, decl := range module.Decls { s, ok := decl.(*schema.Subscription) if !ok { @@ -180,18 +183,26 @@ func (d *DAL) createSubscriptions(ctx context.Context, tx *sql.Tx, key model.Dep // https://github.com/TBD54566975/ftl/issues/1685 // // It does mean that a subscription will reset to the topic's head if all subscribers are removed and then later re-added + logger.Debugf("Skipping upsert of subscription %s for %s due to lack of subscribers", s.Name, key) continue } - if err := tx.UpsertSubscription(ctx, sql.UpsertSubscriptionParams{ - Key: model.NewSubscriptionKey(module.Name, s.Name), + subscriptionKey := model.NewSubscriptionKey(module.Name, s.Name) + result, err := tx.UpsertSubscription(ctx, sql.UpsertSubscriptionParams{ + Key: subscriptionKey, Module: module.Name, Deployment: key, TopicModule: s.Topic.Module, TopicName: s.Topic.Name, Name: s.Name, - }); err != nil { + }) + if err != nil { return fmt.Errorf("could not insert subscription: %w", dalerrs.TranslatePGError(err)) } + if result.Inserted { + logger.Debugf("Inserted subscription %s for %s", subscriptionKey, key) + } else { + logger.Debugf("Updated subscription %s to %s", subscriptionKey, key) + } } return nil } @@ -216,6 +227,7 @@ func hasSubscribers(subscription *schema.Subscription, decls []schema.Decl) bool } func (d *DAL) createSubscribers(ctx context.Context, tx *sql.Tx, key model.DeploymentKey, module *schema.Module) error { + logger := log.FromContext(ctx) for _, decl := range module.Decls { v, ok := decl.(*schema.Verb) if !ok { @@ -238,8 +250,9 @@ func (d *DAL) createSubscribers(ctx context.Context, tx *sql.Tx, key model.Deplo return fmt.Errorf("could not parse retry parameters for %q: %w", v.Name, err) } } + subscriberKey := model.NewSubscriberKey(module.Name, s.Name, v.Name) err = tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{ - Key: model.NewSubscriberKey(module.Name, s.Name, v.Name), + Key: subscriberKey, Module: module.Name, SubscriptionName: s.Name, Deployment: key, @@ -251,17 +264,30 @@ func (d *DAL) createSubscribers(ctx context.Context, tx *sql.Tx, key model.Deplo if err != nil { return fmt.Errorf("could not insert subscriber: %w", dalerrs.TranslatePGError(err)) } + logger.Debugf("Inserted subscriber %s for %s", subscriberKey, key) } } return nil } func (d *DAL) removeSubscriptionsAndSubscribers(ctx context.Context, tx *sql.Tx, key model.DeploymentKey) error { - if err := tx.DeleteSubscriptions(ctx, key); err != nil { + logger := log.FromContext(ctx) + + subscribers, err := tx.DeleteSubscribers(ctx, key) + if err != nil { + return fmt.Errorf("could not delete old subscribers: %w", dalerrs.TranslatePGError(err)) + } + if len(subscribers) > 0 { + logger.Debugf("Deleted subscribers for %s: %s", key, strings.Join(slices.Map(subscribers, func(key model.SubscriberKey) string { return key.String() }), ", ")) + } + + subscriptions, err := tx.DeleteSubscriptions(ctx, key) + if err != nil { return fmt.Errorf("could not delete old subscriptions: %w", dalerrs.TranslatePGError(err)) } - if err := tx.DeleteSubscribers(ctx, key); err != nil { - return fmt.Errorf("could not delete old subscribers: %w", dalerrs.TranslatePGError(err)) + if len(subscriptions) > 0 { + logger.Debugf("Deleted subscriptions for %s: %s", key, strings.Join(slices.Map(subscriptions, func(key model.SubscriptionKey) string { return key.String() }), ", ")) } + return nil } diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 99070044ed..8856d1109a 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -28,8 +28,8 @@ type Querier interface { CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error - DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) error - DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) error + DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) + DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) ExpireLeases(ctx context.Context) (int64, error) ExpireRunnerReservations(ctx context.Context) (int64, error) @@ -111,7 +111,7 @@ type Querier interface { // there is no corresponding deployment, then the deployment ID is -1 // and the parent statement will fail due to a foreign key constraint. UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (optional.Option[int64], error) - UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) error + UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error } diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index d8eea458b7..86548b00de 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -575,7 +575,7 @@ UPDATE SET type = sqlc.arg('event_type')::TEXT RETURNING id; --- name: UpsertSubscription :exec +-- name: UpsertSubscription :one INSERT INTO topic_subscriptions ( key, topic_id, @@ -599,23 +599,30 @@ ON CONFLICT (name, module_id) DO UPDATE SET topic_id = excluded.topic_id, deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key) -RETURNING id; - --- name: DeleteSubscriptions :exec +RETURNING + id, + CASE + WHEN xmax = 0 THEN true + ELSE false + END AS inserted; + +-- name: DeleteSubscriptions :many DELETE FROM topic_subscriptions WHERE deployment_id IN ( SELECT deployments.id FROM deployments WHERE deployments.key = sqlc.arg('deployment')::deployment_key -); +) +RETURNING topic_subscriptions.key; --- name: DeleteSubscribers :exec +-- name: DeleteSubscribers :many DELETE FROM topic_subscribers WHERE deployment_id IN ( SELECT deployments.id FROM deployments WHERE deployments.key = sqlc.arg('deployment')::deployment_key -); +) +RETURNING topic_subscribers.key; -- name: InsertSubscriber :exec INSERT INTO topic_subscribers ( diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index ccf7917eab..8fd8315e2e 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -222,32 +222,64 @@ func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.Re return err } -const deleteSubscribers = `-- name: DeleteSubscribers :exec +const deleteSubscribers = `-- name: DeleteSubscribers :many DELETE FROM topic_subscribers WHERE deployment_id IN ( SELECT deployments.id FROM deployments WHERE deployments.key = $1::deployment_key ) +RETURNING topic_subscribers.key ` -func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) error { - _, err := q.db.Exec(ctx, deleteSubscribers, deployment) - return err +func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) { + rows, err := q.db.Query(ctx, deleteSubscribers, deployment) + if err != nil { + return nil, err + } + defer rows.Close() + var items []model.SubscriberKey + for rows.Next() { + var key model.SubscriberKey + if err := rows.Scan(&key); err != nil { + return nil, err + } + items = append(items, key) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil } -const deleteSubscriptions = `-- name: DeleteSubscriptions :exec +const deleteSubscriptions = `-- name: DeleteSubscriptions :many DELETE FROM topic_subscriptions WHERE deployment_id IN ( SELECT deployments.id FROM deployments WHERE deployments.key = $1::deployment_key ) +RETURNING topic_subscriptions.key ` -func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) error { - _, err := q.db.Exec(ctx, deleteSubscriptions, deployment) - return err +func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) { + rows, err := q.db.Query(ctx, deleteSubscriptions, deployment) + if err != nil { + return nil, err + } + defer rows.Close() + var items []model.SubscriptionKey + for rows.Next() { + var key model.SubscriptionKey + if err := rows.Scan(&key); err != nil { + return nil, err + } + items = append(items, key) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil } const deregisterRunner = `-- name: DeregisterRunner :one @@ -2166,7 +2198,7 @@ func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (opt return deployment_id, err } -const upsertSubscription = `-- name: UpsertSubscription :exec +const upsertSubscription = `-- name: UpsertSubscription :one INSERT INTO topic_subscriptions ( key, topic_id, @@ -2190,7 +2222,12 @@ ON CONFLICT (name, module_id) DO UPDATE SET topic_id = excluded.topic_id, deployment_id = (SELECT id FROM deployments WHERE key = $5::deployment_key) -RETURNING id +RETURNING + id, + CASE + WHEN xmax = 0 THEN true + ELSE false + END AS inserted ` type UpsertSubscriptionParams struct { @@ -2202,8 +2239,13 @@ type UpsertSubscriptionParams struct { Name string } -func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) error { - _, err := q.db.Exec(ctx, upsertSubscription, +type UpsertSubscriptionRow struct { + ID int64 + Inserted bool +} + +func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) { + row := q.db.QueryRow(ctx, upsertSubscription, arg.Key, arg.TopicModule, arg.TopicName, @@ -2211,7 +2253,9 @@ func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscription arg.Deployment, arg.Name, ) - return err + var i UpsertSubscriptionRow + err := row.Scan(&i.ID, &i.Inserted) + return i, err } const upsertTopic = `-- name: UpsertTopic :exec