Merge branch 'master' into canal-json-table-id
3AceShowHand committed Dec 20, 2024
2 parents db98c2f + fa598ba commit de98de0
Showing 19 changed files with 474 additions and 290 deletions.
3 changes: 0 additions & 3 deletions cdc/entry/schema/snapshot.go
@@ -570,9 +570,6 @@ func (s *Snapshot) SchemaCount() (count int) {

// DumpToString dumps the snapshot to a string.
func (s *Snapshot) DumpToString() string {
s.rwlock.RLock()
defer s.rwlock.RUnlock()

schemas := make([]string, 0, s.inner.schemas.Len())
s.IterSchemas(func(dbInfo *timodel.DBInfo) {
schemas = append(schemas, fmt.Sprintf("%v", dbInfo))
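Note on the change above: DumpToString no longer takes the snapshot's read lock itself. One plausible motivation, not stated in the commit, is that iteration helpers such as IterSchemas acquire the same sync.RWMutex internally, and re-acquiring a read lock while a writer is queued can deadlock. The following is a minimal, self-contained Go sketch of that generic hazard; store, iter, and the dump functions are illustrative names, not tiflow code.

```go
package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu   sync.RWMutex
	data map[string]string
}

// iter takes its own read lock, like an iteration helper would.
func (s *store) iter(f func(k, v string)) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for k, v := range s.data {
		f(k, v)
	}
}

// dumpNested re-acquires the read lock around iter. sync.RWMutex is not
// reentrant: if a writer calls Lock between the outer and inner RLock, the
// inner RLock blocks behind the writer and the whole chain deadlocks.
func (s *store) dumpNested() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out string
	s.iter(func(k, v string) { out += fmt.Sprintf("%s=%s ", k, v) })
	return out
}

// dumpFlat relies solely on iter's own locking, which is the shape the
// patched DumpToString now has.
func (s *store) dumpFlat() string {
	var out string
	s.iter(func(k, v string) { out += fmt.Sprintf("%s=%s ", k, v) })
	return out
}

func main() {
	s := &store{data: map[string]string{"a": "1"}}
	fmt.Println(s.dumpFlat())
	fmt.Println(s.dumpNested()) // safe here only because no writer is contending
}
```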
3 changes: 1 addition & 2 deletions cdc/entry/schema_storage.go
@@ -273,8 +273,7 @@ func (s *schemaStorage) AllPhysicalTables(ctx context.Context, ts model.Ts) ([]m
log.Debug("get new schema snapshot",
zap.Uint64("ts", ts),
zap.Uint64("snapTs", snap.CurrentTs()),
zap.Any("tables", res),
zap.String("snapshot", snap.DumpToString()))
zap.Any("tables", res))

return res, nil
}
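Note on the change above: the Debug log in AllPhysicalTables no longer includes the full snapshot dump. zap evaluates field arguments eagerly, so snap.DumpToString() was built on every call even when debug logging was disabled. If such an expensive field is ever wanted again, one option is to gate it on the log level; the sketch below uses plain zap with an illustrative expensiveDump stand-in and is not code from this commit.

```go
package main

import "go.uber.org/zap"

// expensiveDump stands in for something costly such as Snapshot.DumpToString.
func expensiveDump() string {
	return "...large snapshot dump..."
}

func main() {
	logger, _ := zap.NewProduction() // info level, so the debug branch below is skipped
	defer logger.Sync()

	// Check returns nil when the level is disabled, so the expensive field is
	// only built when a debug entry will actually be written.
	if ce := logger.Check(zap.DebugLevel, "get new schema snapshot"); ce != nil {
		ce.Write(zap.String("snapshot", expensiveDump()))
	}
}
```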
10 changes: 5 additions & 5 deletions cdc/puller/ddl_puller.go
@@ -195,7 +195,7 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model
}
}

job, err := p.unmarshalDDL(ddlRawKV)
job, err := p.unmarshalDDL(ctx, ddlRawKV)
if err != nil {
return errors.Trace(err)
}
@@ -232,12 +232,12 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model
return nil
}

func (p *ddlJobPullerImpl) unmarshalDDL(rawKV *model.RawKVEntry) (*timodel.Job, error) {
func (p *ddlJobPullerImpl) unmarshalDDL(ctx context.Context, rawKV *model.RawKVEntry) (*timodel.Job, error) {
if rawKV.OpType != model.OpTypePut {
return nil, nil
}
if p.ddlTableInfo == nil && !entry.IsLegacyFormatJob(rawKV) {
err := p.initDDLTableInfo()
err := p.initDDLTableInfo(ctx)
if err != nil {
return nil, errors.Trace(err)
}
@@ -254,7 +254,7 @@ func (p *ddlJobPullerImpl) setResolvedTs(ts uint64) {
atomic.StoreUint64(&p.resolvedTs, ts)
}

func (p *ddlJobPullerImpl) initDDLTableInfo() error {
func (p *ddlJobPullerImpl) initDDLTableInfo(ctx context.Context) error {
version, err := p.kvStorage.CurrentVersion(tidbkv.GlobalTxnScope)
if err != nil {
return errors.Trace(err)
@@ -271,7 +271,7 @@ func (p *ddlJobPullerImpl) initDDLTableInfo() error {
return errors.Trace(err)
}

tbls, err := snap.ListTables(db.ID)
tbls, err := snap.ListTables(ctx, db.ID)
if err != nil {
return errors.Trace(err)
}
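Note on the change above: this file only threads a context.Context through the call chain, handleRawKVEntry → unmarshalDDL → initDDLTableInfo → snap.ListTables, presumably because the snapshot's ListTables now requires one. A minimal sketch of the same plumbing pattern follows, with illustrative types that are not tiflow code.

```go
package main

import (
	"context"
	"fmt"
)

// snapshot stands in for the meta snapshot whose ListTables takes a ctx.
type snapshot struct{}

func (s *snapshot) ListTables(ctx context.Context, dbID int64) ([]string, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err() // the ctx lets a slow meta read be cancelled
	default:
		return []string{"job", "history"}, nil // illustrative table names
	}
}

type puller struct{ snap *snapshot }

// initDDLTableInfo mirrors the new signature: accept ctx and forward it.
func (p *puller) initDDLTableInfo(ctx context.Context) error {
	tbls, err := p.snap.ListTables(ctx, 1)
	if err != nil {
		return err
	}
	fmt.Println("ddl tables:", tbls)
	return nil
}

// unmarshalDDL likewise forwards the caller's ctx instead of creating its own.
func (p *puller) unmarshalDDL(ctx context.Context) error {
	return p.initDDLTableInfo(ctx)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := (&puller{snap: &snapshot{}}).unmarshalDDL(ctx); err != nil {
		fmt.Println("error:", err)
	}
}
```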
26 changes: 22 additions & 4 deletions cdc/redo/writer/memory/mem_log_writer_test.go
@@ -106,12 +106,30 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) {

functions := map[string]func(error){
"WriteEvents": func(expected error) {
err := lw.WriteEvents(ctx, events...)
require.ErrorIs(t, errors.Cause(err), expected)
if expected == nil {
err := lw.WriteEvents(ctx, events...)
require.NoError(t, err)
} else {
require.Eventually(
t, func() bool {
err := lw.WriteEvents(ctx, events...)
return errors.Is(errors.Cause(err), expected)
}, time.Second*2, time.Microsecond*10,
)
}
},
"FlushLog": func(expected error) {
err := lw.FlushLog(ctx)
require.ErrorIs(t, errors.Cause(err), expected)
if expected == nil {
err := lw.FlushLog(ctx)
require.NoError(t, err)
} else {
require.Eventually(
t, func() bool {
err := lw.WriteEvents(ctx, events...)
return errors.Is(errors.Cause(err), expected)
}, time.Second*2, time.Microsecond*10,
)
}
},
}
firstCall := true
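Note on the change above: the test no longer asserts on the first call's error. When an error is expected, it polls with require.Eventually until the writer reports it, presumably because the memory log writer surfaces the injected failure asynchronously and the first call after it may still succeed. A minimal sketch of the polling pattern with testify follows; flakyOp and errClosed are illustrative, not code from this commit.

```go
package demo

import (
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

var errClosed = errors.New("writer closed")

// flakyOp stands in for a call whose error only appears once a background
// goroutine has observed the closed state.
func flakyOp(failAfter time.Time) error {
	if time.Now().After(failAfter) {
		return errClosed
	}
	return nil
}

func TestEventuallyPattern(t *testing.T) {
	failAfter := time.Now().Add(50 * time.Millisecond)
	// Poll until the expected error shows up instead of asserting on the very
	// first call, which may race with the asynchronous close.
	require.Eventually(t, func() bool {
		return errors.Is(flakyOp(failAfter), errClosed)
	}, 2*time.Second, 10*time.Millisecond)
}
```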
58 changes: 51 additions & 7 deletions cmd/kafka-consumer/event_group.go
@@ -16,33 +16,77 @@ package main
import (
"sort"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"go.uber.org/zap"
)

// EventsGroup could store change event message.
type eventsGroup struct {
events []*model.RowChangedEvent
partition int32
tableID int64

events []*model.RowChangedEvent
highWatermark uint64
}

// NewEventsGroup will create new event group.
func NewEventsGroup() *eventsGroup {
func NewEventsGroup(partition int32, tableID int64) *eventsGroup {
return &eventsGroup{
events: make([]*model.RowChangedEvent, 0),
partition: partition,
tableID: tableID,
events: make([]*model.RowChangedEvent, 0),
}
}

// Append will append an event to event groups.
func (g *eventsGroup) Append(e *model.RowChangedEvent) {
g.events = append(g.events, e)
func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) {
g.events = append(g.events, row)
if row.CommitTs > g.highWatermark {
g.highWatermark = row.CommitTs
}
log.Info("DML event received",
zap.Int32("partition", g.partition),
zap.Any("offset", offset),
zap.Uint64("commitTs", row.CommitTs),
zap.Uint64("highWatermark", g.highWatermark),
zap.Int64("physicalTableID", row.GetTableID()),
zap.String("schema", row.TableInfo.GetSchemaName()),
zap.String("table", row.TableInfo.GetTableName()),
zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
}

// Resolve will get events where CommitTs is less than resolveTs.
func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {
func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model.RowChangedEvent {
switch protocol {
case config.ProtocolCanalJSON:
sort.Slice(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
})
default:
if !sort.SliceIsSorted(g.events, func(i, j int) bool {
return g.events[i].CommitTs < g.events[j].CommitTs
}) {
log.Warn("events are not sorted", zap.Int32("partition", g.partition),
zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events)))
}
}

i := sort.Search(len(g.events), func(i int) bool {
return g.events[i].CommitTs > resolveTs
return g.events[i].CommitTs > resolve
})

result := g.events[:i]
g.events = g.events[i:]

if len(result) != 0 && len(g.events) != 0 {
log.Warn("not all events resolved",
zap.Int32("partition", g.partition), zap.Int64("tableID", g.tableID),
zap.Int("resolved", len(result)), zap.Int("remained", len(g.events)),
zap.Uint64("resolveTs", resolve), zap.Uint64("firstCommitTs", g.events[0].CommitTs))
}

return result
}
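Note on the change above: eventsGroup now remembers its partition and physical table ID, tracks a per-group high watermark, and Resolve takes the protocol so that canal-json events (which may arrive out of commit-ts order) are sorted before being cut at the resolved ts, while other protocols only log a warning when ordering is violated. A hypothetical usage sketch follows; it assumes the imports already present in event_group.go (kafka, model, config), and consumeSketch itself is not part of the commit.

```go
// consumeSketch shows how a consumer loop might drive the group for one
// (partition, tableID) pair: append rows as they arrive, then flush everything
// at or below the resolved timestamp. The offsets used here are illustrative.
func consumeSketch(partition int32, tableID int64, rows []*model.RowChangedEvent, resolvedTs uint64) []*model.RowChangedEvent {
	group := NewEventsGroup(partition, tableID)
	for i, row := range rows {
		group.Append(row, kafka.Offset(i))
	}
	// For canal-json, Resolve sorts by CommitTs before searching for the cut
	// point; events with CommitTs <= resolvedTs are returned, the rest are kept.
	return group.Resolve(resolvedTs, config.ProtocolCanalJSON)
}
```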
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
@@ -38,7 +38,6 @@ func main() {
upstreamURIStr string
configFile string
)

groupID := fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String())
consumerOption := newOption()
flag.StringVar(&configFile, "config", "", "config file for changefeed")
@@ -84,6 +83,7 @@ func main() {
consumer := newConsumer(ctx, consumerOption)
var wg sync.WaitGroup
if consumerOption.enableProfiling {
log.Info("profiling is enabled")
wg.Add(1)
go func() {
defer wg.Done()
3 changes: 2 additions & 1 deletion cmd/kafka-consumer/option.go
@@ -170,6 +170,7 @@ func (o *option) Adjust(upstreamURI *url.URL, configFile string) error {
zap.String("groupID", o.groupID),
zap.Int("maxMessageBytes", o.maxMessageBytes),
zap.Int("maxBatchSize", o.maxBatchSize),
zap.String("upstreamURI", upstreamURI.String()))
zap.String("upstreamURI", upstreamURI.String()),
zap.String("downstreamURI", o.downstreamURI))
return nil
}
(The remaining 12 changed files are not shown here.)
