Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cmd reset subscription to topic head #2210

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,14 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil
}

func (s *Service) ResetSubscription(ctx context.Context, req *connect.Request[ftlv1.ResetSubscriptionRequest]) (*connect.Response[ftlv1.ResetSubscriptionResponse], error) {
err := s.dal.ResetSubscription(ctx, req.Msg.Subscription.Module, req.Msg.Subscription.Name)
if err != nil {
return nil, fmt.Errorf("could not reset subscription: %w", err)
}
return connect.NewResponse(&ftlv1.ResetSubscriptionResponse{}), nil
}

// Load schemas for existing modules, combine with our new one, and validate the new module in the context
// of the whole schema.
func (s *Service) validateModuleSchema(ctx context.Context, module *schema.Module) (*schema.Module, error) {
Expand Down
41 changes: 41 additions & 0 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,47 @@ func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name str
return nil
}

// ResetSubscription resets the subscription cursor to the topic's head.
func (d *DAL) ResetSubscription(ctx context.Context, module, name string) (err error) {
tx, err := d.db.Begin(ctx)
if err != nil {
return fmt.Errorf("could not start transaction: %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

qtx := NewQTx(d.db.Conn(), tx.Tx())

subscription, err := qtx.GetSubscription(ctx, name, module)
if err != nil {
if dalerrs.IsNotFound(err) {
return fmt.Errorf("subscription %s.%s not found", module, name)
}
return fmt.Errorf("could not fetch subscription: %w", dalerrs.TranslatePGError(err))
}

topic, err := qtx.GetTopic(ctx, subscription.TopicID)
if err != nil {
return fmt.Errorf("could not fetch topic: %w", dalerrs.TranslatePGError(err))
}

headEventID, ok := topic.Head.Get()
if !ok {
return fmt.Errorf("no events published to topic %s", topic.Name)
}

headEvent, err := qtx.GetTopicEvent(ctx, headEventID)
if err != nil {
return fmt.Errorf("could not fetch topic head: %w", dalerrs.TranslatePGError(err))
}

err = qtx.SetSubscriptionCursor(ctx, subscription.Key, headEvent.Key)
if err != nil {
return fmt.Errorf("failed to reset subscription: %w", dalerrs.TranslatePGError(err))
}

return nil
}

func (d *DAL) createSubscriptions(ctx context.Context, tx *sql.Tx, key model.DeploymentKey, module *schema.Module) error {
for _, decl := range module.Decls {
s, ok := decl.(*schema.Subscription)
Expand Down
4 changes: 4 additions & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -731,3 +731,35 @@ UPDATE topic_subscriptions
SET state = 'idle'
WHERE name = @name::TEXT
AND module_id = (SELECT id FROM module);

-- name: GetSubscription :one
WITH module AS (
SELECT id
FROM modules
WHERE name = $2::TEXT
)
SELECT *
FROM topic_subscriptions
WHERE name = $1::TEXT
AND module_id = (SELECT id FROM module);

-- name: SetSubscriptionCursor :exec
WITH event AS (
SELECT id, created_at, key, topic_id, payload
FROM topic_events
WHERE "key" = $2::topic_event_key
)
UPDATE topic_subscriptions
SET state = 'idle',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could cause a race condition, maybe?

  • Subscription is behind and currently consuming an event
  • Reset subscription command comes in sets the state to idle and updates the cursor to head
  • 2 new events are posted to the topic
  • New event starts being consumed by the subscription
  • Original event (from before the reset command) finishes being processed, sets the state to idle even though we are processing the other event still
  • Then pub sub triggers the next event to be processed (without knowing the other event is being processed still)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah interesting, do you think the solution would be to only update the cursor and not the state? so that if there are any outstanding async calls executing, they'd have to complete before any new ones are kicked off

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly don't know, possibly! I added it as an issue because I'm not sure whether its a priority but didn't want it to be lost: #2261

cursor = (SELECT id FROM event)
WHERE key = $1::subscription_key;

-- name: GetTopic :one
SELECT *
FROM topics
WHERE id = $1::BIGINT;

-- name: GetTopicEvent :one
SELECT *
FROM topic_events
WHERE id = $1::BIGINT;
86 changes: 86 additions & 0 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading