Skip to content

Commit

Permalink
SendBatch: Retry retryable errors
Browse files Browse the repository at this point in the history
To match the behavior of SendRPC, SendBatch should retry RPCs that hit
retryable errors: region.RetryableError, region.ServerError, and
region.NotServingRegionError.

SendBatch will now retry each RPC that hits a retryable error. What
used to be a single step through of assigning regions to RPCs,
grouping them by region server and then dispatching the RPCs to their
respective servers, is now done in a loop. The first iteration of
the loop operates on the entire batch. Later iterations operate on the
set of RPCs that failed with retryable errors in the previous batch.
  • Loading branch information
aaronbee committed Oct 10, 2023
1 parent f750f62 commit ecff86c
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 30 deletions.
134 changes: 104 additions & 30 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ var (
// successfully. allOK is true if all calls completed successfully,
// and false if any calls failed and the errors in the results need to
// be checked.
//
// SendBatch will continue retrying each RPC in batch until it
// succeeds, fails with an unretryable error, or the context is done.
// SendBatch does not return until all RPCs have reached a terminal
// state (success, unretryable error), or the context is done.
func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
res []hrpc.RPCResult, allOK bool) {
if len(batch) == 0 {
Expand Down Expand Up @@ -235,37 +240,79 @@ func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
return res, allOK
}

rpcByClient, ok := c.findClients(ctx, batch, res)
if !ok {
return res, false
}
sendBatchSplitCount.Observe(float64(len(rpcByClient)))
// Send and wait for responses loop. This loop will partition the
// batch per-regionserver batches, send those batches to the
// region server and wait for results. Any RPCs that hit retryable
// errors will be made into a new batch and passed through this
// loop again.

// Send each group of RPCs to region client to be executed.
type clientAndRPCs struct {
client hrpc.RegionClient
rpcs []hrpc.Call
}
// keep track of the order requests are queued so that we can wait
// for their responses in the same order.
cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
for client, rpcs := range rpcByClient {
client.QueueBatch(ctx, rpcs)
cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
}
// unretryableErrorSeen set to true when any RPC in the batch hits
// an error that is not retryable. This is used to remember to
// return allOK=false even after we retry RPCs that hit retryable
// errors and those all succeed.
var unretryableErrorSeen bool
var retries []hrpc.Call
backoff := backoffStart

for {
rpcByClient, ok := c.findClients(ctx, batch, res)
if !ok {
return res, false
}

Check warning on line 261 in rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc.go#L260-L261

Added lines #L260 - L261 were not covered by tests
sendBatchSplitCount.Observe(float64(len(rpcByClient)))

var fail bool
func() { // func used to scope the span
ctx, sp := observability.StartSpan(ctx, "waitForResult")
defer sp.End()
for _, cAndR := range cAndRs {
ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
if !ok {
fail = true
// Send each group of RPCs to region client to be executed.
type clientAndRPCs struct {
client hrpc.RegionClient
rpcs []hrpc.Call
}
// keep track of the order requests are queued so that we can wait
// for their responses in the same order.
cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
for client, rpcs := range rpcByClient {
client.QueueBatch(ctx, rpcs)
cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
}

// batch wil be used to hold any RPCs that need to be retried
batch = batch[:0]
var needBackoff bool

func() { // func used to scope the span
ctx, sp := observability.StartSpan(ctx, "waitForResult")
defer sp.End()
for _, cAndR := range cAndRs {
shouldRetry, shouldBackoff, unretryableError, ok := c.waitForCompletion(
ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
if !ok {
allOK = false
retries = append(retries, shouldRetry...)
needBackoff = needBackoff || shouldBackoff
unretryableErrorSeen = unretryableErrorSeen || unretryableError
}
}
}()

// Exit retry loop if no RPCs are retryable because they all
// succeeded or hit unretryable errors, or the context is done
if len(retries) == 0 || ctx.Err() != nil {
break
}
}()
allOK = !fail
if needBackoff {
sp.AddEvent("retrySleep")
var err error
backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
if err != nil {
break

Check warning on line 306 in rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc.go#L306

Added line #L306 was not covered by tests
}
} else {
sp.AddEvent("retry")
}

Check warning on line 310 in rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc.go#L308-L310

Added lines #L308 - L310 were not covered by tests
// Set state for next loop iteration
batch = retries
retries = retries[:0]
allOK = !unretryableErrorSeen
}

return res, allOK
}
Expand Down Expand Up @@ -296,11 +343,19 @@ func (c *client) findClients(ctx context.Context, batch []hrpc.Call, res []hrpc.
return rpcByClient, ok
}

// waitForCompletion waits for the completion of all rpcs, updating
// the appropriate index in results with the help of rpcToRes. If all
// rpcs succeed then ok will return true, otherwise ok will return
// false, retryables will contain RPCs that can be retried, and
// shouldBackoff will be true if any RPCs need a backoff before
// retrying.
func (c *client) waitForCompletion(ctx context.Context, rc hrpc.RegionClient,
rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) bool {
rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) (
retryables []hrpc.Call, shouldBackoff, unretryableError, ok bool) {

ok := true
ok = true
canceledIndex := len(rpcs)

loop:
for i, rpc := range rpcs {
select {
Expand All @@ -309,7 +364,17 @@ loop:
if res.Error != nil {
c.handleResultError(res.Error, rpc.Region(), rc)
ok = false
switch res.Error.(type) {
case region.RetryableError:
shouldBackoff = true
retryables = append(retryables, rpc)
case region.ServerError, region.NotServingRegionError:
retryables = append(retryables, rpc)

Check warning on line 372 in rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc.go#L371-L372

Added lines #L371 - L372 were not covered by tests
default:
unretryableError = true
}
}

case <-ctx.Done():
canceledIndex = i
ok = false
Expand All @@ -333,7 +398,7 @@ loop:
}
}

return ok
return retryables, shouldBackoff, unretryableError, ok
}

func (c *client) handleResultError(err error, reg hrpc.RegionInfo, rc hrpc.RegionClient) {
Expand Down Expand Up @@ -668,7 +733,16 @@ func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
}
}

// establishRegionOverride can be set by tests to override the
// behavior of establishRegion
var establishRegionOverride func(reg hrpc.RegionInfo, addr string)

func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
if establishRegionOverride != nil {
establishRegionOverride(reg, addr)
return
}

Check warning on line 744 in rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc.go#L742-L744

Added lines #L742 - L744 were not covered by tests

var backoff time.Duration
var err error
for {
Expand Down
128 changes: 128 additions & 0 deletions rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,15 @@ func TestFindClients(t *testing.T) {
}

func TestSendBatchWaitForCompletion(t *testing.T) {
estRegCh := make(chan hrpc.RegionInfo)
establishRegionOverride = func(reg hrpc.RegionInfo, addr string) {
estRegCh <- reg
}
defer func() {
close(estRegCh) // panic any unexpected calls to establishRegion
establishRegionOverride = nil
}()

c := newMockClient(nil)
// pretend regionserver:0 has meta table
rc := c.clients.put("regionserver:0", c.metaRegionInfo, newRegionClientFn("regionserver:0"))
Expand Down Expand Up @@ -1529,4 +1538,123 @@ func TestSendBatchWaitForCompletion(t *testing.T) {
}
}
})

t.Run("retryable error some", func(t *testing.T) {
batch := make([]hrpc.Call, 9)
for i := 0; i < 9; i++ {
// Create an RPC for each region, "a"-"i"
key := string(rune('a' + i))
batch[i] = newRPC(key)
}
var (
result []hrpc.RPCResult
ok bool
done = make(chan struct{})
)
go func() {
result, ok = c.SendBatch(context.Background(), batch)
close(done)
}()

for i := 0; i < 9; i++ {
// Error some responses
if i%2 == 0 {
batch[i].ResultChan() <- hrpc.RPCResult{Error: region.RetryableError{}}
continue
}
// Using an Int32 as a result. A real result would be a
// MutateResponse, but any proto.Message works for the test.
batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))}
}

