Skip to content

Commit

Permalink
feat: cmd reset subscription to topic head
Browse files Browse the repository at this point in the history
  • Loading branch information
worstell committed Jul 31, 2024
1 parent f3441cc commit 073c639
Show file tree
Hide file tree
Showing 17 changed files with 1,164 additions and 638 deletions.
8 changes: 8 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,14 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil
}

func (s *Service) ResetSubscription(ctx context.Context, req *connect.Request[ftlv1.ResetSubscriptionRequest]) (*connect.Response[ftlv1.ResetSubscriptionResponse], error) {
err := s.dal.ResetSubscription(ctx, req.Msg.Subscription.Module, req.Msg.Subscription.Name)
if err != nil {
return nil, fmt.Errorf("could not reset subscription: %w", err)
}
return connect.NewResponse(&ftlv1.ResetSubscriptionResponse{}), nil
}

// Load schemas for existing modules, combine with our new one, and validate the new module in the context
// of the whole schema.
func (s *Service) validateModuleSchema(ctx context.Context, module *schema.Module) (*schema.Module, error) {
Expand Down
41 changes: 41 additions & 0 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,47 @@ func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name str
return nil
}

// ResetSubscription resets the subscription cursor to the topic's head.
func (d *DAL) ResetSubscription(ctx context.Context, module, name string) (err error) {
tx, err := d.db.Begin(ctx)
if err != nil {
return fmt.Errorf("could not start transaction: %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

qtx := NewQTx(d.db.Conn(), tx.Tx())

subscription, err := qtx.GetSubscription(ctx, name, module)
if err != nil {
if dalerrs.IsNotFound(err) {
return fmt.Errorf("subscription %s.%s not found", module, name)
}
return fmt.Errorf("could not fetch subscription: %w", dalerrs.TranslatePGError(err))
}

topic, err := qtx.GetTopic(ctx, subscription.TopicID)
if err != nil {
return fmt.Errorf("could not fetch topic: %w", dalerrs.TranslatePGError(err))
}

headEventID, ok := topic.Head.Get()
if !ok {
return fmt.Errorf("no events published to topic %s", topic.Name)
}

headEvent, err := qtx.GetTopicEvent(ctx, headEventID)
if err != nil {
return fmt.Errorf("could not fetch topic head: %w", dalerrs.TranslatePGError(err))
}

err = qtx.SetSubscriptionCursor(ctx, subscription.Key, headEvent.Key)
if err != nil {
return fmt.Errorf("failed to reset subscription: %w", dalerrs.TranslatePGError(err))
}

return nil
}

func (d *DAL) createSubscriptions(ctx context.Context, tx *sql.Tx, key model.DeploymentKey, module *schema.Module) error {
for _, decl := range module.Decls {
s, ok := decl.(*schema.Subscription)
Expand Down
4 changes: 4 additions & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -731,3 +731,35 @@ UPDATE topic_subscriptions
SET state = 'idle'
WHERE name = @name::TEXT
AND module_id = (SELECT id FROM module);

-- name: GetSubscription :one
WITH module AS (
SELECT id
FROM modules
WHERE name = $2::TEXT
)
SELECT *
FROM topic_subscriptions
WHERE name = $1::TEXT
AND module_id = (SELECT id FROM module);

-- name: SetSubscriptionCursor :exec
WITH event AS (
SELECT id, created_at, key, topic_id, payload
FROM topic_events
WHERE "key" = $2::topic_event_key
)
UPDATE topic_subscriptions
SET state = 'idle',
cursor = (SELECT id FROM event)
WHERE key = $1::subscription_key;

-- name: GetTopic :one
SELECT *
FROM topics
WHERE id = $1::BIGINT;

-- name: GetTopicEvent :one
SELECT *
FROM topic_events
WHERE id = $1::BIGINT;
86 changes: 86 additions & 0 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 073c639

Please sign in to comment.