diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go
index 789fbfa6f30..3f116c47f01 100644
--- a/cdc/kv/shared_client.go
+++ b/cdc/kv/shared_client.go
@@ -85,6 +85,7 @@ var (
 	metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest")
 	metricFeedUnknownErrorCounter     = eventFeedErrorCounter.WithLabelValues("Unknown")
 	metricFeedRPCCtxUnavailable       = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable")
+	metricGetStoreErr                 = eventFeedErrorCounter.WithLabelValues("GetStoreErr")
 	metricStoreSendRequestErr         = eventFeedErrorCounter.WithLabelValues("SendRequestToStore")
 	metricKvIsBusyCounter             = eventFeedErrorCounter.WithLabelValues("KvIsBusy")
 	metricKvCongestedCounter          = eventFeedErrorCounter.WithLabelValues("KvCongested")
@@ -108,6 +109,10 @@ func (e *rpcCtxUnavailableErr) Error() string {
 		e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer())
 }
 
+type getStoreErr struct{}
+
+func (e *getStoreErr) Error() string { return "get store error" }
+
 type sendRequestToStoreErr struct{}
 
 func (e *sendRequestToStoreErr) Error() string { return "send request to store error" }
@@ -739,6 +744,13 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
 		metricFeedRPCCtxUnavailable.Inc()
 		s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable)
 		return nil
+	case *getStoreErr:
+		metricGetStoreErr.Inc()
+		bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
+		// cannot get the store the region belongs to, so we need to reload the region.
+		s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err)
+		s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable)
+		return nil
 	case *sendRequestToStoreErr:
 		metricStoreSendRequestErr.Inc()
 		bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go
index 765a38f0726..b6db06e5c82 100644
--- a/cdc/kv/shared_client_test.go
+++ b/cdc/kv/shared_client_test.go
@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/cdcpb"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/pkg/store/mockstore/mockcopr"
@@ -261,6 +262,105 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {
 	}
 }
 
