diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go index 7b70e24262e..af4e75ae624 100644 --- a/cmd/pulsar-consumer/main.go +++ b/cmd/pulsar-consumer/main.go @@ -422,6 +422,10 @@ func (g *eventsGroup) Append(e *model.RowChangedEvent) { } func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent { + sort.Slice(g.events, func(i, j int) bool { + return g.events[i].CommitTs < g.events[j].CommitTs + }) + i := sort.Search(len(g.events), func(i int) bool { return g.events[i].CommitTs > resolveTs }) @@ -471,7 +475,6 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error { zap.ByteString("value", msg.Payload()), zap.Error(err)) } - log.Info("DDL event received", zap.Any("DDL", ddl)) c.appendDDL(ddl) case model.MessageTypeRow: row, err := decoder.NextRowChangedEvent() @@ -499,6 +502,11 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error { c.eventGroups[tableID] = group } group.Append(row) + log.Info("DML event received", + zap.Int64("tableID", row.GetTableID()), + zap.String("schema", row.TableInfo.GetSchemaName()), + zap.String("table", row.TableInfo.GetTableName()), + zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns)) case model.MessageTypeResolved: ts, err := decoder.NextResolvedEvent() if err != nil {