Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kgo: do not rotate the consumer session when pausing topics/partitions #601

Merged
merged 1 commit into from
Oct 22, 2023

Conversation

twmb
Copy link
Owner

@twmb twmb commented Oct 21, 2023

Issue #489 asked to stop returning data after a partition was paused -- the original implementation of pausing kept returning any data that was in flight or already buffered, and simply stopped fetching new data.

489 was dealt with by bumping the consumer session, which kills all in flight fetch requests. This was easy, but can cause a lot of connection churn if pausing and resuming a lot -- which is #585.

The new implementation allows fetches to complete, but strips data from fetches based on what is paused at the moment the fetches are being returned to the client. This does make polling paused fetches very slightly slower (a map lookup per partition), but there's only so much that's possible. If a partition is paused, we drop the data and do not advance the internal offset. If a partition is not paused, we keep the data and return it -- same as before.

Closes #585.

pkg/kgo/source.go Outdated Show resolved Hide resolved
@twmb twmb force-pushed the 589 branch 3 times, most recently from c1092c5 to 23b0878 Compare October 21, 2023 16:43
@twmb
Copy link
Owner Author

twmb commented Oct 21, 2023 via email

@twmb
Copy link
Owner Author

twmb commented Oct 21, 2023 via email

var strip map[string]map[int32]struct{}
f := s.takeBufferedFn(true, func(os usedOffsets) {
for t, ps := range os {
// If the entire topic is paused, we allowUsable all

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you checked whether the topic was paused some way at the top here, you could get rid of some of the conditionals below. For example (may want to add a method or two to pausedPartitions if you didn't want to access all or m) :

pp, ok := pausedTopics[t]
if !ok {
	continue
}

// you know something is paused at this point
if strip == nil {
	strip = make(map[string]map[int32]struct{})
}

if pp.all {
	for _, o := range ps {
		o.from.allowUsable()
	}
	strip[t] = nil // initialize key, for existence-but-len-0 check below
	continue
}

// you know that specific partitions are paused at this point
stript := make(map[int32]struct{})
for _, o := range ps {
	if _, ok := pp.m[o.from.partition]; ok {
		o.from.allowUsable()
		stript[o.from.partition] = struct{}{}
		continue
	}
	o.from.setOffset(o.cursorOffset)
	o.from.allowUsable()
}
strip[t] = stript

Copy link
Owner Author

@twmb twmb Oct 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely -- it's possible that you paused "foo", but this fetch contains only data for "bar". This block of checking/stripping runs on every fetch response until you unpause everything, but once you pause something, what you paused is also not added to fetch requests. This stripping is only necessary basically for what was in flight at the time you added something to be paused.

edit: Rereading, I think you said something slightly differently from what I read it as.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this logic is better, changing.

Issue #489 asked to stop returning data after a partition was paused --
the original implementation of pausing kept returning any data that was
in flight or already buffered, and simply stopped fetching new data.

489 was dealt with by bumping the consumer session, which kills all
in flight fetch requests. This was easy, but can cause a lot of
connection churn if pausing and resuming a lot -- which is #585.

The new implementation allows fetches to complete, but strips data
from fetches based on what is paused at the moment the fetches are being
returned to the client. This does make polling paused fetches very
slightly slower (a map lookup per partition), but there's only so much
that's possible. If a partition is paused, we drop the data and do not
advance the internal offset. If a partition is not paused, we keep the
data and return it -- same as before.
@twmb twmb merged commit 3d115f1 into master Oct 22, 2023
@twmb twmb deleted the 589 branch October 22, 2023 03:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Pausing/Resuming partitions causes lots of errors
2 participants