From b7a335a77bfd07ca77726208f1b3991d07b01145 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 20 Dec 2023 16:26:41 -0700 Subject: [PATCH] kfake: add DropControl, SleepOutOfOrder, CoordinatorFor, RehashCoordinators * Sleeping was a bit limited because if two requests came in on the same connection, you could not really chain logic. Sleeping out of order allows you to at least run some logic to gate how requests behave with each other. It's not the most obvious, so it is not the default. * Adds SleepOutOfOrder * Adds CoordinatorFor so you can see which "broker" a coordinator request will go to * Adds RehashCoordinators to change where requests are hashed to The latter two allow you to loop rehashing until a coordinator for your key changes, if you want to force NotCoordinator requests. --- pkg/kfake/cluster.go | 308 ++++++++++++++++++++++++++++++++++--------- pkg/kfake/config.go | 11 ++ pkg/kfake/groups.go | 3 +- 3 files changed, 260 insertions(+), 62 deletions(-) diff --git a/pkg/kfake/cluster.go b/pkg/kfake/cluster.go index 05167c72..fb622f41 100644 --- a/pkg/kfake/cluster.go +++ b/pkg/kfake/cluster.go @@ -29,6 +29,8 @@ type ( controller *broker bs []*broker + coordinatorGen atomic.Uint64 + adminCh chan func() reqCh chan *clientReq wakeCh chan *slept @@ -64,6 +66,7 @@ type ( key int16 fn controlFn keep bool + drop bool lastReq map[*clientConn]*clientReq // used to not re-run requests that slept, see doc comments below } @@ -254,6 +257,7 @@ func (b *broker) listen() { } func (c *Cluster) run() { +outer: for { var ( creq *clientReq @@ -270,11 +274,13 @@ func (c *Cluster) run() { return case admin := <-c.adminCh: - // Run a custom request in the context of the cluster. admin() continue case creq = <-c.reqCh: + if c.cfg.sleepOutOfOrder { + break + } // If we have any sleeping request on this node, // we enqueue the new live request to the end and // wait for the sleeping request to finish. @@ -301,19 +307,28 @@ func (c *Cluster) run() { // Control flow is weird here, but is described more // fully in the finish/resleep/etc methods. c.continueSleptControl(s) - select { - case res := <-s.res: - c.finishSleptControl(s) - cctx := s.cctx - s = nil - kresp, err, handled = res.kresp, res.err, res.handled - if handled { - c.popControl(cctx) - goto afterControl + inner: + for { + select { + case <-c.die: + return + case admin := <-c.adminCh: + admin() + continue inner + case res := <-s.res: + c.finishSleptControl(s) + cctx := s.cctx + s = nil + kresp, err, handled = res.kresp, res.err, res.handled + c.maybePopControl(handled, cctx) + if handled { + goto afterControl + } + break inner + case sleepChs := <-c.controlSleep: + c.resleepSleptControl(s, sleepChs) + continue outer } - case sleepChs := <-c.controlSleep: - c.resleepSleptControl(s, sleepChs) - continue } case w = <-c.watchFetchCh: @@ -435,10 +450,15 @@ func (c *Cluster) run() { // Controlling a request drops the control function from the cluster, meaning // that a control function can only control *one* request. To keep the control // function handling more requests, you can call KeepControl within your -// control function. +// control function. Alternatively, if you want to just run some logic in your +// control function but then have the cluster handle the request as normal, +// you can call DropControl to drop a control function that was not handled. +// +// It is safe to add new control functions within a control function. // -// It is safe to add new control functions within a control function. Control -// functions are not called concurrently. +// Control functions are run serially unless you use SleepControl, multiple +// control functions are "in progress", and you run Cluster.Close. Closing a +// Cluster awakens all sleeping control functions. func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) { c.ControlKey(-1, fn) } @@ -455,9 +475,15 @@ func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) { // Controlling a request drops the control function from the cluster, meaning // that a control function can only control *one* request. To keep the control // function handling more requests, you can call KeepControl within your -// control function. +// control function. Alternatively, if you want to just run some logic in your +// control function but then have the cluster handle the request as normal, +// you can call DropControl to drop a control function that was not handled. // // It is safe to add new control functions within a control function. +// +// Control functions are run serially unless you use SleepControl, multiple +// control functions are "in progress", and you run Cluster.Close. Closing a +// Cluster awakens all sleeping control functions. func (c *Cluster) ControlKey(key int16, fn func(kmsg.Request) (kmsg.Response, error, bool)) { c.controlMu.Lock() defer c.controlMu.Unlock() @@ -484,6 +510,19 @@ func (c *Cluster) KeepControl() { } } +// DropControl allows you to drop the current control function. This takes +// precedence over KeepControl. The use of this function is you can run custom +// control logic *once*, drop the control function, and return that the +// function was not handled -- thus allowing other control functions to run, or +// allowing the kfake cluster to process the request as normal. +func (c *Cluster) DropControl() { + c.controlMu.Lock() + defer c.controlMu.Unlock() + if c.currentControl != nil { + c.currentControl.drop = true + } +} + // SleepControl sleeps the current control function until wakeup returns. This // yields to run any other connection. // @@ -525,7 +564,10 @@ func (c *Cluster) SleepControl(wakeup func()) { }() c.controlSleep <- sleepChs - <-sleepChs.clientCont + select { + case <-sleepChs.clientCont: + case <-c.die: + } } // CurrentNode is solely valid from within a control function; it returns @@ -560,20 +602,25 @@ func (c *Cluster) tryControlKey(key int16, creq *clientReq) (kmsg.Response, erro } cctx.lastReq[creq.cc] = creq res := c.runControl(cctx, creq) - select { - case res := <-res: - if res.handled { - c.popControl(cctx) - return res.kresp, res.err, true + for { + select { + case <-c.die: + return nil, nil, false + case admin := <-c.adminCh: + admin() + continue + case res := <-res: + c.maybePopControl(res.handled, cctx) + return res.kresp, res.err, res.handled + case sleepChs := <-c.controlSleep: + c.beginSleptControl(&slept{ + cctx: cctx, + sleepChs: sleepChs, + res: res, + creq: creq, + }) + return nil, nil, true } - case sleepChs := <-c.controlSleep: - c.beginSleptControl(&slept{ - cctx: cctx, - sleepChs: sleepChs, - res: res, - creq: creq, - }) - return nil, nil, true } } return nil, nil, false @@ -606,7 +653,11 @@ func (c *Cluster) beginSleptControl(s *slept) { // unlock us safely. bs := c.sleeping[s.creq.cc] if bs == nil { - bs = &bsleep{c: c} + bs = &bsleep{ + c: c, + set: make(map[*slept]struct{}), + setWake: make(chan *slept, 1), + } c.sleeping[s.creq.cc] = bs } bs.enqueue(s) @@ -648,10 +699,13 @@ func (c *Cluster) resleepSleptControl(s *slept, sleepChs sleepChs) { c.controlMu.Unlock() s.sleepChs = sleepChs s.continueDequeue <- struct{}{} + // For OOO requests, we need to manually trigger a goroutine to + // watch for the sleep to end. + s.bs.maybeWaitOOOWake(s) } -func (c *Cluster) popControl(cctx *controlCtx) { - if !cctx.keep { +func (c *Cluster) maybePopControl(handled bool, cctx *controlCtx) { + if handled && !cctx.keep || cctx.drop { delete(c.control[cctx.key], cctx) } } @@ -659,12 +713,15 @@ func (c *Cluster) popControl(cctx *controlCtx) { // bsleep manages sleeping requests on a connection to a broker, or // non-sleeping requests that are waiting for sleeping requests to finish. type bsleep struct { - c *Cluster - mu sync.Mutex - queue []*slept + c *Cluster + mu sync.Mutex + queue []*slept + set map[*slept]struct{} + setWake chan *slept } type slept struct { + bs *bsleep cctx *controlCtx sleepChs sleepChs res <-chan controlResp @@ -681,45 +738,60 @@ type sleepChs struct { // enqueue has a few potential behaviors. // -// * If s is waiting, this is a new request enqueueing to the back of an +// (1) If s is waiting, this is a new request enqueueing to the back of an // existing queue, where we are waiting for the head request to finish // sleeping. Easy case. // -// * If s is not waiting, this is a sleeping request. If the queue is empty, +// (2) If s is not waiting, this is a sleeping request. If the queue is empty, // this is the first sleeping request on a node. We enqueue and start our wait // goroutine. Easy. // -// * If s is not waiting, but our queue is non-empty, this must be from a -// convoluted scenario: There was a request in front of us that slept, and we -// were waiting, and now we ourselves slept OR we previously slept, but we -// returned "not handled". We are now re-enqueueing ourself. Rather than add to -// the back, we update our head request with the new enqueued values. In this -// last case, bsleep is actually waiting for a signal down 'continueDequeue', -// and it will be signaled in the 'run' goroutine once tryControl returns -// (which it will, right after we are done here). We need to update values on -// the head. +// (3) If s is not waiting, but our queue is non-empty, this must be from a +// convoluted scenario: +// +// (a) the user has SleepOutOfOrder configured, +// (b) or, there was a request in front of us that slept, we were waiting, +// and now we ourselves are sleeping +// (c) or, we are sleeping for the second time in a single control func (bs *bsleep) enqueue(s *slept) bool { if bs == nil { - return false + return false // Do not enqueue, nothing sleeping } + s.continueDequeue = make(chan struct{}, 1) + s.bs = bs bs.mu.Lock() defer bs.mu.Unlock() if s.waiting { - if len(bs.queue) > 0 { - bs.queue = append(bs.queue, s) + if bs.c.cfg.sleepOutOfOrder { + panic("enqueueing a waiting request even though we are sleeping out of order") + } + if !bs.empty() { + bs.keep(s) // Case (1) return true } - return false + return false // We do not enqueue, do not wait: nothing sleeping ahead of us } - if len(bs.queue) == 0 { - bs.queue = append(bs.queue, s) - go bs.wait() + if bs.empty() { + bs.keep(s) + go bs.wait() // Case (2) + return true + } + var q0 *slept + if !bs.c.cfg.sleepOutOfOrder { + q0 = bs.queue[0] // Case (3b) or (3c) -- just update values below + } else { + // Case (3a), out of order sleep: we need to check the entire + // queue to see if this request was already sleeping and, if + // so, update the values. If it was not already sleeping, we + // "keep" the new sleeping item. + bs.keep(s) return true } - q0 := bs.queue[0] if q0.creq != s.creq { panic("internal error: sleeping request not head request") } + // We do not update continueDequeue because it is actively being read, + // we just reuse the old value. q0.cctx = s.cctx q0.sleepChs = s.sleepChs q0.res = s.res @@ -727,23 +799,111 @@ func (bs *bsleep) enqueue(s *slept) bool { return true } +// keep stores a sleeping request to be managed. For out of order control, the +// log is a bit more complicated and we need to watch for the control sleep +// finishing here, and forward the "I'm done sleeping" notification to waitSet. +func (bs *bsleep) keep(s *slept) { + if !bs.c.cfg.sleepOutOfOrder { + bs.queue = append(bs.queue, s) + return + } + bs.set[s] = struct{}{} + bs.maybeWaitOOOWake(s) +} + +func (bs *bsleep) maybeWaitOOOWake(s *slept) { + if !bs.c.cfg.sleepOutOfOrder { + return + } + go func() { + select { + case <-bs.c.die: + case <-s.sleepChs.clientWait: + select { + case <-bs.c.die: + case bs.setWake <- s: + } + } + }() +} + +func (bs *bsleep) empty() bool { + return len(bs.queue) == 0 && len(bs.set) == 0 +} + func (bs *bsleep) wait() { + if bs.c.cfg.sleepOutOfOrder { + bs.waitSet() + } else { + bs.waitQueue() + } +} + +// For out of order control, all control functions run concurrently, serially. +// Whenever they wake up, they send themselves down setWake. waitSet manages +// handling the wake up and interacting with the serial manage goroutine to +// run everything properly. +func (bs *bsleep) waitSet() { for { bs.mu.Lock() - if len(bs.queue) == 0 { + if len(bs.set) == 0 { bs.mu.Unlock() return } - q0 := bs.queue[0] bs.mu.Unlock() - if q0.continueDequeue == nil { - q0.continueDequeue = make(chan struct{}, 1) + // Wait for a control function to awaken. + var q *slept + select { + case <-bs.c.die: + return + case q = <-bs.setWake: + q.sleepChs.clientWait = nil } + // Now, schedule ourselves with the run loop. + select { + case <-bs.c.die: + return + case bs.c.wakeCh <- q: + } + + // Wait for this control function to finish its loop in the run + // function. Once it does, if clientWait is non-nil, the + // control function went back to sleep. If it is nil, the + // control function is done and we remove this from tracking. + select { + case <-bs.c.die: + return + case <-q.continueDequeue: + } + if q.sleepChs.clientWait == nil { + bs.mu.Lock() + delete(bs.set, q) + bs.mu.Unlock() + } + } +} + +// For in-order control functions, the concept is slightly simpler but the +// logic flow is the same. We wait for the head control function to wake up, +// try to run it, and then wait for it to finish. The logic of this function is +// the same as waitSet, minus the middle part where we wait for something to +// wake up. +func (bs *bsleep) waitQueue() { + for { + bs.mu.Lock() + if len(bs.queue) == 0 { + bs.mu.Unlock() + return + } + q0 := bs.queue[0] + bs.mu.Unlock() + if q0.sleepChs.clientWait != nil { select { case <-bs.c.die: + return case <-q0.sleepChs.clientWait: q0.sleepChs.clientWait = nil } @@ -755,7 +915,11 @@ func (bs *bsleep) wait() { case bs.c.wakeCh <- q0: } - <-q0.continueDequeue + select { + case <-bs.c.die: + return + case <-q0.continueDequeue: + } if q0.sleepChs.clientWait == nil { bs.mu.Lock() bs.queue = bs.queue[1:] @@ -802,6 +966,28 @@ func (c *Cluster) MoveTopicPartition(topic string, partition int32, nodeID int32 return err } +// CoordinatorFor returns the node ID of the group or transaction coordinator +// for the given key. +func (c *Cluster) CoordinatorFor(key string) int32 { + var n int32 + c.admin(func() { + l := len(c.bs) + if l == 0 { + n = -1 + return + } + n = c.coordinator(key).node + }) + return n +} + +// RehashCoordinators simulates group and transacational ID coordinators moving +// around. All group and transactional IDs are rekeyed. This forces clients to +// reload coordinators. +func (c *Cluster) RehashCoordinators() { + c.coordinatorGen.Add(1) +} + // AddNode adds a node to the cluster. If nodeID is -1, the next node ID is // used. If port is 0 or negative, a random port is chosen. This returns the // added node ID and the port used, or an error if the node already exists or diff --git a/pkg/kfake/config.go b/pkg/kfake/config.go index d0f01106..75b34fb2 100644 --- a/pkg/kfake/config.go +++ b/pkg/kfake/config.go @@ -34,6 +34,8 @@ type cfg struct { enableSASL bool sasls map[struct{ m, u string }]string // cleared after client initialization tls *tls.Config + + sleepOutOfOrder bool } // NumBrokers sets the number of brokers to start in the fake cluster. @@ -113,3 +115,12 @@ func TLS(c *tls.Config) Opt { func SeedTopics(partitions int32, ts ...string) Opt { return opt{func(cfg *cfg) { cfg.seedTopics = append(cfg.seedTopics, seedTopics{partitions, ts}) }} } + +// SleepOutOfOrder allows functions to be handled out of order when control +// functions are sleeping. The functions are be handled internally out of +// order, but responses still wait for the sleeping requests to finish. This +// can be used to set up complicated chains of control where functions only +// advance when you know another request is actively being handled. +func SleepOutOfOrder() Opt { + return opt{func(cfg *cfg) { cfg.sleepOutOfOrder = true }} +} diff --git a/pkg/kfake/groups.go b/pkg/kfake/groups.go index 853c3d68..a0e5a98e 100644 --- a/pkg/kfake/groups.go +++ b/pkg/kfake/groups.go @@ -101,7 +101,8 @@ func (gs groupState) String() string { } func (c *Cluster) coordinator(id string) *broker { - n := hashString(id) % uint64(len(c.bs)) + gen := c.coordinatorGen.Load() + n := hashString(fmt.Sprint("%d", gen)+"\x00\x00"+id) % uint64(len(c.bs)) return c.bs[n] }