Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#11903
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lidezhu authored and ti-chi-bot committed Dec 24, 2024
1 parent 5969c65 commit 32093ce
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 15 deletions.
79 changes: 79 additions & 0 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,78 @@ import (
"golang.org/x/sync/errgroup"
)

<<<<<<< HEAD
=======
const (
// Maximum total sleep time(in ms), 20 seconds.
tikvRequestMaxBackoff = 20000

// TiCDC always interacts with region leader, every time something goes wrong,
// failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we
// don't need to force reload region anymore.
regionScheduleReload = false

scanRegionsConcurrency = 1024

loadRegionRetryInterval time.Duration = 100 * time.Millisecond
resolveLockMinInterval time.Duration = 10 * time.Second
invalidSubscriptionID SubscriptionID = SubscriptionID(0)
)

var (
// To generate an ID for a new subscription. And the subscription ID will also be used as
// `RequestId` in region requests of the table.
subscriptionIDGen atomic.Uint64
// To generate a streamID in `newStream`.
streamIDGen atomic.Uint64
)

var (
// unreachable error, only used in unit test
errUnreachable = errors.New("kv client unreachable error")
logPanic = log.Panic
)

var (
metricFeedNotLeaderCounter = eventFeedErrorCounter.WithLabelValues("NotLeader")
metricFeedEpochNotMatchCounter = eventFeedErrorCounter.WithLabelValues("EpochNotMatch")
metricFeedRegionNotFoundCounter = eventFeedErrorCounter.WithLabelValues("RegionNotFound")
metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest")
metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown")
metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable")
metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr")
metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore")
metricKvIsBusyCounter = eventFeedErrorCounter.WithLabelValues("KvIsBusy")
metricKvCongestedCounter = eventFeedErrorCounter.WithLabelValues("KvCongested")
)

type eventError struct {
err *cdcpb.Error
}

// Error implement error interface.
func (e *eventError) Error() string {
return e.err.String()
}

type rpcCtxUnavailableErr struct {
verID tikv.RegionVerID
}

func (e *rpcCtxUnavailableErr) Error() string {
return fmt.Sprintf("cannot get rpcCtx for region %v. ver:%v, confver:%v",
e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer())
}

type getStoreErr struct{}

func (e *getStoreErr) Error() string { return "get store error" }

type sendRequestToStoreErr struct{}

func (e *sendRequestToStoreErr) Error() string { return "send request to store error" }

>>>>>>> 4624acb2f2 (puller: fix retry logic when check store version failed (#11903))
// SubscriptionID comes from `SharedClient.AllocSubscriptionID`.
type SubscriptionID uint64

Expand Down Expand Up @@ -624,6 +696,13 @@ func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo)
metricFeedRPCCtxUnavailable.Inc()
s.scheduleRangeRequest(ctx, errInfo.span, errInfo.requestedTable)
return nil
case *getStoreErr:
metricGetStoreErr.Inc()
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
// cannot get the store the region belongs to, so we need to reload the region.
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err)
s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable)
return nil
case *sendRequestToStoreErr:
metricStoreSendRequestErr.Inc()
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
Expand Down
100 changes: 100 additions & 0 deletions cdc/kv/shared_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/tidb/pkg/store/mockstore/mockcopr"
"github.com/pingcap/tiflow/cdc/kv/regionlock"
Expand Down Expand Up @@ -212,6 +213,105 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {
}
}

func TestGetStoreFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

events1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataServer(events1)
server1, addr1 := newMockService(ctx, t, srv1, wg)

rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())

pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}

grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil)

regionCache := tikv.NewRegionCache(pdClient)

pdClock := pdutil.NewClock4Test()

kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
require.Nil(t, err)
lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{})

invalidStore1 := "localhost:1"
invalidStore2 := "localhost:2"
cluster.AddStore(1, addr1)
cluster.AddStore(2, invalidStore1)
cluster.AddStore(3, invalidStore2)
cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 4)

client := NewSharedClient(
model.ChangeFeedID{ID: "test"},
&config.ServerConfig{
KVClient: &config.KVClientConfig{
WorkerConcurrent: 1,
GrpcStreamConcurrent: 1,
AdvanceIntervalInMs: 10,
},
Debug: &config.DebugConfig{Puller: &config.PullerConfig{LogRegionDetails: false}},
},
false, pdClient, grpcPool, regionCache, pdClock, lockResolver,
)

