Skip to content

Commit

Permalink
sort events
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 23, 2024
1 parent d6b5c81 commit 5bedd08
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions cmd/kafka-consumer/event_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,23 @@ func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) {

// Resolve will get events where CommitTs is less than resolveTs.
func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model.RowChangedEvent {
switch protocol {
case config.ProtocolCanalJSON:
sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})
default:
if !sort.SliceIsSorted(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
}) {
log.Warn("events are not sorted", zap.Int32("partition", g.partition),
zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
}
}
//switch protocol {
//case config.ProtocolCanalJSON:
// sort.Slice(g.events, func(i, j int) bool {
// return g.events[i].CommitTs < g.events[j].CommitTs
// })
//default:
// if !sort.SliceIsSorted(g.events, func(i, j int) bool {
// return g.events[i].CommitTs < g.events[j].CommitTs
// }) {
// log.Warn("events are not sorted", zap.Int32("partition", g.partition),
// zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
// }
//}

sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})

i := sort.Search(len(g.events), func(i int) bool {
return g.events[i].CommitTs > resolve
Expand Down

0 comments on commit 5bedd08

Please sign in to comment.