diff --git a/cmd/kafka-consumer/event_group.go b/cmd/kafka-consumer/event_group.go index 03e09aa68e2..cd87b73b4c8 100644 --- a/cmd/kafka-consumer/event_group.go +++ b/cmd/kafka-consumer/event_group.go @@ -19,7 +19,6 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "go.uber.org/zap" ) @@ -37,7 +36,7 @@ func NewEventsGroup(partition int32, tableID int64) *eventsGroup { return &eventsGroup{ partition: partition, tableID: tableID, - events: make([]*model.RowChangedEvent, 0), + events: make([]*model.RowChangedEvent, 0, 1024), } } @@ -52,26 +51,19 @@ func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) { zap.Any("offset", offset), zap.Uint64("commitTs", row.CommitTs), zap.Uint64("highWatermark", g.highWatermark), - zap.Int64("physicalTableID", row.GetTableID()), + zap.Int64("tableID", row.GetTableID()), zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()), zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns)) } // Resolve will get events where CommitTs is less than resolveTs. -func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model.RowChangedEvent { - switch protocol { - case config.ProtocolCanalJSON: - sort.Slice(g.events, func(i, j int) bool { - return g.events[i].CommitTs < g.events[j].CommitTs - }) - default: - if !sort.SliceIsSorted(g.events, func(i, j int) bool { - return g.events[i].CommitTs < g.events[j].CommitTs - }) { - log.Warn("events are not sorted", zap.Int32("partition", g.partition), - zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events))) - } +func (g *eventsGroup) Resolve(resolve uint64) []*model.RowChangedEvent { + if !sort.SliceIsSorted(g.events, func(i, j int) bool { + return g.events[i].CommitTs < g.events[j].CommitTs + }) { + log.Warn("events are not sorted", zap.Int32("partition", g.partition), + zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events))) } i := sort.Search(len(g.events), func(i int) bool { @@ -80,7 +72,6 @@ func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model result := g.events[:i] g.events = g.events[i:] - if len(result) != 0 && len(g.events) != 0 { log.Warn("not all events resolved", zap.Int32("partition", g.partition), zap.Int64("tableID", g.tableID), diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index d2407c2cc30..766d22b3702 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -20,7 +20,6 @@ import ( "fmt" "math" "sync" - "sync/atomic" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -75,29 +74,44 @@ type partitionProgress struct { partition int32 watermark uint64 watermarkOffset kafka.Offset - // tableSinkMap -> [tableID]tableSink - tableSinkMap sync.Map - eventGroups map[int64]*eventsGroup - decoder codec.RowEventDecoder + tableSinkMap map[model.TableID]tablesink.TableSink + eventGroups map[model.TableID]*eventsGroup + decoder codec.RowEventDecoder } func newPartitionProgress(partition int32, decoder codec.RowEventDecoder) *partitionProgress { return &partitionProgress{ - partition: partition, - eventGroups: make(map[int64]*eventsGroup), - decoder: decoder, + partition: partition, + eventGroups: make(map[model.TableID]*eventsGroup), + tableSinkMap: make(map[model.TableID]tablesink.TableSink), + decoder: decoder, } } -func (p *partitionProgress) updateWatermark(watermark uint64, offset 
kafka.Offset) { - atomic.StoreUint64(&p.watermark, watermark) - p.watermarkOffset = offset - log.Info("watermark received", zap.Int32("partition", p.partition), zap.Any("offset", offset), zap.Uint64("watermark", watermark)) +func (p *partitionProgress) updateWatermark(newWatermark uint64, offset kafka.Offset) { + watermark := p.loadWatermark() + if newWatermark >= watermark { + p.watermark = newWatermark + p.watermarkOffset = offset + log.Info("watermark received", zap.Int32("partition", p.partition), zap.Any("offset", offset), + zap.Uint64("watermark", newWatermark)) + return + } + if offset > p.watermarkOffset { + log.Panic("partition resolved ts fallback", + zap.Int32("partition", p.partition), + zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), + zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset)) + } + log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message", + zap.Int32("partition", p.partition), + zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), + zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset)) } func (p *partitionProgress) loadWatermark() uint64 { - return atomic.LoadUint64(&p.watermark) + return p.watermark } type writer struct { @@ -293,9 +307,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool ) progress := w.progresses[partition] - decoder := progress.decoder - eventGroup := progress.eventGroups - if err := decoder.AddKeyValue(key, value); err != nil { + if err := progress.decoder.AddKeyValue(key, value); err != nil { log.Panic("add key value to the decoder failed", zap.Int32("partition", partition), zap.Any("offset", offset), zap.Error(err)) } @@ -305,7 +317,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool messageType model.MessageType ) for { - ty, hasNext, err := decoder.HasNext() + ty, hasNext, err := progress.decoder.HasNext() if err != nil { log.Panic("decode message key failed", zap.Int32("partition", partition), zap.Any("offset", offset), zap.Error(err)) @@ -330,27 +342,22 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool // then cause the consumer panic, but it was a duplicate one. // so we only handle DDL received from partition-0 should be enough. // but all DDL event messages should be consumed. 
- ddl, err := decoder.NextDDLEvent() + ddl, err := progress.decoder.NextDDLEvent() if err != nil { log.Panic("decode message value failed", zap.Int32("partition", partition), zap.Any("offset", offset), zap.ByteString("value", value), zap.Error(err)) } - if simple, ok := decoder.(*simple.Decoder); ok { - cachedEvents := simple.GetCachedEvents() - if len(cachedEvents) != 0 { - log.Info("simple protocol resolved cached events", zap.Int("resolvedCount", len(cachedEvents))) - } + if dec, ok := progress.decoder.(*simple.Decoder); ok { + cachedEvents := dec.GetCachedEvents() for _, row := range cachedEvents { w.checkPartition(row, partition, message.TopicPartition.Offset) tableID := row.GetTableID() - group, ok := eventGroup[tableID] - if !ok { - group = NewEventsGroup(partition, tableID) - eventGroup[tableID] = group - } - w.appendRow2Group(row, group, progress, offset) + log.Info("simple protocol cached event resolved, append to the group", + zap.Int64("tableID", tableID), zap.Uint64("commitTs", row.CommitTs), + zap.Int32("partition", partition), zap.Any("offset", offset)) + w.appendRow2Group(row, progress, offset) } } @@ -364,7 +371,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool } needFlush = true case model.MessageTypeRow: - row, err := decoder.NextRowChangedEvent() + row, err := progress.decoder.NextRowChangedEvent() if err != nil { log.Panic("decode message value failed", zap.Int32("partition", partition), zap.Any("offset", offset), @@ -379,31 +386,24 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool w.checkPartition(row, partition, message.TopicPartition.Offset) tableID := row.GetTableID() - if w.option.protocol != config.ProtocolSimple { + switch w.option.protocol { + case config.ProtocolSimple, config.ProtocolCanalJSON: + default: tableID = w.fakeTableIDGenerator. 
generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), tableID) row.PhysicalTableID = tableID } - group := eventGroup[tableID] - if group == nil { - group = NewEventsGroup(partition, tableID) - eventGroup[tableID] = group - } - w.appendRow2Group(row, group, progress, offset) + w.appendRow2Group(row, progress, offset) case model.MessageTypeResolved: - newWatermark, err := decoder.NextResolvedEvent() + newWatermark, err := progress.decoder.NextResolvedEvent() if err != nil { log.Panic("decode message value failed", zap.Int32("partition", partition), zap.Any("offset", offset), zap.ByteString("value", value), zap.Error(err)) } - if w.checkOldMessageForWatermark(newWatermark, partition, offset) { - continue - } - - w.resolveRowChangedEvents(eventGroup, newWatermark, progress) progress.updateWatermark(newWatermark, offset) + w.resolveRowChangedEvents(progress, newWatermark) needFlush = true default: log.Panic("unknown message type", zap.Any("messageType", messageType), @@ -424,22 +424,22 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool return w.Write(ctx, messageType) } -func (w *writer) resolveRowChangedEvents(eventGroup map[int64]*eventsGroup, newWatermark uint64, progress *partitionProgress) { - for tableID, group := range eventGroup { - events := group.Resolve(newWatermark, w.option.protocol) +func (w *writer) resolveRowChangedEvents(progress *partitionProgress, newWatermark uint64) { + for tableID, group := range progress.eventGroups { + events := group.Resolve(newWatermark) if len(events) == 0 { continue } - tableSink, ok := progress.tableSinkMap.Load(tableID) + tableSink, ok := progress.tableSinkMap[tableID] if !ok { tableSink = w.sinkFactory.CreateTableSinkForConsumer( model.DefaultChangeFeedID("kafka-consumer"), spanz.TableIDToComparableSpan(tableID), events[0].CommitTs, ) - progress.tableSinkMap.Store(tableID, tableSink) + progress.tableSinkMap[tableID] = tableSink } - tableSink.(tablesink.TableSink).AppendRowChangedEvents(events...) + tableSink.AppendRowChangedEvents(events...) } } @@ -460,35 +460,25 @@ func (w *writer) checkPartition(row *model.RowChangedEvent, partition int32, off } } -func (w *writer) checkOldMessageForWatermark(newWatermark uint64, partition int32, offset kafka.Offset) bool { - progress := w.progresses[partition] - watermark := progress.loadWatermark() - if newWatermark >= watermark { - return false - } - if offset > progress.watermarkOffset { - log.Panic("partition resolved ts fallback", - zap.Int32("partition", partition), - zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), - zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset)) - } - log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message", - zap.Int32("partition", partition), - zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), - zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset)) - return true -} - -func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup, progress *partitionProgress, offset kafka.Offset) { +func (w *writer) appendRow2Group(row *model.RowChangedEvent, progress *partitionProgress, offset kafka.Offset) { // if the kafka cluster is normal, this should not hit. // else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback. 
 	watermark := progress.loadWatermark()
+	partition := progress.partition
+
+	tableID := row.GetTableID()
+	group := progress.eventGroups[tableID]
+	if group == nil {
+		group = NewEventsGroup(partition, tableID)
+		progress.eventGroups[tableID] = group
+	}
 	if row.CommitTs < watermark {
 		log.Warn("RowChangedEvent fallback row, since less than the partition watermark, ignore it",
-			zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
+			zap.Int64("tableID", tableID), zap.Int32("partition", partition),
 			zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
 			zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
 			zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
+			zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
 			zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
 		return
 	}
@@ -497,31 +487,30 @@ func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup,
 		return
 	}
 	switch w.option.protocol {
-	case config.ProtocolSimple, config.ProtocolOpen:
+	case config.ProtocolSimple, config.ProtocolOpen, config.ProtocolCanalJSON:
 		// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
 		// also consider the table partition.
 		// open protocol set the partition table id if the table is partitioned.
 		// for normal table, the table id is generated by the fake table id generator by using schema and table name.
 		// so one event group for one normal table or one table partition, replayed messages can be ignored.
 		log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it",
-			zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
+			zap.Int64("tableID", tableID), zap.Int32("partition", partition),
 			zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
 			zap.Uint64("highWatermark", group.highWatermark),
 			zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
 			zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
+			zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
 			zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
 		return
 	default:
-		// canal-json does not set table id for all messages.
-		// in the partition table case, all partition tables have the same table id, use the same progress,
-		// so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row.
} log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it", - zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition), + zap.Int64("tableID", tableID), zap.Int32("partition", partition), zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset), zap.Uint64("highWatermark", group.highWatermark), zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()), + zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns), zap.String("protocol", w.option.protocol.String())) group.Append(row, offset) } @@ -551,16 +540,14 @@ func syncFlushRowChangedEvents(ctx context.Context, progress *partitionProgress, default: } flushedResolvedTs := true - progress.tableSinkMap.Range(func(key, value interface{}) bool { - tableSink := value.(tablesink.TableSink) + for _, tableSink := range progress.tableSinkMap { if err := tableSink.UpdateResolvedTs(resolvedTs); err != nil { log.Panic("Failed to update resolved ts", zap.Error(err)) } if tableSink.GetCheckpointTs().Less(resolvedTs) { flushedResolvedTs = false } - return true - }) + } if flushedResolvedTs { return }
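
Note on the resolve semantics above (not part of the patch): eventsGroup.Resolve splits the buffered events with sort.Search, so every event whose CommitTs is less than or equal to the resolved ts is flushed to the table sink, and the remainder stays buffered until a later watermark arrives. The standalone sketch below mirrors that split on a plain slice of commit timestamps; the resolve helper and the example values are illustrative only and do not appear in the patch.

// resolve_sketch.go -- illustrative only; mirrors the sort.Search split used by
// eventsGroup.Resolve, with plain uint64 commit timestamps standing in for
// *model.RowChangedEvent.
package main

import (
	"fmt"
	"sort"
)

// resolve returns the commit timestamps that are <= resolvedTs (ready to be
// appended to the table sink) and keeps the rest buffered. It assumes the
// input is sorted ascending, which Resolve verifies with sort.SliceIsSorted.
func resolve(commitTs []uint64, resolvedTs uint64) (emit, rest []uint64) {
	i := sort.Search(len(commitTs), func(i int) bool {
		return commitTs[i] > resolvedTs
	})
	return commitTs[:i], commitTs[i:]
}

func main() {
	buffered := []uint64{100, 105, 110, 120}
	emit, rest := resolve(buffered, 110)
	fmt.Println(emit) // [100 105 110] -- flushed; note that 110 == resolvedTs is included
	fmt.Println(rest) // [120]         -- stays in the group until a later watermark
}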