SendBatch: Retry retryable errors #235

Merged (2 commits, Oct 23, 2023)
147 changes: 117 additions & 30 deletions rpc.go
@@ -180,6 +180,10 @@
// successfully. allOK is true if all calls completed successfully,
// and false if any calls failed and the errors in the results need to
// be checked.
//
// SendBatch will continue retrying each RPC in the batch until it
// succeeds, fails with a non-retryable error, or the context is
// canceled.
func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
res []hrpc.RPCResult, allOK bool) {
if len(batch) == 0 {
@@ -235,37 +239,80 @@
return res, allOK
}

rpcByClient, ok := c.findClients(ctx, batch, res)
if !ok {
return res, false
}
sendBatchSplitCount.Observe(float64(len(rpcByClient)))
// Send and wait for responses loop. This loop will partition the
// batch into per-regionserver batches, send those batches to the
// region servers and wait for results. Any RPCs that hit retryable
// errors will be made into a new batch and passed through this
// loop again.

// Send each group of RPCs to region client to be executed.
type clientAndRPCs struct {
client hrpc.RegionClient
rpcs []hrpc.Call
}
// keep track of the order requests are queued so that we can wait
// for their responses in the same order.
cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
for client, rpcs := range rpcByClient {
client.QueueBatch(ctx, rpcs)
cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
}
// unretryableErrorSeen set to true when any RPC in the batch hits
// an error that is not retryable. This is used to remember to
// return allOK=false even after we retry RPCs that hit retryable
// errors and those all succeed.
var unretryableErrorSeen bool
var retries []hrpc.Call
backoff := backoffStart

for {
rpcByClient, ok := c.findClients(ctx, batch, res)
if !ok {
return res, false
}

sendBatchSplitCount.Observe(float64(len(rpcByClient)))

var fail bool
func() { // func used to scope the span
ctx, sp := observability.StartSpan(ctx, "waitForResult")
defer sp.End()
for _, cAndR := range cAndRs {
ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
if !ok {
fail = true
// Send each group of RPCs to region client to be executed.
type clientAndRPCs struct {
client hrpc.RegionClient
rpcs []hrpc.Call
}
// keep track of the order requests are queued so that we can wait
// for their responses in the same order.
cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
for client, rpcs := range rpcByClient {
client.QueueBatch(ctx, rpcs)
cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
}

// batch will be used to hold any RPCs that need to be retried
batch = batch[:0]
var needBackoff bool

func() { // func used to scope the span
ctx, sp := observability.StartSpan(ctx, "waitForResult")
defer sp.End()
for _, cAndR := range cAndRs {
shouldRetry, shouldBackoff, unretryableError, ok := c.waitForCompletion(
ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
if !ok {
allOK = false
retries = append(retries, shouldRetry...)
needBackoff = needBackoff || shouldBackoff
unretryableErrorSeen = unretryableErrorSeen || unretryableError
}
}
}()

// Exit retry loop if no RPCs are retryable because they all
// succeeded or hit unretryable errors (this is true if
// retries is empty), or the context is done.
if len(retries) == 0 || ctx.Err() != nil {
break
}
}()
allOK = !fail
if needBackoff {
sp.AddEvent("retrySleep")
var err error
backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
if err != nil {
break

}
} else {
sp.AddEvent("retry")
}
// Set state for next loop iteration
batch = retries
retries = retries[:0]
allOK = !unretryableErrorSeen
}

return res, allOK
}
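
For orientation, a minimal sketch of how a caller might use SendBatch after this change. The import path, the batchSender interface, and the surrounding function are assumptions for illustration; only the SendBatch signature and hrpc.RPCResult come from this diff. With the retry loop above, allOK is false only when some RPC ultimately failed: a non-retryable error, a failed region lookup, or the context ending mid-batch.

// Hypothetical caller of SendBatch; not part of this change.
package example

import (
	"context"
	"log"
	"time"

	"github.com/tsuna/gohbase/hrpc" // assumed import path
)

// batchSender matches the SendBatch signature added in this diff.
type batchSender interface {
	SendBatch(ctx context.Context, batch []hrpc.Call) ([]hrpc.RPCResult, bool)
}

func sendAndCheck(ctx context.Context, c batchSender, batch []hrpc.Call) {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	res, allOK := c.SendBatch(ctx, batch)
	if allOK {
		return // every RPC succeeded, possibly after internal retries
	}
	// allOK is false: at least one RPC failed non-retryably or the
	// context ended, so inspect the per-RPC errors.
	for i, r := range res {
		if r.Error != nil {
			log.Printf("rpc %d failed: %v", i, r.Error)
		}
	}
}
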
@@ -296,11 +343,24 @@
return rpcByClient, ok
}

// waitForCompletion waits for the completion of all rpcs, updating
// the appropriate index in results with the help of rpcToRes. If all
// rpcs succeed then ok will be true, otherwise:
// - ok will be false
// - retryables will contain RPCs that can be retried
// - shouldBackoff will be true if any retryable RPCs need a backoff before retrying
// - unretryableError will be true if there were errors seen on RPCs
// that were not retryable. It communicates that retryables does
// not contain all the RPCs that failed, so even though those
// retryable RPCs may eventually succeed we need to return !ok to
// the caller of SendBatch.
func (c *client) waitForCompletion(ctx context.Context, rc hrpc.RegionClient,
rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) bool {
rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) (
retryables []hrpc.Call, shouldBackoff, unretryableError, ok bool) {
Collaborator:

Should there be a comment about the unretryableError in the doc comment?

Collaborator (Author):

Done


ok := true
ok = true
canceledIndex := len(rpcs)

loop:
for i, rpc := range rpcs {
select {
@@ -309,7 +369,17 @@
if res.Error != nil {
c.handleResultError(res.Error, rpc.Region(), rc)
ok = false
switch res.Error.(type) {
case region.RetryableError:
shouldBackoff = true
retryables = append(retryables, rpc)
case region.ServerError, region.NotServingRegionError:
retryables = append(retryables, rpc)
default:
unretryableError = true
}
}

case <-ctx.Done():
canceledIndex = i
ok = false
@@ -333,7 +403,7 @@
}
}

return ok
return retryables, shouldBackoff, unretryableError, ok
}
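
Read on its own, the error classification above says: retryable errors are retried after a backoff, server and not-serving-region errors are retried once the region is re-resolved, and everything else is surfaced as a non-retryable failure. A standalone restatement of that decision follows; the helper name and import path are illustrative assumptions, only the error types come from this diff.

// Sketch only: mirrors the switch in waitForCompletion.
package example

import "github.com/tsuna/gohbase/region" // assumed import path

// classify reports whether an RPC error should be retried and whether
// the retry needs a backoff sleep first.
func classify(err error) (retry, backoff bool) {
	switch err.(type) {
	case region.RetryableError:
		return true, true // e.g. region busy/splitting: sleep, then retry
	case region.ServerError, region.NotServingRegionError:
		return true, false // re-resolve the region, then retry immediately
	default:
		return false, false // non-retryable: report to the caller
	}
}
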

func (c *client) handleResultError(err error, reg hrpc.RegionInfo, rc hrpc.RegionClient) {
@@ -668,7 +738,16 @@
}
}

// establishRegionOverride can be set by tests to override the
// behavior of establishRegion
var establishRegionOverride func(reg hrpc.RegionInfo, addr string)

func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
if establishRegionOverride != nil {
establishRegionOverride(reg, addr)
return
}

var backoff time.Duration
var err error
for {
@@ -803,7 +882,15 @@
}
}

// sleepAndIncreaseBackoffOverride can be set by tests to override the
// behavior of sleepAndIncreaseBackoff
var sleepAndIncreaseBackoffOverride func(
ctx context.Context, backoff time.Duration) (time.Duration, error)

func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) {
if sleepAndIncreaseBackoffOverride != nil {
return sleepAndIncreaseBackoffOverride(ctx, backoff)
}
if backoff == 0 {
return backoffStart, nil
}
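
Since the two override hooks exist purely for tests, here is a sketch of how a test in the same package might use them. The test body and package layout are hypothetical; only the override variables and their signatures come from this diff.

// Hypothetical test in the same package as rpc.go (the overrides are
// unexported), assuming the gohbase package layout.
package gohbase

import (
	"context"
	"testing"
	"time"

	"github.com/tsuna/gohbase/hrpc" // assumed import path
)

func TestSendBatchRetriesWithoutSleeping(t *testing.T) {
	// Record region (re)establishment instead of dialing anything.
	var established int
	establishRegionOverride = func(reg hrpc.RegionInfo, addr string) {
		established++
	}
	// Skip real sleeps so retry paths run instantly while still
	// exercising the backoff bookkeeping in SendBatch.
	sleepAndIncreaseBackoffOverride = func(
		ctx context.Context, backoff time.Duration) (time.Duration, error) {
		return backoff + time.Millisecond, ctx.Err()
	}
	defer func() {
		// Restore the real behavior for other tests.
		establishRegionOverride = nil
		sleepAndIncreaseBackoffOverride = nil
	}()

	// ... drive SendBatch against a fake region client here ...
	_ = established
}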