SendBatch: Retry retryable errors
To match the behavior of SendRPC, SendBatch should retry RPCs that hit
retryable errors: region.RetryableError, region.ServerError, and
region.NotServingRegionError.

SendBatch will now retry each RPC that hits a retryable error. What
used to be a single pass of assigning regions to RPCs, grouping them
by region server, and then dispatching the RPCs to their respective
servers is now done in a loop. The first iteration of the loop
operates on the entire batch. Later iterations operate on the set of
RPCs that failed with retryable errors in the previous iteration.
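A hypothetical caller-side sketch of these semantics, assuming this is the tsuna/gohbase client: the client setup and batch construction here are illustrative assumptions, not part of this commit; only SendBatch's res/allOK contract comes from the change below.

package main

import (
	"context"
	"log"
	"time"

	"github.com/tsuna/gohbase"
	"github.com/tsuna/gohbase/hrpc"
)

func main() {
	client := gohbase.NewClient("zookeeper-quorum") // assumed setup
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var batch []hrpc.Call // assume built elsewhere, e.g. from hrpc.NewPutStr

	// SendBatch keeps retrying retryable errors until the context
	// deadline; when allOK is false, the remaining per-RPC errors
	// in res are terminal.
	res, allOK := client.SendBatch(ctx, batch)
	if !allOK {
		for i, r := range res {
			if r.Error != nil {
				log.Printf("batch[%d] failed: %v", i, r.Error)
			}
		}
	}
}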
aaronbee committed Oct 6, 2023
1 parent f750f62 commit 32cc917
121 changes: 91 additions & 30 deletions rpc.go
@@ -180,6 +180,12 @@ var (
 // successfully. allOK is true if all calls completed successfully,
 // and false if any calls failed and the errors in the results need to
 // be checked.
+//
+// SendBatch will continue retrying each RPC in batch until it
+// succeeds, fails with an unretryable error, or the context times out
+// or is canceled. SendBatch does not return until all RPCs have
+// reached a terminal state (success, unretryable error), or the
+// context times out or is canceled.
 func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
 	res []hrpc.RPCResult, allOK bool) {
 	if len(batch) == 0 {
@@ -235,37 +241,74 @@ func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
 		return res, allOK
 	}
 
-	rpcByClient, ok := c.findClients(ctx, batch, res)
-	if !ok {
-		return res, false
-	}
-	sendBatchSplitCount.Observe(float64(len(rpcByClient)))
+	// Send and wait for responses loop. This loop will partition the
+	// batch into per-regionserver batches, send those batches to the
+	// region servers and wait for results. Any RPCs that hit retryable
+	// errors will be made into a new batch and passed through this
+	// loop again.
 
-	// Send each group of RPCs to region client to be executed.
-	type clientAndRPCs struct {
-		client hrpc.RegionClient
-		rpcs   []hrpc.Call
-	}
-	// keep track of the order requests are queued so that we can wait
-	// for their responses in the same order.
-	cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
-	for client, rpcs := range rpcByClient {
-		client.QueueBatch(ctx, rpcs)
-		cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
-	}
+	// unretryableErrorSeen set to true when any RPC in the batch hits
+	// an error that is not retryable. This is used to remember to
+	// return allOK=false even after we retry RPCs that hit retryable
+	// errors and those all succeed.
+	var unretryableErrorSeen bool
+	backoff := backoffStart
+	for {
+		allOK = !unretryableErrorSeen // reset allOK in case this is a retry
+		rpcByClient, ok := c.findClients(ctx, batch, res)
+		if !ok {
+			return res, false
+		}
+		sendBatchSplitCount.Observe(float64(len(rpcByClient)))
+
+		// Send each group of RPCs to region client to be executed.
+		type clientAndRPCs struct {
+			client hrpc.RegionClient
+			rpcs   []hrpc.Call
+		}
+		// keep track of the order requests are queued so that we can wait
+		// for their responses in the same order.
+		cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
+		for client, rpcs := range rpcByClient {
+			client.QueueBatch(ctx, rpcs)
+			cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
+		}
+
+		// batch will be used to hold any RPCs that need to be retried
+		batch = batch[:0]
+		var needBackoff bool
 
-	var fail bool
-	func() { // func used to scope the span
-		ctx, sp := observability.StartSpan(ctx, "waitForResult")
-		defer sp.End()
-		for _, cAndR := range cAndRs {
-			ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
-			if !ok {
-				fail = true
-			}
-		}
-	}()
-	allOK = !fail
+		func() { // func used to scope the span
+			ctx, sp := observability.StartSpan(ctx, "waitForResult")
+			defer sp.End()
+			for _, cAndR := range cAndRs {
+				shouldRetry, shouldBackoff, unretryableError, ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs,
+					res, rpcToRes)
+				if !ok {
+					allOK = false
+					batch = append(batch, shouldRetry...)
+					needBackoff = needBackoff || shouldBackoff
+					unretryableErrorSeen = unretryableErrorSeen || unretryableError
+				}
+			}
+		}()
+
+		// Exit retry loop if no RPCs are retryable because they all
+		// succeeded or hit unretryable errors, or the context is done
+		if len(batch) == 0 || ctx.Err() != nil {
+			break
+		}
+		if needBackoff {
+			sp.AddEvent("retrySleep")
+			var err error
+			backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
+			if err != nil {
+				break
+			}
+		} else {
+			sp.AddEvent("retry")
+		}
+	}
 
 	return res, allOK
 }
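Stripped of gohbase specifics, the loop this hunk introduces is a general shrinking-batch retry pattern. The following standalone Go sketch shows the same control flow under stated assumptions: every name in it is hypothetical, send stands in for the per-regionserver dispatch, and the doubling sleep stands in for sleepAndIncreaseBackoff.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryBatch keeps resending the calls that failed retryably, mirroring
// the loop added to SendBatch: dispatch the batch, collect retryables
// into a reused slice, back off if needed, and go around again.
func retryBatch(ctx context.Context, batch []int,
	send func(int) (retryable bool, err error)) (failed []int) {

	backoff := 100 * time.Millisecond // stand-in for backoffStart
	for len(batch) > 0 && ctx.Err() == nil {
		// Reuse batch's backing array, like batch = batch[:0] in the
		// diff. Writes land only on slots the range has already read.
		retry := batch[:0]
		for _, call := range batch {
			retryable, err := send(call)
			switch {
			case err == nil: // done with this call
			case retryable:
				retry = append(retry, call) // next iteration's batch
			default:
				failed = append(failed, call) // terminal error
			}
		}
		batch = retry
		if len(batch) == 0 {
			break
		}
		// Rough analogue of sleepAndIncreaseBackoff.
		select {
		case <-time.After(backoff):
			backoff *= 2
		case <-ctx.Done():
			return append(failed, batch...)
		}
	}
	return failed
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	attempts := map[int]int{}
	failed := retryBatch(ctx, []int{1, 2, 3}, func(call int) (bool, error) {
		attempts[call]++
		if call == 2 && attempts[call] < 3 {
			return true, errors.New("transient") // retried with backoff
		}
		return false, nil
	})
	fmt.Println("terminal failures:", failed)
}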
@@ -296,11 +339,19 @@ func (c *client) findClients(ctx context.Context, batch []hrpc.Call, res []hrpc.
 	return rpcByClient, ok
 }
 
+// waitForCompletion waits for the completion of all rpcs, updating
+// the appropriate index in results with the help of rpcToRes. If all
+// rpcs succeed then ok will return true, otherwise ok will return
+// false, retryables will contain RPCs that can be retried, and
+// shouldBackoff will be true if any RPCs need a backoff before
+// retrying.
 func (c *client) waitForCompletion(ctx context.Context, rc hrpc.RegionClient,
-	rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) bool {
-	ok := true
+	rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) (
+	retryables []hrpc.Call, shouldBackoff, unretryableError, ok bool) {
+
+	ok = true
 	canceledIndex := len(rpcs)
 
 loop:
 	for i, rpc := range rpcs {
 		select {
@@ -309,7 +360,17 @@ loop:
 			if res.Error != nil {
 				c.handleResultError(res.Error, rpc.Region(), rc)
 				ok = false
+				switch res.Error.(type) {
+				case region.RetryableError:
+					shouldBackoff = true
+					retryables = append(retryables, rpc)
+				case region.ServerError, region.NotServingRegionError:
+					retryables = append(retryables, rpc)
+				default:
+					unretryableError = true
+				}
 			}
+
 		case <-ctx.Done():
 			canceledIndex = i
 			ok = false
Expand All @@ -333,7 +394,7 @@ loop:
}
}

return ok
return retryables, shouldBackoff, unretryableError, ok
}

func (c *client) handleResultError(err error, reg hrpc.RegionInfo, rc hrpc.RegionClient) {
Expand Down
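The type switch added to waitForCompletion above sorts each failure into one of three outcomes: retry with backoff, retry immediately, or give up. A self-contained sketch of that classification follows; it uses local stand-in error types, since the real ones live in gohbase's region package.

package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the region package's error types, so this
// sketch compiles on its own.
type RetryableError struct{ error }
type ServerError struct{ error }
type NotServingRegionError struct{ error }

// classify mirrors the switch in waitForCompletion: RetryableError
// wants a backoff before the retry, ServerError and
// NotServingRegionError retry immediately, and anything else is
// terminal.
func classify(err error) (retry, backoff bool) {
	switch err.(type) {
	case RetryableError:
		return true, true
	case ServerError, NotServingRegionError:
		return true, false
	default:
		return false, false
	}
}

func main() {
	for _, err := range []error{
		RetryableError{errors.New("region moved")},
		ServerError{errors.New("connection reset")},
		errors.New("malformed request"),
	} {
		retry, backoff := classify(err)
		fmt.Printf("%-20v retry=%v backoff=%v\n", err, retry, backoff)
	}
}

Per the diff, only region.RetryableError asks for a sleep; the other two retryable cases go straight back through the loop, presumably because the next findClients pass re-resolves the affected regions to a healthy server.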
