Skip to content

Commit

Permalink
encoder (ticdc): simple protocol send bootstrap event periodically (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored Jan 4, 2024
1 parent 59208da commit d060a7d
Show file tree
Hide file tree
Showing 31 changed files with 1,026 additions and 141 deletions.
18 changes: 18 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,13 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.Sink.DebeziumDisableSchema = util.AddressOf(*c.Sink.DebeziumDisableSchema)
}

if c.Sink.SendBootstrapIntervalInSec != nil {
res.Sink.SendBootstrapIntervalInSec = util.AddressOf(*c.Sink.SendBootstrapIntervalInSec)
}

if c.Sink.SendBootstrapInMsgCount != nil {
res.Sink.SendBootstrapInMsgCount = util.AddressOf(*c.Sink.SendBootstrapInMsgCount)
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -782,6 +789,15 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
if cloned.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec)
}

if cloned.Sink.SendBootstrapIntervalInSec != nil {
res.Sink.SendBootstrapIntervalInSec = util.AddressOf(*cloned.Sink.SendBootstrapIntervalInSec)
}

if cloned.Sink.SendBootstrapInMsgCount != nil {
res.Sink.SendBootstrapInMsgCount = util.AddressOf(*cloned.Sink.SendBootstrapInMsgCount)
}

if cloned.Sink.DebeziumDisableSchema != nil {
res.Sink.DebeziumDisableSchema = util.AddressOf(*cloned.Sink.DebeziumDisableSchema)
}
Expand Down Expand Up @@ -959,6 +975,8 @@ type SinkConfig struct {
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"`
SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"`
SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"`
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
}

Expand Down
2 changes: 2 additions & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var defaultAPIConfig = &ReplicaConfig{
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
DebeziumDisableSchema: util.AddressOf(false),
},
Consistent: &ConsistentConfig{
Expand Down
20 changes: 20 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,19 @@ func (d *DDLEvent) FromJobWithArgs(
}
}

// NewBootstrapDDLEvent returns a bootstrap DDL event.
// We set Bootstrap DDL event's startTs and commitTs to 0.
// Because it is generated by the TiCDC, not from the upstream TiDB.
// And they ere useless for a bootstrap DDL event.
func NewBootstrapDDLEvent(tableInfo *TableInfo) *DDLEvent {
return &DDLEvent{
StartTs: 0,
CommitTs: 0,
TableInfo: tableInfo,
IsBootstrap: true,
}
}

// SingleTableTxn represents a transaction which includes many row events in a single table
//
//msgp:ignore SingleTableTxn
Expand Down Expand Up @@ -912,3 +925,10 @@ func (t *SingleTableTxn) Append(row *RowChangedEvent) {
func (t *SingleTableTxn) ToWaitFlush() bool {
return t.FinishWg != nil
}

// TopicPartitionKey contains the topic and partition key of the message.
type TopicPartitionKey struct {
Topic string
Partition int32
PartitionKey string
}
153 changes: 153 additions & 0 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 113 additions & 0 deletions cdc/model/sink_gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ func NewKafkaDMLSink(

metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient)
dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh)
concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)
encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID)
s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager,
eventRouter, trans, encoderGroup, protocol, scheme, errCh)
log.Info("DML sink producer created",
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
txn.Callback()
continue
}

rowCallback := toRowCallback(txn.Callback, uint64(len(txn.Event.Rows)))
for _, row := range txn.Event.Rows {
topic := s.alive.eventRouter.GetTopicForRowChange(row)
Expand Down Expand Up @@ -196,8 +195,10 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
// We already limit the memory usage by MemoryQuota at SinkManager level.
// So it is safe to send the event to a unbounded channel here.
s.alive.worker.msgChan.In() <- mqEvent{
key: codec.TopicPartitionKey{
Topic: topic, Partition: index, PartitionKey: key,
key: model.TopicPartitionKey{
Topic: topic,
Partition: index,
PartitionKey: key,
},
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: row,
Expand Down
3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/mq/pulsar_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ func NewPulsarDMLSink(
return nil, cerror.WrapError(cerror.ErrPulsarInvalidConfig, err)
}

concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)
encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID)

s := newDMLSink(ctx, changefeedID, p, nil, topicManager,
eventRouter, trans, encoderGroup, protocol, scheme, errCh)
Expand Down
Loading

0 comments on commit d060a7d

Please sign in to comment.