Skip to content

Commit

Permalink
fix: add logs for subscription and subscriber changes (#2232)
Browse files Browse the repository at this point in the history
We should get better visibility when we create/delete/update subscriber
and subscription registrations
  • Loading branch information
matt2e authored Aug 1, 2024
1 parent 636adcf commit 8dd18e7
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 30 deletions.
40 changes: 33 additions & 7 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dal
import (
"context"
"fmt"
"strings"
"time"

"github.com/alecthomas/types/optional"
Expand Down Expand Up @@ -169,6 +170,8 @@ func (d *DAL) ResetSubscription(ctx context.Context, module, name string) (err e
}

func (d *DAL) createSubscriptions(ctx context.Context, tx *sql.Tx, key model.DeploymentKey, module *schema.Module) error {
logger := log.FromContext(ctx)

for _, decl := range module.Decls {
s, ok := decl.(*schema.Subscription)
if !ok {
Expand All @@ -180,18 +183,26 @@ func (d *DAL) createSubscriptions(ctx context.Context, tx *sql.Tx, key model.Dep
// https://github.com/TBD54566975/ftl/issues/1685
//
// It does mean that a subscription will reset to the topic's head if all subscribers are removed and then later re-added
logger.Debugf("Skipping upsert of subscription %s for %s due to lack of subscribers", s.Name, key)
continue
}
if err := tx.UpsertSubscription(ctx, sql.UpsertSubscriptionParams{
Key: model.NewSubscriptionKey(module.Name, s.Name),
subscriptionKey := model.NewSubscriptionKey(module.Name, s.Name)
result, err := tx.UpsertSubscription(ctx, sql.UpsertSubscriptionParams{
Key: subscriptionKey,
Module: module.Name,
Deployment: key,
TopicModule: s.Topic.Module,
TopicName: s.Topic.Name,
Name: s.Name,
}); err != nil {
})
if err != nil {
return fmt.Errorf("could not insert subscription: %w", dalerrs.TranslatePGError(err))
}
if result.Inserted {
logger.Debugf("Inserted subscription %s for %s", subscriptionKey, key)
} else {
logger.Debugf("Updated subscription %s to %s", subscriptionKey, key)
}
}
return nil
}
Expand All @@ -216,6 +227,7 @@ func hasSubscribers(subscription *schema.Subscription, decls []schema.Decl) bool
}

func (d *DAL) createSubscribers(ctx context.Context, tx *sql.Tx, key model.DeploymentKey, module *schema.Module) error {
logger := log.FromContext(ctx)
for _, decl := range module.Decls {
v, ok := decl.(*schema.Verb)
if !ok {
Expand All @@ -238,8 +250,9 @@ func (d *DAL) createSubscribers(ctx context.Context, tx *sql.Tx, key model.Deplo
return fmt.Errorf("could not parse retry parameters for %q: %w", v.Name, err)
}
}
subscriberKey := model.NewSubscriberKey(module.Name, s.Name, v.Name)
err = tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{
Key: model.NewSubscriberKey(module.Name, s.Name, v.Name),
Key: subscriberKey,
Module: module.Name,
SubscriptionName: s.Name,
Deployment: key,
Expand All @@ -251,17 +264,30 @@ func (d *DAL) createSubscribers(ctx context.Context, tx *sql.Tx, key model.Deplo
if err != nil {
return fmt.Errorf("could not insert subscriber: %w", dalerrs.TranslatePGError(err))
}
logger.Debugf("Inserted subscriber %s for %s", subscriberKey, key)
}
}
return nil
}

func (d *DAL) removeSubscriptionsAndSubscribers(ctx context.Context, tx *sql.Tx, key model.DeploymentKey) error {
if err := tx.DeleteSubscriptions(ctx, key); err != nil {
logger := log.FromContext(ctx)

subscribers, err := tx.DeleteSubscribers(ctx, key)
if err != nil {
return fmt.Errorf("could not delete old subscribers: %w", dalerrs.TranslatePGError(err))
}
if len(subscribers) > 0 {
logger.Debugf("Deleted subscribers for %s: %s", key, strings.Join(slices.Map(subscribers, func(key model.SubscriberKey) string { return key.String() }), ", "))
}

subscriptions, err := tx.DeleteSubscriptions(ctx, key)
if err != nil {
return fmt.Errorf("could not delete old subscriptions: %w", dalerrs.TranslatePGError(err))
}
if err := tx.DeleteSubscribers(ctx, key); err != nil {
return fmt.Errorf("could not delete old subscribers: %w", dalerrs.TranslatePGError(err))
if len(subscriptions) > 0 {
logger.Debugf("Deleted subscriptions for %s: %s", key, strings.Join(slices.Map(subscriptions, func(key model.SubscriptionKey) string { return key.String() }), ", "))
}

return nil
}
6 changes: 3 additions & 3 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 14 additions & 7 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ UPDATE SET
type = sqlc.arg('event_type')::TEXT
RETURNING id;

-- name: UpsertSubscription :exec
-- name: UpsertSubscription :one
INSERT INTO topic_subscriptions (
key,
topic_id,
Expand All @@ -599,23 +599,30 @@ ON CONFLICT (name, module_id) DO
UPDATE SET
topic_id = excluded.topic_id,
deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key)
RETURNING id;

-- name: DeleteSubscriptions :exec
RETURNING
id,
CASE
WHEN xmax = 0 THEN true
ELSE false
END AS inserted;

-- name: DeleteSubscriptions :many
DELETE FROM topic_subscriptions
WHERE deployment_id IN (
SELECT deployments.id
FROM deployments
WHERE deployments.key = sqlc.arg('deployment')::deployment_key
);
)
RETURNING topic_subscriptions.key;

-- name: DeleteSubscribers :exec
-- name: DeleteSubscribers :many
DELETE FROM topic_subscribers
WHERE deployment_id IN (
SELECT deployments.id
FROM deployments
WHERE deployments.key = sqlc.arg('deployment')::deployment_key
);
)
RETURNING topic_subscribers.key;

-- name: InsertSubscriber :exec
INSERT INTO topic_subscribers (
Expand Down
70 changes: 57 additions & 13 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8dd18e7

Please sign in to comment.