From 271113020dc66284b07b12a9b611b01840c0472b Mon Sep 17 00:00:00 2001
From: Aaron Beitch
Date: Fri, 6 Oct 2023 11:35:39 -0700
Subject: [PATCH] SendBatch: Retry retryable errors

To match the behavior of SendRPC, SendBatch should retry RPCs that
hit retryable errors: region.RetryableError, region.ServerError, and
region.NotServingRegionError.

SendBatch will now retry each RPC that hits a retryable error. What
used to be a single pass of assigning regions to RPCs, grouping them
by region server and then dispatching the RPCs to their respective
servers, is now done in a loop. The first iteration of the loop
operates on the entire batch. Later iterations operate on the set of
RPCs that failed with retryable errors in the previous batch.
---
 rpc.go      | 147 ++++++++++++++++++++++------
 rpc_test.go | 271 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 388 insertions(+), 30 deletions(-)

diff --git a/rpc.go b/rpc.go
index 7eb7fca3..d86ec48c 100644
--- a/rpc.go
+++ b/rpc.go
@@ -180,6 +180,10 @@ var (
 // successfully. allOK is true if all calls completed successfully,
 // and false if any calls failed and the errors in the results need to
 // be checked.
+//
+// SendBatch will continue retrying each RPC in batch until it
+// succeeds, fails with a non-retryable error, or the context is
+// canceled.
 func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
 	res []hrpc.RPCResult, allOK bool) {
 	if len(batch) == 0 {
@@ -235,37 +239,80 @@ func (c *client) SendBatch(ctx context.Context, batch []hrpc.Call) (
 		return res, allOK
 	}
 
-	rpcByClient, ok := c.findClients(ctx, batch, res)
-	if !ok {
-		return res, false
-	}
-	sendBatchSplitCount.Observe(float64(len(rpcByClient)))
+	// Send and wait for responses loop. This loop will partition the
+	// batch into per-regionserver batches, send those batches to the
+	// region server and wait for results. Any RPCs that hit retryable
+	// errors will be made into a new batch and passed through this
+	// loop again.
- // Send each group of RPCs to region client to be executed. - type clientAndRPCs struct { - client hrpc.RegionClient - rpcs []hrpc.Call - } - // keep track of the order requests are queued so that we can wait - // for their responses in the same order. - cAndRs := make([]clientAndRPCs, 0, len(rpcByClient)) - for client, rpcs := range rpcByClient { - client.QueueBatch(ctx, rpcs) - cAndRs = append(cAndRs, clientAndRPCs{client, rpcs}) - } + // unretryableErrorSeen set to true when any RPC in the batch hits + // an error that is not retryable. This is used to remember to + // return allOK=false even after we retry RPCs that hit retryable + // errors and those all succeed. + var unretryableErrorSeen bool + var retries []hrpc.Call + backoff := backoffStart + + for { + rpcByClient, ok := c.findClients(ctx, batch, res) + if !ok { + return res, false + } + sendBatchSplitCount.Observe(float64(len(rpcByClient))) - var fail bool - func() { // func used to scope the span - ctx, sp := observability.StartSpan(ctx, "waitForResult") - defer sp.End() - for _, cAndR := range cAndRs { - ok := c.waitForCompletion(ctx, cAndR.client, cAndR.rpcs, res, rpcToRes) - if !ok { - fail = true + // Send each group of RPCs to region client to be executed. + type clientAndRPCs struct { + client hrpc.RegionClient + rpcs []hrpc.Call + } + // keep track of the order requests are queued so that we can wait + // for their responses in the same order. 
+		cAndRs := make([]clientAndRPCs, 0, len(rpcByClient))
+		for client, rpcs := range rpcByClient {
+			client.QueueBatch(ctx, rpcs)
+			cAndRs = append(cAndRs, clientAndRPCs{client, rpcs})
+		}
+
+		// batch will be used to hold any RPCs that need to be retried
+		batch = batch[:0]
+		var needBackoff bool
+
+		func() { // func used to scope the span
+			ctx, sp := observability.StartSpan(ctx, "waitForResult")
+			defer sp.End()
+			for _, cAndR := range cAndRs {
+				shouldRetry, shouldBackoff, unretryableError, ok := c.waitForCompletion(
+					ctx, cAndR.client, cAndR.rpcs, res, rpcToRes)
+				if !ok {
+					allOK = false
+					retries = append(retries, shouldRetry...)
+					needBackoff = needBackoff || shouldBackoff
+					unretryableErrorSeen = unretryableErrorSeen || unretryableError
+				}
+			}
+		}()
+
+		// Exit retry loop if no RPCs are retryable because they all
+		// succeeded or hit unretryable errors (this is true if
+		// retries is empty), or the context is done.
+		if len(retries) == 0 || ctx.Err() != nil {
+			break
 		}
-	}()
-	allOK = !fail
+		if needBackoff {
+			sp.AddEvent("retrySleep")
+			var err error
+			backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
+			if err != nil {
+				break
+			}
+		} else {
+			sp.AddEvent("retry")
+		}
+		// Set state for next loop iteration
+		batch = retries
+		retries = retries[:0]
+		allOK = !unretryableErrorSeen
+	}
 
 	return res, allOK
 }
