Skip to content

Commit

Permalink
also print ignored column
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 23, 2024
1 parent 4fa4840 commit d6b5c81
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,14 +493,16 @@ func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup,
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()))
zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
}
log.Warn("Row changed event fall back, ignore it, since consumer read old offset message",
zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset),
zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.Int32("partition", progress.partition), zap.Int64("tableID", group.tableID),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()))
zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
return
}
group.Append(row, offset)
Expand Down

0 comments on commit d6b5c81

Please sign in to comment.