Skip to content

Commit

Permalink
fix simple protocol data loss
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 23, 2024
1 parent 7f57e1f commit 4fa4840
Showing 1 changed file with 68 additions and 37 deletions.
105 changes: 68 additions & 37 deletions cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
group = NewEventsGroup(partition, tableID)
eventGroup[tableID] = group
}
w.appendRow2Group(row, group, progress, offset)
group.Append(row, offset)
//w.appendRow2Group(row, group, progress, offset)
}
}

Expand Down Expand Up @@ -480,50 +481,80 @@ func (w *writer) checkOldMessageForWatermark(newWatermark uint64, partition int3
}

func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup, progress *partitionProgress, offset kafka.Offset) {
watermark := atomic.LoadUint64(&progress.watermark)
// if the kafka cluster is normal, this should not hit.
// else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback.
watermark := progress.loadWatermark()
if row.CommitTs < watermark {
log.Warn("RowChanged Event fallback row, since les than the partition watermark, ignore it",
zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
// if commit message failed, the consumer may read previous message,
// just ignore this message should be fine, otherwise panic.
if offset > progress.watermarkOffset {
log.Panic("RowChangedEvent fallback row",
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()))
}
log.Warn("Row changed event fall back, ignore it, since consumer read old offset message",
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()))
return
}
if row.CommitTs >= group.highWatermark {
group.Append(row, offset)
return
}
switch w.option.protocol {
case config.ProtocolSimple, config.ProtocolOpen:
// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
// also consider the table partition.
// open protocol set the partition table id if the table is partitioned.
// for normal table, the table id is generated by the fake table id generator by using schema and table name.
// so one event group for one normal table or one table partition, replayed messages can be ignored.
log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it",
zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("highWatermark", group.highWatermark),
zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
return
default:
// canal-json does not set table id for all messages.
// in the partition table case, all partition tables have the same table id, use the same progress,
// so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row.
}
log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it",
zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("highWatermark", group.highWatermark),
zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
zap.String("protocol", w.option.protocol.String()))
group.Append(row, offset)
//group, ok := eventGroup[tableID]
//if !ok {
// group = NewEventsGroup()
// eventGroup[tableID] = group
//}
//group.Append(row)

// if the kafka cluster is normal, this should not hit.
// else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback.
//watermark := progress.loadWatermark()
//if row.CommitTs < watermark {
// log.Warn("RowChanged Event fallback row, since les than the partition watermark, ignore it",
// zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
// zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
// zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
// zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
// zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
// return
//}
//if row.CommitTs >= group.highWatermark {
// group.Append(row, offset)
// return
//}
//switch w.option.protocol {
//case config.ProtocolSimple, config.ProtocolOpen:
// // simple protocol set the table id for all row message, it can be known which table the row message belongs to,
// // also consider the table partition.
// // open protocol set the partition table id if the table is partitioned.
// // for normal table, the table id is generated by the fake table id generator by using schema and table name.
// // so one event group for one normal table or one table partition, replayed messages can be ignored.
// log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it",
// zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
// zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
// zap.Uint64("highWatermark", group.highWatermark),
// zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
// zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
// zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition))
// return
//default:
// // canal-json does not set table id for all messages.
// // in the partition table case, all partition tables have the same table id, use the same progress,
// // so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row.
//}
//log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it",
// zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition),
// zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
// zap.Uint64("highWatermark", group.highWatermark),
// zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
// zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()),
// zap.String("protocol", w.option.protocol.String()))
//group.Append(row, offset)
}

type fakeTableIDGenerator struct {
Expand Down

0 comments on commit 4fa4840

Please sign in to comment.