@@ -296,11 +343,24 @@ func (c *client) findClients(ctx context.Context, batch []hrpc.Call, res []hrpc.
 	return rpcByClient, ok
 }
 
+// waitForCompletion waits for the completion of all rpcs, updating
+// the appropriate index in results with the help of rpcToRes. If all
+// rpcs succeed then ok will be true, otherwise:
+// - ok will be false
+// - retryables will contain RPCs that can be retried
+// - shouldBackoff will be true if any retryable RPCs need a backoff before retrying
+// - unretryableError will be true if there were errors seen on RPCs
+//   that were not retryable.
It communicates that retryables does +// not contain all the RPCs that failed, so even though those +// retryable RPCs may eventually succeed we need to return !ok to +// the caller of SendBatch. func (c *client) waitForCompletion(ctx context.Context, rc hrpc.RegionClient, - rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) bool { + rpcs []hrpc.Call, results []hrpc.RPCResult, rpcToRes map[hrpc.Call]int) ( + retryables []hrpc.Call, shouldBackoff, unretryableError, ok bool) { - ok := true + ok = true canceledIndex := len(rpcs) + loop: for i, rpc := range rpcs { select { @@ -309,7 +369,17 @@ loop: if res.Error != nil { c.handleResultError(res.Error, rpc.Region(), rc) ok = false + switch res.Error.(type) { + case region.RetryableError: + shouldBackoff = true + retryables = append(retryables, rpc) + case region.ServerError, region.NotServingRegionError: + retryables = append(retryables, rpc) + default: + unretryableError = true + } } + case <-ctx.Done(): canceledIndex = i ok = false @@ -333,7 +403,7 @@ loop: } } - return ok + return retryables, shouldBackoff, unretryableError, ok } func (c *client) handleResultError(err error, reg hrpc.RegionInfo, rc hrpc.RegionClient) { @@ -668,7 +738,16 @@ func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error { } } +// establishRegionOverride can be set by tests to override the +// behavior of establishRegion +var establishRegionOverride func(reg hrpc.RegionInfo, addr string) + func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) { + if establishRegionOverride != nil { + establishRegionOverride(reg, addr) + return + } + var backoff time.Duration var err error for { @@ -803,7 +882,15 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) { } } +// sleepAndIncreaseBackoffOverride can be set by tests to override the +// behavior of sleepAndIncreaseBackoff +var sleepAndIncreaseBackoffOverride func( + ctx context.Context, backoff time.Duration) (time.Duration, error) + 
func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) { + if sleepAndIncreaseBackoffOverride != nil { + return sleepAndIncreaseBackoffOverride(ctx, backoff) + } if backoff == 0 { return backoffStart, nil } diff --git a/rpc_test.go b/rpc_test.go index 1d9c0375..12c88837 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -1172,6 +1172,23 @@ func TestFindClients(t *testing.T) { } func TestSendBatchWaitForCompletion(t *testing.T) { + sleepCh := make(chan struct{}) + sleepAndIncreaseBackoffOverride = func(ctx context.Context, backoff time.Duration) ( + time.Duration, error) { + sleepCh <- struct{}{} + return backoff, nil + } + estRegCh := make(chan hrpc.RegionInfo) + establishRegionOverride = func(reg hrpc.RegionInfo, addr string) { + estRegCh <- reg + } + defer func() { + close(sleepCh) // panic any unexpected calls to sleepAndIncreaseBackoff + sleepAndIncreaseBackoffOverride = nil + close(estRegCh) // panic any unexpected calls to establishRegion + establishRegionOverride = nil + }() + c := newMockClient(nil) // pretend regionserver:0 has meta table rc := c.clients.put("regionserver:0", c.metaRegionInfo, newRegionClientFn("regionserver:0")) @@ -1519,4 +1536,258 @@ func TestSendBatchWaitForCompletion(t *testing.T) { } } }) + + t.Run("retryable error some", func(t *testing.T) { + batch := make([]hrpc.Call, 9) + for i := 0; i < 9; i++ { + // Create an RPC for each region, "a"-"i" + key := string(rune('a' + i)) + batch[i] = newRPC(key) + } + var ( + result []hrpc.RPCResult + ok bool + done = make(chan struct{}) + ) + go func() { + result, ok = c.SendBatch(context.Background(), batch) + close(done) + }() + + for i := 0; i < 9; i++ { + // Error some responses + if i%2 == 0 { + batch[i].ResultChan() <- hrpc.RPCResult{Error: region.RetryableError{}} + continue + } + // Using an Int32 as a result. A real result would be a + // MutateResponse, but any proto.Message works for the test. 
+ batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))} + } + + // We should see retries on retryable errors. So, handle the + // sleep and then send back new results. Half succeed, the + // other half get more retryable errors. + <-sleepCh // Expect one call to sleepAndIncreaseBackoff + for i := 0; i < 9; i++ { + if i%4 == 0 { + batch[i].ResultChan() <- hrpc.RPCResult{Error: region.RetryableError{}} + } else if i%2 == 0 { + batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))} + } + } + + // Send successes for the remaining requests + <-sleepCh // Expect one call to sleepAndIncreaseBackoff + for i := 0; i < 9; i++ { + if i%4 == 0 { + batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))} + } + } + + <-done + if !ok { + t.Errorf("unexpected !ok") + } + if len(result) != 9 { + t.Fatalf("unexpected result size: %v", result) + } + + for i, r := range result { + if r.Error != nil { + t.Errorf("unexpected error: %s", r.Error) + continue + } + if r.Msg.(*wrapperspb.Int32Value).Value != int32(i) { + t.Errorf("unexpected result: %v", r.Msg) + } + } + }) + + t.Run("retryable error and non-retryable errors", func(t *testing.T) { + batch := make([]hrpc.Call, 9) + for i := 0; i < 9; i++ { + // Create an RPC for each region, "a"-"i" + key := string(rune('a' + i)) + batch[i] = newRPC(key) + } + var ( + result []hrpc.RPCResult + ok bool + done = make(chan struct{}) + ) + go func() { + result, ok = c.SendBatch(context.Background(), batch) + close(done) + }() + + for i := 0; i < 9; i++ { + if i%4 == 0 { + // Non-retryable error for some + batch[i].ResultChan() <- hrpc.RPCResult{Error: errors.New("error")} + continue + } + // Retryable error for some + batch[i].ResultChan() <- hrpc.RPCResult{Error: region.RetryableError{}} + } + + <-sleepCh // Expect one call to sleepAndIncreaseBackoff + + // We should see retries on retryable errors. So now send the + // correct response. 
+ for i := 0; i < 9; i++ { + if i%4 == 0 { + continue + } + batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))} + } + <-done + if ok { + t.Errorf("expected !ok") + } + if len(result) != 9 { + t.Fatalf("unexpected result size: %v", result) + } + for i, r := range result { + if i%4 == 0 { + if r.Error == nil || r.Error.Error() != "error" { + t.Errorf("expected error but got: %v", r.Error) + } + if r.Msg != nil { + t.Errorf("unexpected Msg: %v", r.Msg) + } + continue + } + if r.Error != nil { + t.Errorf("unexpected error: %s", r.Error) + continue + } + if r.Msg.(*wrapperspb.Int32Value).Value != int32(i) { + t.Errorf("unexpected result: %v", r.Msg) + } + } + }) + + t.Run("not serving region error", func(t *testing.T) { + batch := make([]hrpc.Call, 9) + for i := 0; i < 9; i++ { + // Create an RPC for each region, "a"-"i" + key := string(rune('a' + i)) + batch[i] = newRPC(key) + } + var ( + result []hrpc.RPCResult + ok bool + done = make(chan struct{}) + ) + go func() { + result, ok = c.SendBatch(context.Background(), batch) + close(done) + }() + + for i := 0; i < 9; i++ { + if i%4 == 0 { + // NSRE for some + batch[i].ResultChan() <- hrpc.RPCResult{Error: region.NotServingRegionError{}} + continue + } + // success for some + batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))} + } + + for i := 0; i < 9; i++ { + if i%4 != 0 { + continue + } + // For each failed NSRE we should expect an establishRegion call + reg := <-estRegCh + // reestablish the region: + reg.MarkAvailable() + } + + // We should see retries on the RPCs hitting NSREs. So now + // send the correct response. 
+ for i := 0; i < 9; i++ { + if i%4 != 0 { + continue + } + batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))} + } + <-done + if !ok { + t.Errorf("unexpected !ok") + } + if len(result) != 9 { + t.Fatalf("unexpected result size: %v", result) + } + + for i, r := range result { + if r.Error != nil { + t.Errorf("unexpected error: %s", r.Error) + continue + } + if r.Msg.(*wrapperspb.Int32Value).Value != int32(i) { + t.Errorf("unexpected result: %v", r.Msg) + } + } + }) + + t.Run("retryable error some with context cancellation", func(t *testing.T) { + batch := make([]hrpc.Call, 9) + for i := 0; i < 9; i++ { + // Create an RPC for each region, "a"-"i" + key := string(rune('a' + i)) + batch[i] = newRPC(key) + } + var ( + result []hrpc.RPCResult + ok bool + done = make(chan struct{}) + ctx, cancel = context.WithCancel(context.Background()) + ) + go func() { + result, ok = c.SendBatch(ctx, batch) + close(done) + }() + + for i := 0; i < 9; i++ { + // Error some responses + if i%2 == 0 { + batch[i].ResultChan() <- hrpc.RPCResult{Error: region.RetryableError{}} + continue + } + // Using an Int32 as a result. A real result would be a + // MutateResponse, but any proto.Message works for the test. + batch[i].ResultChan() <- hrpc.RPCResult{Msg: wrapperspb.Int32(int32(i))} + } + + cancel() + + <-done + if ok { + t.Errorf("unexpected ok") + } + if len(result) != 9 { + t.Fatalf("unexpected result size: %v", result) + } + + for i, r := range result { + if i%2 == 0 { + if r.Error == nil || !errors.Is(r.Error, region.RetryableError{}) { + t.Errorf("expected error but got: %v", r.Error) + } + if r.Msg != nil { + t.Errorf("unexpected Msg: %v", r.Msg) + } + continue + } + if r.Error != nil { + t.Errorf("unexpected error: %s", r.Error) + continue + } + if r.Msg.(*wrapperspb.Int32Value).Value != int32(i) { + t.Errorf("unexpected result: %v", r.Msg) + } + } + }) }