consumer(ticdc): do not ignore event if event commit-ts greater than the high watermark #11925

Open
wants to merge 3 commits into base: master
cmd/kafka-consumer/event_group.go (30 changes: 17 additions & 13 deletions)
@@ -60,19 +60,23 @@ func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) {

// Resolve will get events where CommitTs is less than resolveTs.
func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model.RowChangedEvent {
switch protocol {
case config.ProtocolCanalJSON:
sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})
default:
if !sort.SliceIsSorted(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
}) {
log.Warn("events are not sorted", zap.Int32("partition", g.partition),
zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
}
}
//switch protocol {
//case config.ProtocolCanalJSON:
// sort.Slice(g.events, func(i, j int) bool {
// return g.events[i].CommitTs < g.events[j].CommitTs
// })
//default:
// if !sort.SliceIsSorted(g.events, func(i, j int) bool {
// return g.events[i].CommitTs < g.events[j].CommitTs
// }) {
// log.Warn("events are not sorted", zap.Int32("partition", g.partition),
// zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
// }
//}

sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})

i := sort.Search(len(g.events), func(i int) bool {
return g.events[i].CommitTs > resolve
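For readers skimming the hunk above, here is a minimal standalone sketch of the resolve step the new code converges on: sort the buffered events by commit-ts unconditionally, then binary-search for the first event whose commit-ts exceeds the resolved-ts and return everything before it. The `event` type and `resolve` helper below are simplified stand-ins (not the real `model.RowChangedEvent` or `eventsGroup`), shown only to illustrate the sort-then-`sort.Search` shape.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a stand-in for model.RowChangedEvent; only the commit-ts matters here.
type event struct {
	CommitTs uint64
}

// resolve returns the events with CommitTs <= resolveTs and keeps the rest buffered,
// mirroring the always-sort-then-search shape of eventsGroup.Resolve in the hunk above.
func resolve(events []event, resolveTs uint64) (resolved, remaining []event) {
	// Sort unconditionally by commit-ts, regardless of protocol.
	sort.Slice(events, func(i, j int) bool {
		return events[i].CommitTs < events[j].CommitTs
	})
	// Find the first event whose commit-ts is strictly greater than resolveTs.
	i := sort.Search(len(events), func(i int) bool {
		return events[i].CommitTs > resolveTs
	})
	return events[:i], events[i:]
}

func main() {
	buffered := []event{{CommitTs: 7}, {CommitTs: 3}, {CommitTs: 9}, {CommitTs: 5}}
	resolved, remaining := resolve(buffered, 6)
	fmt.Println(resolved)  // [{3} {5}]
	fmt.Println(remaining) // [{7} {9}]
}
```

The commented-out switch shows the previous behaviour: canal-json forced a sort while the other protocols only logged a warning when events arrived out of order; the new code sorts for every protocol before searching.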
cmd/kafka-consumer/writer.go (107 changes: 70 additions & 37 deletions)
@@ -350,7 +350,8 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
group = NewEventsGroup(partition, tableID)
eventGroup[tableID] = group
}
w.appendRow2Group(row, group, progress, offset)
group.Append(row, offset)
//w.appendRow2Group(row, group, progress, offset)
}
}

@@ -480,50 +481,82 @@ func (w *writer) checkOldMessageForWatermark(newWatermark uint64, partition int32
}

func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup, progress *partitionProgress, offset kafka.Offset) {
watermark := atomic.LoadUint64(&progress.watermark)
// if the kafka cluster is normal, this should not hit.
// else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback.
watermark := progress.loadWatermark()
if row.CommitTs < watermark {
log.Warn("RowChanged Event fallback row, since les than the partition watermark, ignore it",
zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
// if commit message failed, the consumer may read previous message,
// just ignore this message should be fine, otherwise panic.
if offset > progress.watermarkOffset {
log.Panic("RowChangedEvent fallback row",
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
}
log.Warn("Row changed event fall back, ignore it, since consumer read old offset message",
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
return
}
if row.CommitTs >= group.highWatermark {
group.Append(row, offset)
return
}
switch w.option.protocol {
case config.ProtocolSimple, config.ProtocolOpen:
// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
// also consider the table partition.
// open protocol set the partition table id if the table is partitioned.
// for normal table, the table id is generated by the fake table id generator by using schema and table name.
// so one event group for one normal table or one table partition, replayed messages can be ignored.
log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it",
zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("highWatermark", group.highWatermark),
zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
return
default:
// canal-json does not set table id for all messages.
// in the partition table case, all partition tables have the same table id, use the same progress,
// so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row.
}
log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it",
zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("highWatermark", group.highWatermark),
zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.String("protocol", w.option.protocol.String()))
group.Append(row, offset)
//group, ok := eventGroup[tableID]
//if !ok {
// group = NewEventsGroup()
// eventGroup[tableID] = group
//}
//group.Append(row)

// if the kafka cluster is normal, this should not hit.
// else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback.
//watermark := progress.loadWatermark()
//if row.CommitTs < watermark {
// log.Warn("RowChanged Event fallback row, since les than the partition watermark, ignore it",
// zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
// zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
// zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
// zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
// zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
// return
//}
//if row.CommitTs >= group.highWatermark {
// group.Append(row, offset)
// return
//}
//switch w.option.protocol {
//case config.ProtocolSimple, config.ProtocolOpen:
// // simple protocol set the table id for all row message, it can be known which table the row message belongs to,
// // also consider the table partition.
// // open protocol set the partition table id if the table is partitioned.
// // for normal table, the table id is generated by the fake table id generator by using schema and table name.
// // so one event group for one normal table or one table partition, replayed messages can be ignored.
// log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it",
// zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
// zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
// zap.Uint64("highWatermark", group.highWatermark),
// zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
// zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
// zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
// return
//default:
// // canal-json does not set table id for all messages.
// // in the partition table case, all partition tables have the same table id, use the same progress,
// // so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row.
//}
//log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it",
// zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
// zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
// zap.Uint64("highWatermark", group.highWatermark),
// zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
// zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
// zap.String("protocol", w.option.protocol.String()))
//group.Append(row, offset)
}

type fakeTableIDGenerator struct {
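To summarize the fallback handling that the active lines in the hunk above read as, here is a minimal sketch under simplified assumptions: a plain struct stands in for the real `partitionProgress`, and the Kafka offset is a plain `int64` rather than `kafka.Offset`. An event whose commit-ts falls below the partition watermark is tolerated and skipped only when it comes from an offset at or before the offset that carried that watermark (a replayed message after a failed commit); an older commit-ts at a newer offset is treated as fatal, and anything at or above the watermark is always kept.

```go
package main

import (
	"fmt"
	"log"
)

// partitionProgress is a stand-in for the consumer's per-partition progress;
// the real type lives in cmd/kafka-consumer and guards the watermark with atomics.
type partitionProgress struct {
	watermark       uint64 // highest resolved-ts seen on this partition
	watermarkOffset int64  // kafka offset of the message that carried that resolved-ts
}

// shouldAppend mirrors the offset-aware fallback check: an event older than the
// partition watermark is ignored only if it was read from an offset at or before
// the watermark offset, i.e. a replayed message; a newer offset with an older
// commit-ts means the stream is genuinely out of order, which is fatal.
func shouldAppend(commitTs uint64, offset int64, progress *partitionProgress) bool {
	if commitTs < progress.watermark {
		if offset > progress.watermarkOffset {
			log.Panicf("fallback row at new offset: commitTs=%d offset=%d watermark=%d watermarkOffset=%d",
				commitTs, offset, progress.watermark, progress.watermarkOffset)
		}
		// Replayed old message: safe to ignore.
		return false
	}
	// Commit-ts at or above the watermark: always keep the event.
	return true
}

func main() {
	progress := &partitionProgress{watermark: 100, watermarkOffset: 42}
	fmt.Println(shouldAppend(120, 43, progress)) // true: newer than the watermark
	fmt.Println(shouldAppend(90, 40, progress))  // false: replayed message, ignored
}
```

Relative to the commented-out body, the deciding signal here is the offset at which the watermark was observed rather than the per-group high watermark combined with the protocol in use, which is what lets an event with a commit-ts greater than the high watermark always be appended, as the PR title describes.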