Skip to content

Commit

Permalink
try to fix pulsar consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 6, 2025
1 parent de5956e commit 1126cd7
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion cmd/pulsar-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ func (g *eventsGroup) Append(e *model.RowChangedEvent) {
}

func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {
sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})

i := sort.Search(len(g.events), func(i int) bool {
return g.events[i].CommitTs > resolveTs
})
Expand Down Expand Up @@ -471,7 +475,6 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error {
zap.ByteString("value", msg.Payload()),
zap.Error(err))
}
log.Info("DDL event received", zap.Any("DDL", ddl))
c.appendDDL(ddl)
case model.MessageTypeRow:
row, err := decoder.NextRowChangedEvent()
Expand Down Expand Up @@ -499,6 +502,11 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error {
c.eventGroups[tableID] = group
}
group.Append(row)
log.Info("DML event received",
zap.Int64("tableID", row.GetTableID()),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
case model.MessageTypeResolved:
ts, err := decoder.NextResolvedEvent()
if err != nil {
Expand Down

0 comments on commit 1126cd7

Please sign in to comment.