diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 9043946c86c..3f4bc23dea9 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -104,6 +104,7 @@ var ( metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest") metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown") metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable") + metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr") metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore") metricConnectToStoreErr = eventFeedErrorCounter.WithLabelValues("ConnectToStore") ) @@ -660,8 +661,17 @@ func (s *eventFeedSession) requestRegionToStore( time.Sleep(delay) } bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) - s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err) - errInfo := newRegionErrorInfo(sri, &connectToStoreErr{}) + var regionErr error + var scheduleReload bool + if cerror.Is(err, cerror.ErrGetAllStoresFailed) { + regionErr = &getStoreErr{} + scheduleReload = true + } else { + regionErr = &connectToStoreErr{} + scheduleReload = regionScheduleReload + } + s.client.regionCache.OnSendFail(bo, rpcCtx, scheduleReload, err) + errInfo := newRegionErrorInfo(sri, regionErr) s.onRegionFail(ctx, errInfo) continue } @@ -1492,3 +1502,7 @@ func (e *connectToStoreErr) Error() string { return "connect to store error" } type sendRequestToStoreErr struct{} func (e *sendRequestToStoreErr) Error() string { return "send request to store error" } + +type getStoreErr struct{} + +func (e *getStoreErr) Error() string { return "get store error" } diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index ed15587d031..60b3038cda4 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -464,6 +464,9 @@ func (s *SharedClient) divideAndScheduleRegions( bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) regions, err := s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.StartKey, nextSpan.EndKey, limit) if err != nil { + if errors.Cause(err) == context.Canceled { + return nil + } log.Warn("event feed load regions failed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -624,6 +627,13 @@ func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo) metricFeedRPCCtxUnavailable.Inc() s.scheduleRangeRequest(ctx, errInfo.span, errInfo.requestedTable) return nil + case *getStoreErr: + metricGetStoreErr.Inc() + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + // cannot get the store the region belongs to, so we need to reload the region. + s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err) + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.requestedTable) + return nil case *sendRequestToStoreErr: metricStoreSendRequestErr.Inc() bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) @@ -658,6 +668,10 @@ func (s *SharedClient) resolveLock(ctx context.Context) error { } doResolve := func(regionID uint64, state *regionlock.LockedRange, maxVersion uint64) { + if state == nil { + log.Warn("found nil state in resolve lock", zap.Uint64("regionID", regionID)) + return + } if state.ResolvedTs.Load() > maxVersion || !state.Initialzied.Load() { return } diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go index 3d006e01275..496ea5c6ff6 100644 --- a/cdc/kv/shared_client_test.go +++ b/cdc/kv/shared_client_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/tidb/pkg/store/mockstore/mockcopr" "github.com/pingcap/tiflow/cdc/kv/regionlock" @@ -212,6 +213,105 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) { } } +func TestGetStoreFailed(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + events1 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataServer(events1) + server1, addr1 := newMockService(ctx, t, srv1, wg) + + rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) + + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + + grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil) + + regionCache := tikv.NewRegionCache(pdClient) + + pdClock := pdutil.NewClock4Test() + + kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + require.Nil(t, err) + lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{}) + + invalidStore1 := "localhost:1" + invalidStore2 := "localhost:2" + cluster.AddStore(1, addr1) + cluster.AddStore(2, invalidStore1) + cluster.AddStore(3, invalidStore2) + cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 4) + + client := NewSharedClient( + model.ChangeFeedID{ID: "test"}, + &config.ServerConfig{ + KVClient: &config.KVClientConfig{ + WorkerConcurrent: 1, + GrpcStreamConcurrent: 1, + AdvanceIntervalInMs: 10, + }, + Debug: &config.DebugConfig{Puller: &config.PullerConfig{}}, + }, + false, pdClient, grpcPool, regionCache, pdClock, lockResolver, + ) + + defer func() { + cancel() + client.Close() + _ = kvStorage.Close() + regionCache.Close() + pdClient.Close() + srv1.wg.Wait() + server1.Stop() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + err := client.Run(ctx) + require.Equal(t, context.Canceled, errors.Cause(err)) + }() + + failpoint.Enable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed", `return(true)`) + subID := client.AllocSubscriptionID() + span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")} + eventCh := make(chan MultiplexingEvent, 50) + client.Subscribe(subID, span, 1, eventCh) + + makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent { + return &cdcpb.ChangeDataEvent{ + Events: []*cdcpb.Event{ + { + RegionId: regionID, + RequestId: requestID, + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts}, + }, + }, + } + } + + checkTsEvent := func(event model.RegionFeedEvent, ts uint64) { + require.Equal(t, ts, event.Resolved.ResolvedTs) + } + + events1 <- mockInitializedEvent(11, uint64(subID)) + ts := oracle.GoTimeToTS(pdClock.CurrentTime()) + events1 <- makeTsEvent(11, ts, uint64(subID)) + select { + case <-eventCh: + require.True(t, false, "should not get event when get store failed") + case <-time.After(5 * time.Second): + } + failpoint.Disable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed") + select { + case event := <-eventCh: + checkTsEvent(event.RegionFeedEvent, ts) + case <-time.After(5 * time.Second): + require.True(t, false, "reconnection not succeed in 5 second") + } +} + type mockChangeDataServer struct { ch chan *cdcpb.ChangeDataEvent wg sync.WaitGroup diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index a2a3f4f6540..42296288919 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/kv/sharedconn" "github.com/pingcap/tiflow/pkg/chann" + cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" @@ -87,12 +88,31 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque if err := waitForPreFetching(); err != nil { return err } - if canceled := stream.run(ctx, c, r); canceled { - return nil + var regionErr error + if err := version.CheckStoreVersion(ctx, c.pd, r.storeID); err != nil { + log.Info("event feed check store version fails", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Uint64("storeID", r.storeID), + zap.String("addr", r.storeAddr), + zap.Error(err)) + if errors.Cause(err) == context.Canceled { + return nil + } else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) { + regionErr = &getStoreErr{} + } else { + regionErr = &sendRequestToStoreErr{} + } + } else { + if canceled := stream.run(ctx, c, r); canceled { + return nil + } + regionErr = &sendRequestToStoreErr{} } for _, m := range stream.clearStates() { for _, state := range m { - state.markStopped(&sendRequestToStoreErr{}) + state.markStopped(regionErr) sfEvent := newEventItem(nil, state, stream) slot := hashRegionID(state.sri.verID.GetID(), len(c.workers)) _ = c.workers[slot].sendEvent(ctx, sfEvent) @@ -105,7 +125,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque // It means it's a special task for stopping the table. continue } - c.onRegionFail(newRegionErrorInfo(sri, &sendRequestToStoreErr{})) + c.onRegionFail(newRegionErrorInfo(sri, regionErr)) } if err := util.Hang(ctx, time.Second); err != nil { return err @@ -132,17 +152,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste } } - if err := version.CheckStoreVersion(ctx, c.pd, rs.storeID); err != nil { - log.Info("event feed check store version fails", - zap.String("namespace", c.changefeed.Namespace), - zap.String("changefeed", c.changefeed.ID), - zap.Uint64("streamID", s.streamID), - zap.Uint64("storeID", rs.storeID), - zap.String("addr", rs.storeAddr), - zap.Error(err)) - return isCanceled() - } - log.Info("event feed going to create grpc stream", zap.String("namespace", c.changefeed.Namespace), zap.String("changefeed", c.changefeed.ID), @@ -339,7 +348,9 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request if s.multiplexing != nil { req := &cdcpb.ChangeDataRequest{ RequestId: uint64(subscriptionID), - Request: &cdcpb.ChangeDataRequest_Deregister_{}, + Request: &cdcpb.ChangeDataRequest_Deregister_{ + Deregister: &cdcpb.ChangeDataRequest_Deregister{}, + }, } if err = doSend(s.multiplexing, req, subscriptionID); err != nil { return err diff --git a/pkg/version/check.go b/pkg/version/check.go index 1ab4af6c1ab..043803354c1 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/util/engine" @@ -196,6 +197,9 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre // CheckStoreVersion checks whether the given TiKV is compatible with this CDC. // If storeID is 0, it checks all TiKV. func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error { + failpoint.Inject("GetStoreFailed", func() { + failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID))) + }) var stores []*metapb.Store var err error if storeID == 0 {