-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: cmd reset subscription to topic head #2210
Conversation
e4a96fd
to
a0404e1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Niiiice! This was way easier to digest than I expected from that line count lol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome Lizzy, thanks.
backend/controller/dal/pubsub.go
Outdated
@@ -127,6 +127,47 @@ func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name str | |||
return nil | |||
} | |||
|
|||
// ResetSubscription resets the subscription cursor to the topic's head. | |||
func (d *DAL) ResetSubscription(ctx context.Context, module, name string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll need a named error return value in order for tx.CommitOrRollback()
to work - super not obvious I know.
func (d *DAL) ResetSubscription(ctx context.Context, module, name string) error { | |
func (d *DAL) ResetSubscription(ctx context.Context, module, name string) (err error) { |
@@ -0,0 +1,5 @@ | |||
package main |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's call the top level command pubsub
? So maybe ftl pubsub subscription reset
? A little verbose, but I think we'll likely want more pubsub related commands, and they're not all subscriptions.
subscriptionCursorAt("echo", "emailInvoices", 1), | ||
Exec("ftl", "subscription", "reset", "echo.emailInvoices"), | ||
topicHeadAt("time", "invoices", 2), | ||
subscriptionCursorAt("echo", "emailInvoices", 2), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome.
c0365e8
to
073c639
Compare
e319872
to
a6d05c6
Compare
WHERE "key" = $2::topic_event_key | ||
) | ||
UPDATE topic_subscriptions | ||
SET state = 'idle', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could cause a race condition, maybe?
- Subscription is behind and currently consuming an event
- Reset subscription command comes in sets the state to
idle
and updates the cursor to head - 2 new events are posted to the topic
- New event starts being consumed by the subscription
- Original event (from before the reset command) finishes being processed, sets the state to
idle
even though we are processing the other event still - Then pub sub triggers the next event to be processed (without knowing the other event is being processed still)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah interesting, do you think the solution would be to only update the cursor and not the state? so that if there are any outstanding async calls executing, they'd have to complete before any new ones are kicked off
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I honestly don't know, possibly! I added it as an issue because I'm not sure whether its a priority but didn't want it to be lost: #2261
Looks good! 👏 |
fixes #2175