Skip to content

Commit

Permalink
Merge branch 'master' into hidden-table-id
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 19, 2024
2 parents 7dbc7de + 6a53270 commit f1993a0
Show file tree
Hide file tree
Showing 23 changed files with 457 additions and 243 deletions.
58 changes: 51 additions & 7 deletions cmd/kafka-consumer/event_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,77 @@ package main
import (
"sort"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"go.uber.org/zap"
)

// EventsGroup could store change event message.
type eventsGroup struct {
events []*model.RowChangedEvent
partition int32
tableID int64

events []*model.RowChangedEvent
highWatermark uint64
}

// NewEventsGroup will create new event group.
func NewEventsGroup() *eventsGroup {
func NewEventsGroup(partition int32, tableID int64) *eventsGroup {
return &eventsGroup{
events: make([]*model.RowChangedEvent, 0),
partition: partition,
tableID: tableID,
events: make([]*model.RowChangedEvent, 0),
}
}

// Append will append an event to event groups.
func (g *eventsGroup) Append(e *model.RowChangedEvent) {
g.events = append(g.events, e)
func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) {
g.events = append(g.events, row)
if row.CommitTs > g.highWatermark {
g.highWatermark = row.CommitTs
}
log.Info("DML event received",
zap.Int32("partition", g.partition),
zap.Any("offset", offset),
zap.Uint64("commitTs", row.CommitTs),
zap.Uint64("highWatermark", g.highWatermark),
zap.Int64("physicalTableID", row.GetTableID()),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
}

// Resolve will get events where CommitTs is less than resolveTs.
func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {
func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model.RowChangedEvent {
switch protocol {
case config.ProtocolCanalJSON:
sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})
default:
if !sort.SliceIsSorted(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
}) {
log.Warn("events are not sorted", zap.Int32("partition", g.partition),
zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
}
}

i := sort.Search(len(g.events), func(i int) bool {
return g.events[i].CommitTs > resolveTs
return g.events[i].CommitTs > resolve
})

result := g.events[:i]
g.events = g.events[i:]

if len(result) != 0 && len(g.events) != 0 {
log.Warn("not all events resolved",
zap.Int32("partition", g.partition), zap.Int64("tableID", g.tableID),
zap.Int("resolved", len(result)), zap.Int("remained", len(g.events)),
zap.Uint64("resolveTs", resolve), zap.Uint64("firstCommitTs", g.events[0].CommitTs))
}

return result
}
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func main() {
upstreamURIStr string
configFile string
)

groupID := fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String())
consumerOption := newOption()
flag.StringVar(&configFile, "config", "", "config file for changefeed")
Expand Down Expand Up @@ -84,6 +83,7 @@ func main() {
consumer := newConsumer(ctx, consumerOption)
var wg sync.WaitGroup
if consumerOption.enableProfiling {
log.Info("profiling is enabled")
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
3 changes: 2 additions & 1 deletion cmd/kafka-consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (o *option) Adjust(upstreamURI *url.URL, configFile string) error {
zap.String("groupID", o.groupID),
zap.Int("maxMessageBytes", o.maxMessageBytes),
zap.Int("maxBatchSize", o.maxBatchSize),
zap.String("upstreamURI", upstreamURI.String()))
zap.String("upstreamURI", upstreamURI.String()),
zap.String("downstreamURI", o.downstreamURI))
return nil
}
Loading

0 comments on commit f1993a0

Please sign in to comment.