From 4fa48405e9086840b4b1d953db9c37db65d2f19b Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Mon, 23 Dec 2024 15:51:52 +0800
Subject: [PATCH] fix simple protocol data loss

The consumer previously ignored rows whose commitTs fell below the
events-group high watermark for the simple and open protocols, which
could drop data. Always append such rows instead: a row below the
partition watermark is only ignored when it was read at an older offset
than the watermark (a replay after a failed offset commit); a fallback
at a newer offset is unrecoverable and panics.

---
 cmd/kafka-consumer/writer.go | 53 ++++++++++++++++-------------------------------------
 1 file changed, 16 insertions(+), 37 deletions(-)

diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go
index d2407c2cc30..d61bf46415a 100644
--- a/cmd/kafka-consumer/writer.go
+++ b/cmd/kafka-consumer/writer.go
@@ -350,7 +350,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 			group = NewEventsGroup(partition, tableID)
 			eventGroup[tableID] = group
 		}
-		w.appendRow2Group(row, group, progress, offset)
+		group.Append(row, offset)
 	}
 }
 
@@ -480,50 +480,29 @@ func (w *writer) checkOldMessageForWatermark(newWatermark uint64, partition int3
 }
 
 func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup, progress *partitionProgress, offset kafka.Offset) {
+	watermark := atomic.LoadUint64(&progress.watermark)
 	// if the kafka cluster is normal, this should not hit.
 	// else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback.
-	watermark := progress.loadWatermark()
 	if row.CommitTs < watermark {
-		log.Warn("RowChanged Event fallback row, since les than the partition watermark, ignore it",
-			zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
+		// if committing the offset failed, the consumer may read the previous message again,
+		// so ignoring such a replayed message is fine; any other fallback must panic.
+		if offset > progress.watermarkOffset {
+			log.Panic("RowChangedEvent fallback row",
+				zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
+				zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
+				zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
+				zap.String("schema", row.TableInfo.GetSchemaName()),
+				zap.String("table", row.TableInfo.GetTableName()))
+		}
+		log.Warn("RowChangedEvent fallback row, ignore it, since the consumer read an old offset message",
 			zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
 			zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
-			zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
-			zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
+			zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
+			zap.String("schema", row.TableInfo.GetSchemaName()),
+			zap.String("table", row.TableInfo.GetTableName()))
 		return
 	}
-	if row.CommitTs >= group.highWatermark {
-		group.Append(row, offset)
-		return
-	}
-	switch w.option.protocol {
-	case config.ProtocolSimple, config.ProtocolOpen:
-		// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
-		// also consider the table partition.
-		// open protocol set the partition table id if the table is partitioned.
-		// for normal table, the table id is generated by the fake table id generator by using schema and table name.
-		// so one event group for one normal table or one table partition, replayed messages can be ignored.
-		log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it",
-			zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
-			zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
-			zap.Uint64("highWatermark", group.highWatermark),
-			zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
-			zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
-			zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
-		return
-	default:
-		// canal-json does not set table id for all messages.
-		// in the partition table case, all partition tables have the same table id, use the same progress,
-		// so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row.
-	}
-	log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it",
-		zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
-		zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
-		zap.Uint64("highWatermark", group.highWatermark),
-		zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
-		zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
-		zap.String("protocol", w.option.protocol.String()))
 	group.Append(row, offset)
 }
 
 type fakeTableIDGenerator struct {
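Note on the decision rule (illustration only, not part of the patch): the new
appendRow2Group body reduces to a three-way choice per row. The sketch below
restates that rule as a self-contained Go program; partitionProgress is a
trimmed stand-in for the real struct in writer.go, and a plain int64 stands in
for kafka.Offset, so the names and types here are assumptions made for the
example rather than the consumer's actual API.

package main

import (
	"fmt"
	"sync/atomic"
)

// partitionProgress is a trimmed stand-in for the struct in writer.go:
// only the two fields the fallback check reads are kept.
type partitionProgress struct {
	watermark       uint64 // commitTs carried by the last watermark message
	watermarkOffset int64  // offset the watermark was read at (kafka.Offset in the real code)
}

// action enumerates the three outcomes of the new appendRow2Group body.
type action string

const (
	appendRow action = "append" // normal case: buffer the row into its events group
	ignoreRow action = "ignore" // replay after a failed offset commit: drop the row
	panicRow  action = "panic"  // fallback at a newer offset: unrecoverable, must panic
)

// classify applies the patch's decision rule to a single row.
func classify(progress *partitionProgress, commitTs uint64, offset int64) action {
	// load the watermark atomically, matching the patch's switch from
	// progress.loadWatermark() to atomic.LoadUint64(&progress.watermark).
	watermark := atomic.LoadUint64(&progress.watermark)
	if commitTs >= watermark {
		// the data-loss fix: rows are no longer dropped for falling below
		// the events-group high watermark, they are always appended.
		return appendRow
	}
	if offset > progress.watermarkOffset {
		// the row's commitTs falls back even though it sits at a newer
		// offset than the watermark, so it cannot be a replayed message.
		return panicRow
	}
	// older offset: the consumer re-read a message after a failed commit.
	return ignoreRow
}

func main() {
	progress := &partitionProgress{watermark: 100, watermarkOffset: 50}
	fmt.Println(classify(progress, 120, 60)) // append
	fmt.Println(classify(progress, 90, 40))  // ignore
	fmt.Println(classify(progress, 90, 60))  // panic
}

Loading the watermark once up front with atomic.LoadUint64, as the patch does,
also means the comparison and any log output observe the same value even if
another goroutine advances the watermark concurrently.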