From 32cc917fcc8344347e6bb5235f1cd661711b0cee Mon Sep 17 00:00:00 2001 From: Aaron Beitch Date: Fri, 6 Oct 2023 11:35:39 -0700 Subject: [PATCH] SendBatch: Retry retryable errors To match the behavior of SendRPC, SendBatch should retry RPCs that hit retryable errors: region.RetryableError, region.ServerError, and region.NotServingRegionError. SendBatch will now retry each RPC that hits a retryable error. What used to be a single step through of assigning regions to RPCs, grouping them by region server and then dispatching the RPCs to their respective servers, is now done in a loop. The first iteration of the loop operates on the entire batch. Later iterations operate on the set of RPCs that failed with retryable errors in the previous batch. --- rpc.go | 121 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 91 insertions(+), 30 deletions(-) diff --git a/rpc.go b/rpc.go index 7eb7fca3..402888aa 100644 --- a/rpc.go +++ b/rpc.go @@ -180,6 +180,12 @@ var ( // successfully. allOK is true if all calls completed successfully, // and false if any calls failed and the errors in the results need to // be checked. +// +// SendBatch will continue retrying each RPC in batch until it +// succeeds, fails with an unretryable error, or the context times out +// or is canceled. SendBatch does not return until all RPCs have +// reached a terminal state (success, unretryable error), or the +// context times out or is canceled. func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) ( res []hrpc.RPCResult, allOK bool) { if len(batch) == 0 { @@ -235,37 +241,74 @@ func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) ( return res, allOK } - rpcByClient, ok := c.findClients(ctx, batch, res) - if !ok { - return res, false - } - sendBatchSplitCount.Observe(float64(len(rpcByClient))) + // Send and wait for responses loop. This loop will partition the + // batch per-regionserver batches, send those batches to the + // region server and wait for results. Any RPCs that hit retryable + // errors will be made into a new batch and passed through this + // loop again. - // Send each group of RPCs to region client to be executed. - type clientAndRPCs struct { - client hrpc.RegionClient - rpcs []hrpc.Call - } - // keep track of the order requests are queued so that we can wait - // for their responses in the same order. - cAndRs := make([]clientAndRPCs, 0, len(rpcByClient)) - for client, rpcs := range rpcByClient { - client.QueueBatch(ctx, rpcs) - cAndRs = append(cAndRs, clientAndRPCs{client, rpcs}) - } + // unretryableErrorSeen set to true when any RPC in the batch hits + // an error that is not retryable. This is used to remember to + // return allOK=false even after we retry RPCs that hit retryable + // errors and those all succeed. + var unretryableErrorSeen bool + backoff := backoffStart + for { + allOK = !unretryableErrorSeen // reset allOK in case this is a retry + rpcByClient, ok := c.findClients(ctx, batch, res) + if !ok { + return res, false + } + sendBatchSplitCount.Observe(float64(len(rpcByClient))) + + // Send each group of RPCs to region client to be executed. + type clientAndRPCs struct { + client hrpc.RegionClient + rpcs []hrpc.Call + } + // keep track of the order requests are queued so that we can wait + // for their responses in the same order. + cAndRs := make([]clientAndRPCs, 0, len(rpcByClient)) + for client, rpcs := range rpcByClient { + client.QueueBatch(ctx, rpcs) + cAndRs = append(cAndRs, clientAndRPCs{client, rpcs}) + } + + // batch wil be used to hold any RPCs that need to be retried + batch = batch[:0] + var needBackoff bool + + func() { // func used to scope the span + ctx, sp := observability.StartSpan(ctx, "waitForResult") + defer sp.End() + for _, cAndR := range cAndRs { + shouldRetry, shouldBackoff, unretryableError, ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, + res, rpcToRes) + if !ok { + allOK = false + batch = append(batch, shouldRetry...) + needBackoff = needBackoff || shouldBackoff + unretryableErrorSeen = unretryableErrorSeen || unretryableError + } + } + }() - var fail bool - func() { // func used to scope the span - ctx, sp := observability.StartSpan(ctx, "waitForResult") - defer sp.End() - for _, cAndR := range cAndRs { - ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, res, rpcToRes) - if !ok { - fail = true + // Exit retry loop if no RPCs are retryable because they all + // succeeded or hit unretryable errors, or the context is done + if len(batch) == 0 || ctx.Err() != nil { + break + } + if needBackoff { + sp.AddEvent("retrySleep") + var err error + backoff, err = sleepAndIncreaseBackoff(ctx, backoff) + if err != nil { + break } + } else { + sp.AddEvent("retry") } - }() - allOK = !fail + } return res, allOK } @@ -296,11 +339,19 @@ func (c *client) findClients(ctx context.Context, batch []hrpc.Call, res []hrpc. return rpcByClient, ok } +// waitForCompletion waits for the completion of all rpcs, updating +// the appropriate index in results with the help of rpcToRes. If all +// rpcs succeed then ok will return true, otherwise ok will return +// false, retryables will contain RPCs that can be retried, and +// shouldBackoff will be true if any RPCs need a backoff before +// retrying. func (c *client) waitForCompletion(ctx context.Context, rc hrpc.RegionClient, - rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) bool { + rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) ( + retryables []hrpc.Call, shouldBackoff, unretryableError, ok bool) { - ok := true + ok = true canceledIndex := len(rpcs) + loop: for i, rpc := range rpcs { select { @@ -309,7 +360,17 @@ loop: if res.Error != nil { c.handleResultError(res.Error, rpc.Region(), rc) ok = false + switch res.Error.(type) { + case region.RetryableError: + shouldBackoff = true + retryables = append(retryables, rpc) + case region.ServerError, region.NotServingRegionError: + retryables = append(retryables, rpc) + default: + unretryableError = true + } } + case <-ctx.Done(): canceledIndex = i ok = false @@ -333,7 +394,7 @@ loop: } } - return ok + return retryables, shouldBackoff, unretryableError, ok } func (c *client) handleResultError(err error, reg hrpc.RegionInfo, rc hrpc.RegionClient) {