From 5bedd08d8f687ddd6ef993752677769397fa7185 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 23 Dec 2024 17:48:31 +0800 Subject: [PATCH] sort events --- cmd/kafka-consumer/event_group.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/cmd/kafka-consumer/event_group.go b/cmd/kafka-consumer/event_group.go index 03e09aa68e2..56aa16a39b4 100644 --- a/cmd/kafka-consumer/event_group.go +++ b/cmd/kafka-consumer/event_group.go @@ -60,19 +60,23 @@ func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) { // Resolve will get events where CommitTs is less than resolveTs. func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model.RowChangedEvent { - switch protocol { - case config.ProtocolCanalJSON: - sort.Slice(g.events, func(i, j int) bool { - return g.events[i].CommitTs < g.events[j].CommitTs - }) - default: - if !sort.SliceIsSorted(g.events, func(i, j int) bool { - return g.events[i].CommitTs < g.events[j].CommitTs - }) { - log.Warn("events are not sorted", zap.Int32("partition", g.partition), - zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events))) - } - } + //switch protocol { + //case config.ProtocolCanalJSON: + // sort.Slice(g.events, func(i, j int) bool { + // return g.events[i].CommitTs < g.events[j].CommitTs + // }) + //default: + // if !sort.SliceIsSorted(g.events, func(i, j int) bool { + // return g.events[i].CommitTs < g.events[j].CommitTs + // }) { + // log.Warn("events are not sorted", zap.Int32("partition", g.partition), + // zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events))) + // } + //} + + sort.Slice(g.events, func(i, j int) bool { + return g.events[i].CommitTs < g.events[j].CommitTs + }) i := sort.Search(len(g.events), func(i int) bool { return g.events[i].CommitTs > resolve