// We should see retries on retryable errors. So now send the
// correct response.
for i := 0; i < 9; i++ {
if i%2 != 0 {
continue
}
batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))}
}

<-done
if !ok {
t.Errorf("unexpected !ok")
}
if len(result) != 9 {
t.Fatalf("unexpected result size: %v", result)
}

for i, r := range result {
if r.Error != nil {
t.Errorf("unexpected error: %s", r.Error)
continue
}
if r.Msg.(*wrapperspb.Int32Value).Value != int32(i) {
t.Errorf("unexpected result: %v", r.Msg)
}
}

})

t.Run("retryable error and non-retryable errors", func(t *testing.T) {
batch := make([]hrpc.Call, 9)
for i := 0; i < 9; i++ {
// Create an RPC for each region, "a"-"i"
key := string(rune('a' + i))
batch[i] = newRPC(key)
}
var (
result []hrpc.RPCResult
ok bool
done = make(chan struct{})
)
go func() {
result, ok = c.SendBatch(context.Background(), batch)
close(done)
}()

for i := 0; i < 9; i++ {
if i%4 == 0 {
// Non-retryable error for some
batch[i].ResultChan() <- hrpc.RPCResult{Error: errors.New("error")}
continue
}
// Retryable error for some
batch[i].ResultChan() <- hrpc.RPCResult{Error: region.RetryableError{}}
}

// We should see retries on retryable errors. So now send the
// correct response.
for i := 0; i < 9; i++ {
if i%4 == 0 {
continue
}
batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))}
}
<-done
if ok {
t.Errorf("expected !ok")
}
if len(result) != 9 {
t.Fatalf("unexpected result size: %v", result)
}
for i, r := range result {
if i%4 == 0 {
if r.Error == nil || r.Error.Error() != "error" {
t.Errorf("expected error but got: %v", r.Error)
}
if r.Msg != nil {
t.Errorf("unexpected Msg: %v", r.Msg)
}
continue
}
if r.Error != nil {
t.Errorf("unexpected error: %s", r.Error)
continue
}
if r.Msg.(*wrapperspb.Int32Value).Value != int32(i) {
t.Errorf("unexpected result: %v", r.Msg)
}
}
})
}

0 comments on commit ecff86c

Please sign in to comment.