[DNM] consumer(ticdc): do not sort events #11934

Open · wants to merge 12 commits into base: master
25 changes: 8 additions & 17 deletions cmd/kafka-consumer/event_group.go
@@ -19,7 +19,6 @@ import (
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"go.uber.org/zap"
)

@@ -37,7 +36,7 @@ func NewEventsGroup(partition int32, tableID int64) *eventsGroup {
return &eventsGroup{
partition: partition,
tableID: tableID,
events: make([]*model.RowChangedEvent, 0),
events: make([]*model.RowChangedEvent, 0, 1024),
}
}
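
Pre-sizing the slice to 1024 does not change behaviour; it only avoids repeated re-allocation while a group accumulates rows between two resolved-ts messages. A minimal illustration of the difference:

package main

func main() {
	a := make([]int, 0)       // growth past the capacity copies the backing array
	b := make([]int, 0, 1024) // the first 1024 appends reuse a single allocation
	for i := 0; i < 1024; i++ {
		a = append(a, i) // reallocates several times as the slice grows
		b = append(b, i) // never reallocates
	}
	_, _ = a, b
}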

@@ -52,26 +51,19 @@ func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) {
zap.Any("offset", offset),
zap.Uint64("commitTs", row.CommitTs),
zap.Uint64("highWatermark", g.highWatermark),
zap.Int64("physicalTableID", row.GetTableID()),
zap.Int64("tableID", row.GetTableID()),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
}

// Resolve will get events where CommitTs is less than resolveTs.
func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model.RowChangedEvent {
switch protocol {
case config.ProtocolCanalJSON:
sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})
default:
if !sort.SliceIsSorted(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
}) {
log.Warn("events are not sorted", zap.Int32("partition", g.partition),
zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
}
func (g *eventsGroup) Resolve(resolve uint64) []*model.RowChangedEvent {
if !sort.SliceIsSorted(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
}) {
log.Warn("events are not sorted", zap.Int32("partition", g.partition),
zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
}

i := sort.Search(len(g.events), func(i int) bool {
@@ -80,7 +72,6 @@ func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model

result := g.events[:i]
g.events = g.events[i:]

if len(result) != 0 && len(g.events) != 0 {
log.Warn("not all events resolved",
zap.Int32("partition", g.partition), zap.Int64("tableID", g.tableID),
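
The slimmed-down Resolve now relies on events arriving in commit-ts order for every protocol, which is why the warning above fires when they do not: sort.Search only yields a correct split point on sorted input. A minimal, self-contained sketch of the same split pattern, assuming the collapsed search predicate keeps events whose CommitTs is at or below the resolved ts:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Commit timestamps as they arrived from the partition, already ordered.
	commitTs := []uint64{3, 5, 7, 9}
	resolved := uint64(7)
	// Smallest index whose commit ts is strictly greater than the resolved ts.
	i := sort.Search(len(commitTs), func(j int) bool { return commitTs[j] > resolved })
	fmt.Println(commitTs[:i], commitTs[i:]) // [3 5 7] [9]
}
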
147 changes: 67 additions & 80 deletions cmd/kafka-consumer/writer.go
@@ -20,7 +20,6 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
@@ -75,29 +74,44 @@ type partitionProgress struct
partition int32
watermark uint64
watermarkOffset kafka.Offset
// tableSinkMap -> [tableID]tableSink
tableSinkMap sync.Map

eventGroups map[int64]*eventsGroup
decoder codec.RowEventDecoder
tableSinkMap map[model.TableID]tablesink.TableSink
eventGroups map[model.TableID]*eventsGroup
decoder codec.RowEventDecoder
}

func newPartitionProgress(partition int32, decoder codec.RowEventDecoder) *partitionProgress {
return &partitionProgress{
partition: partition,
eventGroups: make(map[int64]*eventsGroup),
decoder: decoder,
partition: partition,
eventGroups: make(map[model.TableID]*eventsGroup),
tableSinkMap: make(map[model.TableID]tablesink.TableSink),
decoder: decoder,
}
}

func (p *partitionProgress) updateWatermark(watermark uint64, offset kafka.Offset) {
atomic.StoreUint64(&p.watermark, watermark)
p.watermarkOffset = offset
log.Info("watermark received", zap.Int32("partition", p.partition), zap.Any("offset", offset), zap.Uint64("watermark", watermark))
func (p *partitionProgress) updateWatermark(newWatermark uint64, offset kafka.Offset) {
watermark := p.loadWatermark()
if newWatermark >= watermark {
p.watermark = newWatermark
p.watermarkOffset = offset
log.Info("watermark received", zap.Int32("partition", p.partition), zap.Any("offset", offset),
zap.Uint64("watermark", newWatermark))
return
}
if offset > p.watermarkOffset {
log.Panic("partition resolved ts fallback",
zap.Int32("partition", p.partition),
zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset))
}
log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message",
zap.Int32("partition", p.partition),
zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset))
}

func (p *partitionProgress) loadWatermark() uint64 {
return atomic.LoadUint64(&p.watermark)
return p.watermark
}
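
A test-style sketch of the two non-fatal paths of the new updateWatermark, assuming it sits in the same package as the code above; the log.Panic path (a resolved-ts fallback seen at a newer offset) is deliberately not exercised:

import (
	"testing"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func TestUpdateWatermarkSketch(t *testing.T) {
	p := newPartitionProgress(0, nil)

	// Normal case: the watermark advances together with the offset.
	p.updateWatermark(100, kafka.Offset(10))
	if p.loadWatermark() != 100 {
		t.Fatalf("expected watermark 100, got %d", p.loadWatermark())
	}

	// A message replayed from an older offset carries a smaller resolved ts:
	// it is logged and ignored, and the watermark never moves backwards.
	p.updateWatermark(90, kafka.Offset(5))
	if p.loadWatermark() != 100 {
		t.Fatalf("expected watermark to stay at 100, got %d", p.loadWatermark())
	}
}
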

type writer struct {
@@ -293,9 +307,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
)

progress := w.progresses[partition]
decoder := progress.decoder
eventGroup := progress.eventGroups
if err := decoder.AddKeyValue(key, value); err != nil {
if err := progress.decoder.AddKeyValue(key, value); err != nil {
log.Panic("add key value to the decoder failed",
zap.Int32("partition", partition), zap.Any("offset", offset), zap.Error(err))
}
@@ -305,7 +317,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
messageType model.MessageType
)
for {
ty, hasNext, err := decoder.HasNext()
ty, hasNext, err := progress.decoder.HasNext()
if err != nil {
log.Panic("decode message key failed",
zap.Int32("partition", partition), zap.Any("offset", offset), zap.Error(err))
@@ -330,27 +342,22 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
// then cause the consumer panic, but it was a duplicate one.
// so we only handle DDL received from partition-0 should be enough.
// but all DDL event messages should be consumed.
ddl, err := decoder.NextDDLEvent()
ddl, err := progress.decoder.NextDDLEvent()
if err != nil {
log.Panic("decode message value failed",
zap.Int32("partition", partition), zap.Any("offset", offset),
zap.ByteString("value", value), zap.Error(err))
}

if simple, ok := decoder.(*simple.Decoder); ok {
cachedEvents := simple.GetCachedEvents()
if len(cachedEvents) != 0 {
log.Info("simple protocol resolved cached events", zap.Int("resolvedCount", len(cachedEvents)))
}
if dec, ok := progress.decoder.(*simple.Decoder); ok {
cachedEvents := dec.GetCachedEvents()
for _, row := range cachedEvents {
w.checkPartition(row, partition, message.TopicPartition.Offset)
tableID := row.GetTableID()
group, ok := eventGroup[tableID]
if !ok {
group = NewEventsGroup(partition, tableID)
eventGroup[tableID] = group
}
w.appendRow2Group(row, group, progress, offset)
log.Info("simple protocol cached event resolved, append to the group",
zap.Int64("tableID", tableID), zap.Uint64("commitTs", row.CommitTs),
zap.Int32("partition", partition), zap.Any("offset", offset))
w.appendRow2Group(row, progress, offset)
}
}

@@ -364,7 +371,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
}
needFlush = true
case model.MessageTypeRow:
row, err := decoder.NextRowChangedEvent()
row, err := progress.decoder.NextRowChangedEvent()
if err != nil {
log.Panic("decode message value failed",
zap.Int32("partition", partition), zap.Any("offset", offset),
@@ -379,31 +386,24 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
w.checkPartition(row, partition, message.TopicPartition.Offset)

tableID := row.GetTableID()
if w.option.protocol != config.ProtocolSimple {
switch w.option.protocol {
case config.ProtocolSimple, config.ProtocolCanalJSON:
default:
tableID = w.fakeTableIDGenerator.
generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), tableID)
row.PhysicalTableID = tableID
}
group := eventGroup[tableID]
if group == nil {
group = NewEventsGroup(partition, tableID)
eventGroup[tableID] = group
}
w.appendRow2Group(row, group, progress, offset)
w.appendRow2Group(row, progress, offset)
case model.MessageTypeResolved:
newWatermark, err := decoder.NextResolvedEvent()
newWatermark, err := progress.decoder.NextResolvedEvent()
if err != nil {
log.Panic("decode message value failed",
zap.Int32("partition", partition), zap.Any("offset", offset),
zap.ByteString("value", value), zap.Error(err))
}

if w.checkOldMessageForWatermark(newWatermark, partition, offset) {
continue
}

w.resolveRowChangedEvents(eventGroup, newWatermark, progress)
progress.updateWatermark(newWatermark, offset)
w.resolveRowChangedEvents(progress, newWatermark)
needFlush = true
default:
log.Panic("unknown message type", zap.Any("messageType", messageType),
@@ -424,22 +424,22 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
return w.Write(ctx, messageType)
}

func (w *writer) resolveRowChangedEvents(eventGroup map[int64]*eventsGroup, newWatermark uint64, progress *partitionProgress) {
for tableID, group := range eventGroup {
events := group.Resolve(newWatermark, w.option.protocol)
func (w *writer) resolveRowChangedEvents(progress *partitionProgress, newWatermark uint64) {
for tableID, group := range progress.eventGroups {
events := group.Resolve(newWatermark)
if len(events) == 0 {
continue
}
tableSink, ok := progress.tableSinkMap.Load(tableID)
tableSink, ok := progress.tableSinkMap[tableID]
if !ok {
tableSink = w.sinkFactory.CreateTableSinkForConsumer(
model.DefaultChangeFeedID("kafka-consumer"),
spanz.TableIDToComparableSpan(tableID),
events[0].CommitTs,
)
progress.tableSinkMap.Store(tableID, tableSink)
progress.tableSinkMap[tableID] = tableSink
}
tableSink.(tablesink.TableSink).AppendRowChangedEvents(events...)
tableSink.AppendRowChangedEvents(events...)
}
}
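
Together with updateWatermark, this is the whole per-partition path for a resolved-ts message. An illustrative condensation (handleResolved is a hypothetical helper; WriteMessage inlines these calls directly):

func handleResolved(w *writer, progress *partitionProgress, newWatermark uint64, offset kafka.Offset) {
	// May warn (stale offset) or panic (genuine fallback) instead of advancing.
	progress.updateWatermark(newWatermark, offset)
	// Hand every buffered event with CommitTs up to the watermark to its table
	// sink, creating the sink lazily on first use.
	w.resolveRowChangedEvents(progress, newWatermark)
	// The caller then flushes: each table sink gets UpdateResolvedTs and is
	// polled via GetCheckpointTs until the resolved ts has been persisted.
}

Replacing the sync.Map with a plain map keyed by model.TableID rests on the assumption that a partitionProgress is only ever touched from the single goroutine consuming that partition.
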

@@ -460,35 +460,25 @@ func (w *writer) checkPartition(row *model.RowChangedEvent, partition int32, off
}
}

func (w *writer) checkOldMessageForWatermark(newWatermark uint64, partition int32, offset kafka.Offset) bool {
progress := w.progresses[partition]
watermark := progress.loadWatermark()
if newWatermark >= watermark {
return false
}
if offset > progress.watermarkOffset {
log.Panic("partition resolved ts fallback",
zap.Int32("partition", partition),
zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset))
}
log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message",
zap.Int32("partition", partition),
zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset))
return true
}

func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup, progress *partitionProgress, offset kafka.Offset) {
func (w *writer) appendRow2Group(row *model.RowChangedEvent, progress *partitionProgress, offset kafka.Offset) {
// if the kafka cluster is normal, this should not hit.
// else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback.
watermark := progress.loadWatermark()
partition := progress.partition

tableID := row.GetTableID()
group := progress.eventGroups[tableID]
if group == nil {
group = NewEventsGroup(partition, tableID)
progress.eventGroups[tableID] = group
}
if row.CommitTs < watermark {
log.Warn("RowChanged Event fallback row, since les than the partition watermark, ignore it",
zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
zap.Int64("tableID", tableID), zap.Int32("partition", partition),
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
return
}
Expand All @@ -497,31 +487,30 @@ func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup,
return
}
switch w.option.protocol {
case config.ProtocolSimple, config.ProtocolOpen:
case config.ProtocolSimple, config.ProtocolOpen, config.ProtocolCanalJSON:
// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
// also consider the table partition.
// open protocol set the partition table id if the table is partitioned.
// for normal table, the table id is generated by the fake table id generator by using schema and table name.
// so one event group for one normal table or one table partition, replayed messages can be ignored.
log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it",
zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
zap.Int64("tableID", tableID), zap.Int32("partition", partition),
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("highWatermark", group.highWatermark),
zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
return
default:
// canal-json does not set table id for all messages.
// in the partition table case, all partition tables have the same table id, use the same progress,
// so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row.
}
log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it",
zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
zap.Int64("tableID", tableID), zap.Int32("partition", partition),
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("highWatermark", group.highWatermark),
zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
zap.String("protocol", w.option.protocol.String()))
group.Append(row, offset)
}
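
The per-protocol comments above reduce to one decision. A hypothetical pure helper (shouldDrop is not part of the PR) that mirrors appendRow2Group's handling, assuming the same config constants:

// shouldDrop reports whether a row should be ignored as a replayed message.
func shouldDrop(protocol config.Protocol, commitTs, partitionWatermark, groupHighWatermark uint64) bool {
	if commitTs < partitionWatermark {
		// Behind the partition watermark: always treated as a replayed row.
		return true
	}
	if commitTs >= groupHighWatermark {
		// Normal case: the row is appended to its event group.
		return false
	}
	switch protocol {
	case config.ProtocolSimple, config.ProtocolOpen, config.ProtocolCanalJSON:
		// These protocols identify the physical table (or a fake table id is
		// generated per schema/table), so a row behind the group high
		// watermark can only be a replayed message: drop it.
		return true
	default:
		// Other protocols cannot tell partitions of the same table apart,
		// so the row is kept and appended despite the fallback.
		return false
	}
}
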
@@ -551,16 +540,14 @@ func syncFlushRowChangedEvents(ctx context.Context, progress *partitionProgress,
default:
}
flushedResolvedTs := true
progress.tableSinkMap.Range(func(key, value interface{}) bool {
tableSink := value.(tablesink.TableSink)
for _, tableSink := range progress.tableSinkMap {
if err := tableSink.UpdateResolvedTs(resolvedTs); err != nil {
log.Panic("Failed to update resolved ts", zap.Error(err))
}
if tableSink.GetCheckpointTs().Less(resolvedTs) {
flushedResolvedTs = false
}
return true
})
}
if flushedResolvedTs {
return
}