From c9c9b592b2961108b3f91a72d9a117cc78e25251 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 17 Dec 2024 21:10:35 +0800 Subject: [PATCH 1/7] fix retry logic when check store version failed --- cdc/kv/shared_client.go | 12 +++++++++++ cdc/kv/shared_stream.go | 46 ++++++++++++++++++++--------------------- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 8138b111570..5c33f598627 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -96,6 +96,7 @@ var ( metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest") metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown") metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable") + metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr") metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore") metricKvIsBusyCounter = eventFeedErrorCounter.WithLabelValues("KvIsBusy") ) @@ -118,6 +119,10 @@ func (e *rpcCtxUnavailableErr) Error() string { e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer()) } +type getStoreErr struct{} + +func (e *getStoreErr) Error() string { return "get store error" } + type sendRequestToStoreErr struct{} func (e *sendRequestToStoreErr) Error() string { return "send request to store error" } @@ -729,6 +734,13 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf metricFeedRPCCtxUnavailable.Inc() s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable) return nil + case *getStoreErr: + metricGetStoreErr.Inc() + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + // cannot get the store the region belongs to, so we need to reload the region. + s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err) + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable) + return nil case *sendRequestToStoreErr: metricStoreSendRequestErr.Inc() bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index 1427d80daf6..f93ff2ac228 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/kv/sharedconn" "github.com/pingcap/tiflow/pkg/chann" + cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" @@ -90,12 +91,26 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque if err := waitForPreFetching(); err != nil { return err } - if canceled := stream.run(ctx, c, r); canceled { + err := stream.run(ctx, c, r) + log.Info("event feed grpc stream exits", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Uint64("storeID", r.storeID), + zap.String("addr", r.storeAddr), + zap.Error(err)) + var regionErr error + if err == nil || errors.Cause(err) == context.Canceled { return nil + } else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) { + regionErr = &getStoreErr{} + } else { + regionErr = &sendRequestToStoreErr{} } + for _, m := range stream.clearStates() { for _, state := range m { - state.markStopped(&sendRequestToStoreErr{}) + state.markStopped(regionErr) sfEvent := newEventItem(nil, state, stream) slot := hashRegionID(state.region.verID.GetID(), len(c.workers)) _ = c.workers[slot].sendEvent(ctx, sfEvent) @@ -108,7 +123,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque // It means it's a special task for stopping the table. continue } - c.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{})) + c.onRegionFail(newRegionErrorInfo(region, regionErr)) } if err := util.Hang(ctx, time.Second); err != nil { return err @@ -125,16 +140,7 @@ func newRequestedStream(streamID uint64) *requestedStream { return stream } -func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) { - isCanceled := func() bool { - select { - case <-ctx.Done(): - return true - default: - return false - } - } - +func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) error { if err := version.CheckStoreVersion(ctx, c.pd, rs.storeID); err != nil { log.Info("event feed check store version fails", zap.String("namespace", c.changefeed.Namespace), @@ -143,7 +149,7 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste zap.Uint64("storeID", rs.storeID), zap.String("addr", rs.storeAddr), zap.Error(err)) - return isCanceled() + return err } log.Info("event feed going to create grpc stream", @@ -154,13 +160,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste zap.String("addr", rs.storeAddr)) defer func() { - log.Info("event feed grpc stream exits", - zap.String("namespace", c.changefeed.Namespace), - zap.String("changefeed", c.changefeed.ID), - zap.Uint64("streamID", s.streamID), - zap.Uint64("storeID", rs.storeID), - zap.String("addr", rs.storeAddr), - zap.Bool("canceled", canceled)) if s.multiplexing != nil { s.multiplexing = nil } else if s.tableExclusives != nil { @@ -181,7 +180,7 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste zap.Uint64("storeID", rs.storeID), zap.String("addr", rs.storeAddr), zap.Error(err)) - return isCanceled() + return err } if cc.Multiplexing() { @@ -211,8 +210,7 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste }) } g.Go(func() error { return s.send(gctx, c, rs) }) - _ = g.Wait() - return isCanceled() + return g.Wait() } func (s *requestedStream) receive( From 5c334b854272170a8a20e4883521b227326a71eb Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 17 Dec 2024 22:10:38 +0800 Subject: [PATCH 2/7] test wip --- cdc/kv/client_mock_test.go | 6 +++ cdc/kv/shared_client_test.go | 95 ++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/cdc/kv/client_mock_test.go b/cdc/kv/client_mock_test.go index e077006b746..28c91c2a007 100644 --- a/cdc/kv/client_mock_test.go +++ b/cdc/kv/client_mock_test.go @@ -18,9 +18,12 @@ package kv import ( "context" + "fmt" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/kvproto/pkg/metapb" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/version" pd "github.com/tikv/pd/client" ) @@ -33,6 +36,9 @@ type mockPDClient struct { var _ pd.Client = &mockPDClient{} func (m *mockPDClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { + failpoint.Inject("GetStoreFailed", func() { + failpoint.Return(nil, cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID))) + }) s, err := m.Client.GetStore(ctx, storeID) if err != nil { return nil, err diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go index 08b1488226c..71521f3eacc 100644 --- a/cdc/kv/shared_client_test.go +++ b/cdc/kv/shared_client_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/store/mockstore/mockcopr" @@ -261,6 +262,100 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) { } } +func TestGetStoreFailed(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + events1 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataServer(events1) + server1, addr1 := newMockService(ctx, t, srv1, wg) + + rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) + + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + + grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil) + + regionCache := tikv.NewRegionCache(pdClient) + + pdClock := pdutil.NewClock4Test() + + kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + require.Nil(t, err) + lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{}) + + invalidStore1 := "localhost:1" + invalidStore2 := "localhost:2" + cluster.AddStore(1, addr1) + cluster.AddStore(2, invalidStore1) + cluster.AddStore(3, invalidStore2) + cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 4) + + client := NewSharedClient( + model.ChangeFeedID{ID: "test"}, + &config.ServerConfig{ + KVClient: &config.KVClientConfig{ + WorkerConcurrent: 1, + GrpcStreamConcurrent: 1, + AdvanceIntervalInMs: 10, + }, + Debug: &config.DebugConfig{Puller: &config.PullerConfig{LogRegionDetails: false}}, + }, + false, pdClient, grpcPool, regionCache, pdClock, lockResolver, + ) + + defer func() { + cancel() + client.Close() + _ = kvStorage.Close() + regionCache.Close() + pdClient.Close() + srv1.wg.Wait() + server1.Stop() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + err := client.Run(ctx) + require.Equal(t, context.Canceled, errors.Cause(err)) + }() + + subID := client.AllocSubscriptionID() + span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")} + eventCh := make(chan MultiplexingEvent, 50) + client.Subscribe(subID, span, 1, eventCh) + + makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent { + return &cdcpb.ChangeDataEvent{ + Events: []*cdcpb.Event{ + { + RegionId: regionID, + RequestId: requestID, + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts}, + }, + }, + } + } + + checkTsEvent := func(event model.RegionFeedEvent, ts uint64) { + require.Equal(t, ts, event.Resolved.ResolvedTs) + } + + events1 <- mockInitializedEvent(11, uint64(subID)) + ts := oracle.GoTimeToTS(pdClock.CurrentTime()) + events1 <- makeTsEvent(11, ts, uint64(subID)) + failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/GetStoreFailed", `return(true)`) + // defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause") + select { + case event := <-eventCh: + checkTsEvent(event.RegionFeedEvent, ts) + case <-time.After(5 * time.Second): + require.True(t, false, "reconnection not succeed in 5 second") + } +} + type mockChangeDataServer struct { ch chan *cdcpb.ChangeDataEvent wg sync.WaitGroup From 729aa639aefca6e542b7c72e0a77aece4bd11b83 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 17 Dec 2024 22:12:48 +0800 Subject: [PATCH 3/7] test wip --- cdc/kv/shared_client_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go index 71521f3eacc..09853608452 100644 --- a/cdc/kv/shared_client_test.go +++ b/cdc/kv/shared_client_test.go @@ -347,7 +347,13 @@ func TestGetStoreFailed(t *testing.T) { ts := oracle.GoTimeToTS(pdClock.CurrentTime()) events1 <- makeTsEvent(11, ts, uint64(subID)) failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/GetStoreFailed", `return(true)`) - // defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause") + select { + case event := <-eventCh: + require.True(t, false, "should not get event when get store failed") + checkTsEvent(event.RegionFeedEvent, ts) + case <-time.After(5 * time.Second): + } + failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/GetStoreFailed") select { case event := <-eventCh: checkTsEvent(event.RegionFeedEvent, ts) From 9ec73a86c203e57321c8bb9dec89852c53707c31 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 17 Dec 2024 22:41:39 +0800 Subject: [PATCH 4/7] add test --- cdc/kv/client_mock_test.go | 6 ------ cdc/kv/shared_client_test.go | 7 +++---- pkg/version/check.go | 4 ++++ 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/cdc/kv/client_mock_test.go b/cdc/kv/client_mock_test.go index 28c91c2a007..e077006b746 100644 --- a/cdc/kv/client_mock_test.go +++ b/cdc/kv/client_mock_test.go @@ -18,12 +18,9 @@ package kv import ( "context" - "fmt" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/kvproto/pkg/metapb" - cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/version" pd "github.com/tikv/pd/client" ) @@ -36,9 +33,6 @@ type mockPDClient struct { var _ pd.Client = &mockPDClient{} func (m *mockPDClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { - failpoint.Inject("GetStoreFailed", func() { - failpoint.Return(nil, cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID))) - }) s, err := m.Client.GetStore(ctx, storeID) if err != nil { return nil, err diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go index 09853608452..c2e25532359 100644 --- a/cdc/kv/shared_client_test.go +++ b/cdc/kv/shared_client_test.go @@ -322,6 +322,7 @@ func TestGetStoreFailed(t *testing.T) { require.Equal(t, context.Canceled, errors.Cause(err)) }() + failpoint.Enable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed", `return(true)`) subID := client.AllocSubscriptionID() span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")} eventCh := make(chan MultiplexingEvent, 50) @@ -346,14 +347,12 @@ func TestGetStoreFailed(t *testing.T) { events1 <- mockInitializedEvent(11, uint64(subID)) ts := oracle.GoTimeToTS(pdClock.CurrentTime()) events1 <- makeTsEvent(11, ts, uint64(subID)) - failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/GetStoreFailed", `return(true)`) select { - case event := <-eventCh: + case <-eventCh: require.True(t, false, "should not get event when get store failed") - checkTsEvent(event.RegionFeedEvent, ts) case <-time.After(5 * time.Second): } - failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/GetStoreFailed") + failpoint.Disable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed") select { case event := <-eventCh: checkTsEvent(event.RegionFeedEvent, ts) diff --git a/pkg/version/check.go b/pkg/version/check.go index 9237eab4154..6044d1adaf1 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/util/engine" @@ -197,6 +198,9 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre // CheckStoreVersion checks whether the given TiKV is compatible with this CDC. // If storeID is 0, it checks all TiKV. func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error { + failpoint.Inject("GetStoreFailed", func() { + failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID))) + }) var stores []*metapb.Store var err error if storeID == 0 { From fe8986a44a2b8c8d23467111bf83d0060992178a Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 24 Dec 2024 11:28:07 +0800 Subject: [PATCH 5/7] refactor --- cdc/kv/shared_stream.go | 57 +++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index f93ff2ac228..8bc4314805f 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/kv/sharedconn" "github.com/pingcap/tiflow/pkg/chann" - cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" @@ -91,23 +90,25 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque if err := waitForPreFetching(); err != nil { return err } - err := stream.run(ctx, c, r) - log.Info("event feed grpc stream exits", - zap.String("namespace", c.changefeed.Namespace), - zap.String("changefeed", c.changefeed.ID), - zap.Uint64("streamID", stream.streamID), - zap.Uint64("storeID", r.storeID), - zap.String("addr", r.storeAddr), - zap.Error(err)) var regionErr error - if err == nil || errors.Cause(err) == context.Canceled { - return nil - } else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) { + if err := version.CheckStoreVersion(ctx, c.pd, r.storeID); err != nil { + log.Info("event feed check store version fails", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Uint64("storeID", r.storeID), + zap.String("addr", r.storeAddr), + zap.Error(err)) + if errors.Cause(err) == context.Canceled { + return nil + } regionErr = &getStoreErr{} } else { + if canceled := stream.run(ctx, c, r); canceled { + return nil + } regionErr = &sendRequestToStoreErr{} } - for _, m := range stream.clearStates() { for _, state := range m { state.markStopped(regionErr) @@ -140,16 +141,14 @@ func newRequestedStream(streamID uint64) *requestedStream { return stream } -func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) error { - if err := version.CheckStoreVersion(ctx, c.pd, rs.storeID); err != nil { - log.Info("event feed check store version fails", - zap.String("namespace", c.changefeed.Namespace), - zap.String("changefeed", c.changefeed.ID), - zap.Uint64("streamID", s.streamID), - zap.Uint64("storeID", rs.storeID), - zap.String("addr", rs.storeAddr), - zap.Error(err)) - return err +func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) { + isCanceled := func() bool { + select { + case <-ctx.Done(): + return true + default: + return false + } } log.Info("event feed going to create grpc stream", @@ -160,6 +159,13 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste zap.String("addr", rs.storeAddr)) defer func() { + log.Info("event feed grpc stream exits", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Uint64("storeID", rs.storeID), + zap.String("addr", rs.storeAddr), + zap.Bool("canceled", canceled)) if s.multiplexing != nil { s.multiplexing = nil } else if s.tableExclusives != nil { @@ -180,7 +186,7 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste zap.Uint64("storeID", rs.storeID), zap.String("addr", rs.storeAddr), zap.Error(err)) - return err + return isCanceled() } if cc.Multiplexing() { @@ -210,7 +216,8 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste }) } g.Go(func() error { return s.send(gctx, c, rs) }) - return g.Wait() + _ = g.Wait() + return isCanceled() } func (s *requestedStream) receive( From 472ef3b9181b331f1e8731393b52ef31b696c87e Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 24 Dec 2024 11:33:33 +0800 Subject: [PATCH 6/7] small fix --- cdc/kv/shared_stream.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index 8bc4314805f..a9d1e01cb3e 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -347,7 +347,9 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request if s.multiplexing != nil { req := &cdcpb.ChangeDataRequest{ RequestId: uint64(subscriptionID), - Request: &cdcpb.ChangeDataRequest_Deregister_{}, + Request: &cdcpb.ChangeDataRequest_Deregister_{ + Deregister: &cdcpb.ChangeDataRequest_Deregister{}, + }, } if err = doSend(s.multiplexing, req, subscriptionID); err != nil { return err From e434beb1fb7fba7d52be6b0c0a088458349d526e Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 24 Dec 2024 11:39:20 +0800 Subject: [PATCH 7/7] distiguish error return by checkStoreVersion --- cdc/kv/shared_stream.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index a9d1e01cb3e..552a642e151 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/kv/sharedconn" "github.com/pingcap/tiflow/pkg/chann" + cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" @@ -101,8 +102,11 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque zap.Error(err)) if errors.Cause(err) == context.Canceled { return nil + } else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) { + regionErr = &getStoreErr{} + } else { + regionErr = &sendRequestToStoreErr{} } - regionErr = &getStoreErr{} } else { if canceled := stream.run(ctx, c, r); canceled { return nil