Skip to content

Commit

Permalink
kgo: do not cancel FindCoordinator if the parent context cancels
Browse files Browse the repository at this point in the history
Some load testing in Redpanda showed a failure where consuming quit
unexpectedly and unrecoverably.

The sequence of events is:
* if OffsetCommit is issued just before Heartbeat
* and the group needs to be loaded so FindCoordinator is triggered,
* and OffsetCommit happens again, canceling the prior commit's context
Then,
* FindCoordinator would cancel
* Heartbeat, which is waiting on the same load, would fail with
  context.Canceled
* This error is seen as a group leave error
* The group management logic would quit entirely.

Now, the context used for FindCoordinator is the client context, which
is only closed on client close. This is also better anyway -- if two
requests are waiting for the same coordinator load, we don't want the
first request canceling to error the second request. If all requests
cancel and we have a stray FindCoordinator in flight, that's ok too,
because well, worst case we'll just eventually have a little bit of
extra data cached that is likely needed in the future anyway.

Closes redpanda-data/redpanda#15131
  • Loading branch information
twmb committed Dec 21, 2023
1 parent b8b065d commit 7d050fc
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1505,6 +1505,31 @@ func (cl *Client) loadCoordinator(ctx context.Context, typ int8, key string) (*b
}

func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr {
mch := make(chan map[string]brokerOrErr, 1)
go func() { mch <- cl.doLoadCoordinators(ctx, typ, keys...) }()
select {
case m := <-mch:
return m
case <-ctx.Done():
m := make(map[string]brokerOrErr, len(keys))
for _, k := range keys {
m[k] = brokerOrErr{nil, ctx.Err()}
}
return m
}
}

// doLoadCoordinators uses the caller context to cancel loading metadata
// (brokerOrErr), but we use the client context to actually issue the request.
// There should be only one direct call to doLoadCoordinators, just above in
// loadCoordinator. It is possible for two requests to be loading the same
// coordinator (in fact, that's the point of this function -- collapse these
// requests). We do not want the first request canceling it's context to cause
// errors for the second request.
//
// It is ok to leave FindCoordinator running even if the caller quits. Worst
// case, we just cache things for some time in the future; yay.
func (cl *Client) doLoadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr {
m := make(map[string]brokerOrErr, len(keys))
if len(keys) == 0 {
return m
Expand Down Expand Up @@ -1575,7 +1600,12 @@ func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string
}
}

shards := cl.RequestSharded(ctx, req)
cl.cfg.logger.Log(LogLevelDebug, "prepared to issue find coordinator request",
"coordinator_type", typ,
"coordinator_keys", req.CoordinatorKeys,
)

shards := cl.RequestSharded(cl.ctx, req)

for _, shard := range shards {
if shard.Err != nil {
Expand Down Expand Up @@ -1674,10 +1704,17 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error)
func (cl *Client) deleteStaleCoordinator(name string, typ int8) {
cl.coordinatorsMu.Lock()
defer cl.coordinatorsMu.Unlock()
delete(cl.coordinators, coordinatorKey{
name: name,
typ: typ,
})
k := coordinatorKey{name, typ}
v := cl.coordinators[k]
if v == nil {
return
}
select {
case <-v.loadWait:
delete(cl.coordinators, k)
default:
// We are actively reloading this coordinator.
}
}

type brokerOrErr struct {
Expand Down

0 comments on commit 7d050fc

Please sign in to comment.