Skip to content

Commit

Permalink
Merge branch 'master' into improve-column
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu committed Sep 25, 2024
2 parents eb539d1 + 93f6ab6 commit c0fcb0c
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 80 deletions.
2 changes: 1 addition & 1 deletion cdc/kv/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type regionInfo struct {
lockedRangeState *regionlock.LockedRangeState
}

func (s regionInfo) isStoped() bool {
func (s regionInfo) isStopped() bool {
// lockedRange only nil when the region's subscribedTable is stopped.
return s.lockedRangeState == nil
}
Expand Down
45 changes: 31 additions & 14 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ func (s *SharedClient) Subscribe(subID SubscriptionID, span tablepb.Span, startT
s.totalSpans.Lock()
s.totalSpans.v[subID] = rt
s.totalSpans.Unlock()

s.rangeTaskCh.In() <- rangeTask{span: span, subscribedTable: rt}
log.Info("event feed subscribes table success",
zap.String("namespace", s.changefeed.Namespace),
Expand All @@ -306,13 +305,17 @@ func (s *SharedClient) Unsubscribe(subID SubscriptionID) {
s.totalSpans.Unlock()
if rt != nil {
s.setTableStopped(rt)
log.Info("event feed unsubscribes table",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.String("span", rt.span.String()))
return
}

log.Info("event feed unsubscribes table",
log.Warn("event feed unsubscribes table, but not found",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.Bool("exists", rt != nil))
zap.Any("subscriptionID", subID))
}

// ResolveLock is a function. If outsider subscribers find a span resolved timestamp is
Expand Down Expand Up @@ -382,7 +385,8 @@ func (s *SharedClient) setTableStopped(rt *subscribedTable) {
log.Info("event feed starts to stop table",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID))
zap.Any("subscriptionID", rt.subscriptionID),
zap.Int64("tableID", rt.span.TableID))

// Set stopped to true so we can stop handling region events from the table.
// Then send a special singleRegionInfo to regionRouter to deregister the table
Expand All @@ -399,7 +403,8 @@ func (s *SharedClient) onTableDrained(rt *subscribedTable) {
log.Info("event feed stop table is finished",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID))
zap.Any("subscriptionID", rt.subscriptionID),
zap.Int64("tableID", rt.span.TableID))

s.totalSpans.Lock()
defer s.totalSpans.Unlock()
Expand All @@ -418,7 +423,7 @@ func (s *SharedClient) handleRegions(ctx context.Context, eg *errgroup.Group) er
case <-ctx.Done():
return errors.Trace(ctx.Err())
case region := <-s.regionCh.Out():
if region.isStoped() {
if region.isStopped() {
for _, rs := range s.stores {
s.broadcastRequest(rs, region)
}
Expand All @@ -441,6 +446,7 @@ func (s *SharedClient) handleRegions(ctx context.Context, eg *errgroup.Group) er
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", region.subscribedTable.subscriptionID),
zap.Uint64("regionID", region.verID.GetID()),
zap.String("span", region.span.String()),
zap.Uint64("storeID", store.storeID),
zap.String("addr", store.storeAddr))
}
Expand Down Expand Up @@ -554,7 +560,7 @@ func (s *SharedClient) divideSpanAndScheduleRegionRequests(
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscribedTable.subscriptionID),
zap.Any("span", nextSpan),
zap.String("span", nextSpan.String()),
zap.Error(err))
backoffBeforeLoad = true
continue
Expand All @@ -572,7 +578,7 @@ func (s *SharedClient) divideSpanAndScheduleRegionRequests(
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscribedTable.subscriptionID),
zap.Any("span", nextSpan))
zap.String("span", nextSpan.String()))
backoffBeforeLoad = true
continue
}
Expand All @@ -590,7 +596,8 @@ func (s *SharedClient) divideSpanAndScheduleRegionRequests(
log.Panic("event feed check spans intersect shouldn't fail",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscribedTable.subscriptionID))
zap.Any("subscriptionID", subscribedTable.subscriptionID),
zap.String("span", nextSpan.String()))
}

verID := tikv.NewRegionVerID(regionMeta.Id, regionMeta.RegionEpoch.ConfVer, regionMeta.RegionEpoch.Version)
Expand Down Expand Up @@ -676,6 +683,8 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Int64("tableID", errInfo.span.TableID),
zap.Stringer("error", innerErr))

if notLeader := innerErr.GetNotLeader(); notLeader != nil {
Expand Down Expand Up @@ -720,6 +729,8 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Int64("tableID", errInfo.span.TableID),
zap.Stringer("error", innerErr))
metricFeedUnknownErrorCounter.Inc()
s.scheduleRegionRequest(ctx, errInfo.regionInfo)
Expand All @@ -740,6 +751,8 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Int64("tableID", errInfo.span.TableID),
zap.Error(err))
return err
}
Expand Down Expand Up @@ -816,7 +829,7 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error {

currTime := s.pdClock.CurrentTime()
s.totalSpans.RLock()
slowInitializeRegion := 0
var slowInitializeRegionCount int
for subscriptionID, rt := range s.totalSpans.v {
attr := rt.rangeLock.IterAll(nil)
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.ResolvedTs)
Expand All @@ -826,32 +839,36 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error {
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),
zap.Any("slowRegion", attr.SlowestRegion))
}
} else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute {
slowInitializeRegion += 1
slowInitializeRegionCount += 1
log.Info("event feed initializes a region too slow",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),
zap.Any("slowRegion", attr.SlowestRegion))
} else if currTime.Sub(ckptTime) > 10*time.Minute {
log.Info("event feed finds a uninitialized slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),
zap.Any("slowRegion", attr.SlowestRegion))
}
if len(attr.UnLockedRanges) > 0 {
log.Info("event feed holes exist",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),
zap.Any("holes", attr.UnLockedRanges))
}
}
s.totalSpans.RUnlock()
s.metrics.slowInitializeRegion.Set(float64(slowInitializeRegion))
s.metrics.slowInitializeRegion.Set(float64(slowInitializeRegionCount))
}
}

