diff --git a/cmd/kafka-consumer/event_group.go b/cmd/kafka-consumer/event_group.go
index 03e09aa68e2..56aa16a39b4 100644
--- a/cmd/kafka-consumer/event_group.go
+++ b/cmd/kafka-consumer/event_group.go
@@ -60,19 +60,23 @@ func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) {
 // Resolve will get events where CommitTs is less than resolveTs.
 func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model.RowChangedEvent {
-	switch protocol {
-	case config.ProtocolCanalJSON:
-		sort.Slice(g.events, func(i, j int) bool {
-			return g.events[i].CommitTs < g.events[j].CommitTs
-		})
-	default:
-		if !sort.SliceIsSorted(g.events, func(i, j int) bool {
-			return g.events[i].CommitTs < g.events[j].CommitTs
-		}) {
-			log.Warn("events are not sorted", zap.Int32("partition", g.partition),
-				zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
-		}
-	}
+	//switch protocol {
+	//case config.ProtocolCanalJSON:
+	//	sort.Slice(g.events, func(i, j int) bool {
+	//		return g.events[i].CommitTs < g.events[j].CommitTs
+	//	})
+	//default:
+	//	if !sort.SliceIsSorted(g.events, func(i, j int) bool {
+	//		return g.events[i].CommitTs < g.events[j].CommitTs
+	//	}) {
+	//		log.Warn("events are not sorted", zap.Int32("partition", g.partition),
+	//			zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
+	//	}
+	//}
+
+	sort.Slice(g.events, func(i, j int) bool {
+		return g.events[i].CommitTs < g.events[j].CommitTs
+	})
 
 	i := sort.Search(len(g.events), func(i int) bool {
 		return g.events[i].CommitTs > resolve
 	})
diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go
index d2407c2cc30..fcc2c803aab 100644
--- a/cmd/kafka-consumer/writer.go
+++ b/cmd/kafka-consumer/writer.go
@@ -350,7 +350,8 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 			group = NewEventsGroup(partition, tableID)
 			eventGroup[tableID] = group
 		}
-		w.appendRow2Group(row, group, progress, offset)
+		group.Append(row, offset)
+		//w.appendRow2Group(row, group, progress, offset)
 	}
 }
 
@@ -480,50 +481,82 @@ func (w *writer) checkOldMessageForWatermark(newWatermark uint64, partition int3
 }
 
 func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup, progress *partitionProgress, offset kafka.Offset) {
+	watermark := atomic.LoadUint64(&progress.watermark)
 	// if the kafka cluster is normal, this should not hit.
 	// else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback.
-	watermark := progress.loadWatermark()
 	if row.CommitTs < watermark {
-		log.Warn("RowChanged Event fallback row, since les than the partition watermark, ignore it",
-			zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
+		// if commit message failed, the consumer may read previous message,
+		// just ignore this message should be fine, otherwise panic.
+		if offset > progress.watermarkOffset {
+			log.Panic("RowChangedEvent fallback row",
+				zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
+				zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
+				zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
+				zap.String("schema", row.TableInfo.GetSchemaName()),
+				zap.String("table", row.TableInfo.GetTableName()),
+				zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
+		}
+		log.Warn("Row changed event fall back, ignore it, since consumer read old offset message",
 			zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
 			zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
-			zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
-			zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
+			zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
+			zap.String("schema", row.TableInfo.GetSchemaName()),
+			zap.String("table", row.TableInfo.GetTableName()),
+			zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
 		return
 	}
-	if row.CommitTs >= group.highWatermark {
-		group.Append(row, offset)
-		return
-	}
-	switch w.option.protocol {
-	case config.ProtocolSimple, config.ProtocolOpen:
-		// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
-		// also consider the table partition.
-		// open protocol set the partition table id if the table is partitioned.
-		// for normal table, the table id is generated by the fake table id generator by using schema and table name.
-		// so one event group for one normal table or one table partition, replayed messages can be ignored.
-		log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it",
-			zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
-			zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
-			zap.Uint64("highWatermark", group.highWatermark),
-			zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
-			zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
-			zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
-		return
-	default:
-		// canal-json does not set table id for all messages.
-		// in the partition table case, all partition tables have the same table id, use the same progress,
-		// so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row.
-	}
-	log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it",
-		zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
-		zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
-		zap.Uint64("highWatermark", group.highWatermark),
-		zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
-		zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
-		zap.String("protocol", w.option.protocol.String()))
 	group.Append(row, offset)
+	//group, ok := eventGroup[tableID]
+	//if !ok {
+	//	group = NewEventsGroup()
+	//	eventGroup[tableID] = group
+	//}
+	//group.Append(row)
+
+	// if the kafka cluster is normal, this should not hit.
+	// else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback.
+	//watermark := progress.loadWatermark()
+	//if row.CommitTs < watermark {
+	//	log.Warn("RowChanged Event fallback row, since les than the partition watermark, ignore it",
+	//		zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
+	//		zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
+	//		zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
+	//		zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
+	//		zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
+	//	return
+	//}
+	//if row.CommitTs >= group.highWatermark {
+	//	group.Append(row, offset)
+	//	return
+	//}
+	//switch w.option.protocol {
+	//case config.ProtocolSimple, config.ProtocolOpen:
+	//	// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
+	//	// also consider the table partition.
+	//	// open protocol set the partition table id if the table is partitioned.
+	//	// for normal table, the table id is generated by the fake table id generator by using schema and table name.
+	//	// so one event group for one normal table or one table partition, replayed messages can be ignored.
+	//	log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it",
+	//		zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
+	//		zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
+	//		zap.Uint64("highWatermark", group.highWatermark),
+	//		zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
+	//		zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
+	//		zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
+	//	return
+	//default:
+	//	// canal-json does not set table id for all messages.
+	//	// in the partition table case, all partition tables have the same table id, use the same progress,
+	//	// so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row.
+	//}
+	//log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it",
+	//	zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
+	//	zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
+	//	zap.Uint64("highWatermark", group.highWatermark),
+	//	zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
+	//	zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
+	//	zap.String("protocol", w.option.protocol.String()))
+	//group.Append(row, offset)
 }
 
 type fakeTableIDGenerator struct {