defer func() {
cancel()
client.Close()
_ = kvStorage.Close()
regionCache.Close()
pdClient.Close()
srv1.wg.Wait()
server1.Stop()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
err := client.Run(ctx)
require.Equal(t, context.Canceled, errors.Cause(err))
}()

failpoint.Enable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed", `return(true)`)
subID := client.AllocSubscriptionID()
span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")}
eventCh := make(chan MultiplexingEvent, 50)
client.Subscribe(subID, span, 1, eventCh)

makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent {
return &cdcpb.ChangeDataEvent{
Events: []*cdcpb.Event{
{
RegionId: regionID,
RequestId: requestID,
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts},
},
},
}
}

checkTsEvent := func(event model.RegionFeedEvent, ts uint64) {
require.Equal(t, ts, event.Resolved.ResolvedTs)
}

events1 <- mockInitializedEvent(11, uint64(subID))
ts := oracle.GoTimeToTS(pdClock.CurrentTime())
events1 <- makeTsEvent(11, ts, uint64(subID))
select {
case <-eventCh:
require.True(t, false, "should not get event when get store failed")
case <-time.After(5 * time.Second):
}
failpoint.Disable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed")
select {
case event := <-eventCh:
checkTsEvent(event.RegionFeedEvent, ts)
case <-time.After(5 * time.Second):
require.True(t, false, "reconnection not succeed in 5 second")
}
}

type mockChangeDataServer struct {
ch chan *cdcpb.ChangeDataEvent
wg sync.WaitGroup
Expand Down
45 changes: 30 additions & 15 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/kv/sharedconn"
"github.com/pingcap/tiflow/pkg/chann"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
Expand Down Expand Up @@ -87,12 +88,31 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
if err := waitForPreFetching(); err != nil {
return err
}
if canceled := stream.run(ctx, c, r); canceled {
return nil
var regionErr error
if err := version.CheckStoreVersion(ctx, c.pd, r.storeID); err != nil {
log.Info("event feed check store version fails",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Uint64("storeID", r.storeID),
zap.String("addr", r.storeAddr),
zap.Error(err))
if errors.Cause(err) == context.Canceled {
return nil
} else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) {
regionErr = &getStoreErr{}
} else {
regionErr = &sendRequestToStoreErr{}
}
} else {
if canceled := stream.run(ctx, c, r); canceled {
return nil
}
regionErr = &sendRequestToStoreErr{}
}
for _, m := range stream.clearStates() {
for _, state := range m {
state.markStopped(&sendRequestToStoreErr{})
state.markStopped(regionErr)
sfEvent := newEventItem(nil, state, stream)
slot := hashRegionID(state.sri.verID.GetID(), len(c.workers))
_ = c.workers[slot].sendEvent(ctx, sfEvent)
Expand All @@ -105,7 +125,11 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
// It means it's a special task for stopping the table.
continue
}
<<<<<<< HEAD
c.onRegionFail(newRegionErrorInfo(sri, &sendRequestToStoreErr{}))
=======
c.onRegionFail(newRegionErrorInfo(region, regionErr))
>>>>>>> 4624acb2f2 (puller: fix retry logic when check store version failed (#11903))
}
if err := util.Hang(ctx, time.Second); err != nil {
return err
Expand All @@ -132,17 +156,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
}
}

if err := version.CheckStoreVersion(ctx, c.pd, rs.storeID); err != nil {
log.Info("event feed check store version fails",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
return isCanceled()
}

log.Info("event feed going to create grpc stream",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
Expand Down Expand Up @@ -339,7 +352,9 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
if s.multiplexing != nil {
req := &cdcpb.ChangeDataRequest{
RequestId: uint64(subscriptionID),
Request: &cdcpb.ChangeDataRequest_Deregister_{},
Request: &cdcpb.ChangeDataRequest_Deregister_{
Deregister: &cdcpb.ChangeDataRequest_Deregister{},
},
}
if err = doSend(s.multiplexing, req, subscriptionID); err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/util/engine"
Expand Down Expand Up @@ -196,6 +197,9 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre
// CheckStoreVersion checks whether the given TiKV is compatible with this CDC.
// If storeID is 0, it checks all TiKV.
func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error {
failpoint.Inject("GetStoreFailed", func() {
failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID)))
})
var stores []*metapb.Store
var err error
if storeID == 0 {
Expand Down

0 comments on commit 32093ce

Please sign in to comment.