Expand Down
10 changes: 3 additions & 7 deletions cdc/kv/shared_region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, str
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", state.getRegionID()),
zap.Uint64("regionID", state.region.verID.GetID()),
zap.Int64("tableID", state.region.span.TableID),
zap.Bool("reschedule", stepsToRemoved),
zap.Error(err))
}
Expand Down Expand Up @@ -228,12 +229,6 @@ func (w *sharedRegionWorker) handleEventEntry(ctx context.Context, x *cdcpb.Even
}
}
tableID := state.region.subscribedTable.span.TableID
log.Debug("region worker get an Event",
zap.String("namespace", w.changefeed.Namespace),
zap.String("changefeed", w.changefeed.ID),
zap.Any("subscriptionID", state.region.subscribedTable.subscriptionID),
zap.Int64("tableID", tableID),
zap.Int("rows", len(x.Entries.GetEntries())))
return handleEventEntry(x, startTs, state, w.metrics, emit, w.changefeed, tableID, w.client.logRegionDetails)
}

Expand All @@ -250,7 +245,7 @@ func handleEventEntry(
regionID, regionSpan, _ := state.getRegionMeta()
for _, entry := range x.Entries.GetEntries() {
// NOTE: from TiKV 7.0.0, entries are already filtered out in TiKV side.
// We can remove the check in future.
// We can remove the check in the future.
comparableKey := spanz.ToComparableKey(entry.GetKey())
if entry.Type != cdcpb.Event_INITIALIZED &&
!spanz.KeyInSpan(comparableKey, regionSpan) {
Expand All @@ -266,6 +261,7 @@ func handleEventEntry(
zap.String("changefeed", changefeed.ID),
zap.Int64("tableID", tableID),
zap.Uint64("regionID", regionID),
zap.Int64("tableID", state.region.span.TableID),
zap.Uint64("requestID", state.requestID),
zap.Stringer("span", &state.region.span))

Expand Down
73 changes: 32 additions & 41 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
case <-ctx.Done():
return ctx.Err()
case region := <-stream.requests.Out():
if !region.isStoped() {
if !region.isStopped() {
stream.preFetchForConnecting = new(regionInfo)
*stream.preFetchForConnecting = region
return nil
Expand Down Expand Up @@ -104,7 +104,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
// Why we need to re-schedule pending regions? This because the store can
// fail forever, and all regions are scheduled to other stores.
for _, region := range stream.clearPendingRegions() {
if region.isStoped() {
if region.isStopped() {
// It means it's a special task for stopping the table.
continue
}
Expand Down Expand Up @@ -254,30 +254,6 @@ func (s *requestedStream) receive(
}

func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *requestedStore) (err error) {
doSend := func(cc *sharedconn.ConnAndClient, req *cdcpb.ChangeDataRequest, subscriptionID SubscriptionID) error {
if err := cc.Client().Send(req); err != nil {
log.Warn("event feed send request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
return errors.Trace(err)
}
log.Debug("event feed send request to grpc stream success",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr))
return nil
}

fetchMoreReq := func() (regionInfo, error) {
waitReqTicker := time.NewTicker(60 * time.Second)
defer waitReqTicker.Stop()
Expand Down Expand Up @@ -329,23 +305,24 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
s.preFetchForConnecting = nil
for {
subscriptionID := region.subscribedTable.subscriptionID
log.Debug("event feed gets a singleRegionInfo",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", region.verID.GetID()),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr))
// It means it's a special task for stopping the table.
if region.isStoped() {
if region.isStopped() {
if s.multiplexing != nil {
req := &cdcpb.ChangeDataRequest{
RequestId: uint64(subscriptionID),
Request: &cdcpb.ChangeDataRequest_Deregister_{},
}
if err = doSend(s.multiplexing, req, subscriptionID); err != nil {
return err
if err = s.multiplexing.Client().Send(req); err != nil {
log.Warn("event feed send deregister request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
}
} else if cc := tableExclusives[subscriptionID]; cc != nil {
delete(tableExclusives, subscriptionID)
Expand Down Expand Up @@ -385,8 +362,17 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
} else if cc, err = getTableExclusiveConn(subscriptionID); err != nil {
return err
}
if err = doSend(cc, c.createRegionRequest(region), subscriptionID); err != nil {
return err
if err = cc.Client().Send(c.createRegionRequest(region)); err != nil {
log.Warn("event feed send request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", region.verID.GetID()),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
}
}

Expand Down Expand Up @@ -483,14 +469,19 @@ func (s *requestedStream) sendRegionChangeEvents(
state := s.getState(subscriptionID, regionID)
switch x := event.Event.(type) {
case *cdcpb.Event_Error:
s.logRegionDetails("event feed receives a region error",
fields := []zap.Field{
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", event.RegionId),
zap.Bool("stateIsNil", state == nil),
zap.Any("error", x.Error))
zap.Any("error", x.Error),
}
if state != nil {
fields = append(fields, zap.Int64("tableID", state.region.span.TableID))
}
s.logRegionDetails("event feed receives a region error", fields...)
}

if state != nil {
Expand Down
Loading

0 comments on commit c0fcb0c

Please sign in to comment.