+func TestGetStoreFailed(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := &sync.WaitGroup{}
+
+	events1 := make(chan *cdcpb.ChangeDataEvent, 10)
+	srv1 := newMockChangeDataServer(events1)
+	server1, addr1 := newMockService(ctx, t, srv1, wg)
+
+	rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
+
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
+
+	grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil)
+
+	regionCache := tikv.NewRegionCache(pdClient)
+
+	pdClock := pdutil.NewClock4Test()
+
+	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
+	require.Nil(t, err)
+	lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{})
+
+	invalidStore1 := "localhost:1"
+	invalidStore2 := "localhost:2"
+	cluster.AddStore(1, addr1)
+	cluster.AddStore(2, invalidStore1)
+	cluster.AddStore(3, invalidStore2)
+	cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 4)
+
+	client := NewSharedClient(
+		model.ChangeFeedID{ID: "test"},
+		&config.ServerConfig{
+			KVClient: &config.KVClientConfig{
+				WorkerConcurrent:     1,
+				GrpcStreamConcurrent: 1,
+				AdvanceIntervalInMs:  10,
+			},
+			Debug: &config.DebugConfig{Puller: &config.PullerConfig{LogRegionDetails: false}},
+		},
+		false, pdClient, grpcPool, regionCache, pdClock, lockResolver,
+	)
+
+	defer func() {
+		cancel()
+		client.Close()
+		_ = kvStorage.Close()
+		regionCache.Close()
+		pdClient.Close()
+		srv1.wg.Wait()
+		server1.Stop()
+		wg.Wait()
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := client.Run(ctx)
+		require.Equal(t, context.Canceled, errors.Cause(err))
+	}()
+
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed", `return(true)`))
+	subID := client.AllocSubscriptionID()
+	span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")}
+	eventCh := make(chan MultiplexingEvent, 50)
+	client.Subscribe(subID, span, 1, eventCh)
+
+	makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent {
+		return &cdcpb.ChangeDataEvent{
+			Events: []*cdcpb.Event{
+				{
+					RegionId:  regionID,
+					RequestId: requestID,
+					Event:     &cdcpb.Event_ResolvedTs{ResolvedTs: ts},
+				},
+			},
+		}
+	}
+
+	checkTsEvent := func(event model.RegionFeedEvent, ts uint64) {
+		require.Equal(t, ts, event.Resolved.ResolvedTs)
+	}
+
+	events1 <- mockInitializedEvent(11, uint64(subID))
+	ts := oracle.GoTimeToTS(pdClock.CurrentTime())
+	events1 <- makeTsEvent(11, ts, uint64(subID))
+	select {
+	case <-eventCh:
+		require.Fail(t, "should not get event when get store failed")
+	case <-time.After(5 * time.Second):
+	}
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed"))
+	select {
+	case event := <-eventCh:
+		checkTsEvent(event.RegionFeedEvent, ts)
+	case <-time.After(5 * time.Second):
+		require.Fail(t, "reconnection not succeed in 5 second")
+	}
+}
+
 type mockChangeDataServer struct {
 	ch chan *cdcpb.ChangeDataEvent
 	wg sync.WaitGroup
diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go
index 7a9b1204f89..419cf4b0d12 100644
--- a/cdc/kv/shared_stream.go
+++ b/cdc/kv/shared_stream.go
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/kv/sharedconn"
 	"github.com/pingcap/tiflow/pkg/chann"
+	cerrors "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/pingcap/tiflow/pkg/version"
 	"go.uber.org/zap"
@@ -90,12 +91,31 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
 			if err := waitForPreFetching(); err != nil {
 				return err
 			}
-			if canceled := stream.run(ctx, c, r); canceled {
-				return nil
+			var regionErr error
+			if err := version.CheckStoreVersion(ctx, c.pd, r.storeID); err != nil {
+				log.Info("event feed check store version fails",
+					zap.String("namespace", c.changefeed.Namespace),
+					zap.String("changefeed", c.changefeed.ID),
+					zap.Uint64("streamID", stream.streamID),
+					zap.Uint64("storeID", r.storeID),
+					zap.String("addr", r.storeAddr),
+					zap.Error(err))
+				if errors.Cause(err) == context.Canceled {
+					return nil
+				} else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) {
+					regionErr = &getStoreErr{}
+				} else {
+					regionErr = &sendRequestToStoreErr{}
+				}
+			} else {
+				if canceled := stream.run(ctx, c, r); canceled {
+					return nil
+				}
+				regionErr = &sendRequestToStoreErr{}
 			}
 			for _, m := range stream.clearStates() {
 				for _, state := range m {
-					state.markStopped(&sendRequestToStoreErr{})
+					state.markStopped(regionErr)
 					sfEvent := newEventItem(nil, state, stream)
 					slot := hashRegionID(state.region.verID.GetID(), len(c.workers))
 					_ = c.workers[slot].sendEvent(ctx, sfEvent)
@@ -108,7 +128,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
 					// It means it's a special task for stopping the table.
 					continue
 				}
-				c.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
+				c.onRegionFail(newRegionErrorInfo(region, regionErr))
 			}
 			if err := util.Hang(ctx, time.Second); err != nil {
 				return err
@@ -135,17 +155,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
 		}
 	}
 
-	if err := version.CheckStoreVersion(ctx, c.pd, rs.storeID); err != nil {
-		log.Info("event feed check store version fails",
-			zap.String("namespace", c.changefeed.Namespace),
-			zap.String("changefeed", c.changefeed.ID),
-			zap.Uint64("streamID", s.streamID),
-			zap.Uint64("storeID", rs.storeID),
-			zap.String("addr", rs.storeAddr),
-			zap.Error(err))
-		return isCanceled()
-	}
-
 	log.Info("event feed going to create grpc stream",
 		zap.String("namespace", c.changefeed.Namespace),
 		zap.String("changefeed", c.changefeed.ID),
@@ -310,7 +319,9 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
 			if s.multiplexing != nil {
 				req := &cdcpb.ChangeDataRequest{
 					RequestId: uint64(subscriptionID),
-					Request:   &cdcpb.ChangeDataRequest_Deregister_{},
+					Request: &cdcpb.ChangeDataRequest_Deregister_{
+						Deregister: &cdcpb.ChangeDataRequest_Deregister{},
+					},
 				}
 				if err = s.multiplexing.Client().Send(req); err != nil {
 					log.Warn("event feed send deregister request to grpc stream failed",
diff --git a/pkg/version/check.go b/pkg/version/check.go
index 766623b0a47..835289bf90b 100644
--- a/pkg/version/check.go
+++ b/pkg/version/check.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/coreos/go-semver/semver"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/pkg/util/engine"
@@ -199,6 +200,9 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre
 // CheckStoreVersion checks whether the given TiKV is compatible with this CDC.
 // If storeID is 0, it checks all TiKV.
 func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error {
+	failpoint.Inject("GetStoreFailed", func() {
+		failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID)))
+	})
 	var stores []*metapb.Store
 	var err error
 	if storeID == 0 {