Skip to content

Commit

Permalink
codec(ticdc): simple protocol support avro encoding format (#10359)
Browse files Browse the repository at this point in the history
close #10358
  • Loading branch information
3AceShowHand authored Jan 8, 2024
1 parent cae7e8c commit 1372b29
Show file tree
Hide file tree
Showing 37 changed files with 2,993 additions and 722 deletions.
5 changes: 4 additions & 1 deletion cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
AvroEnableWatermark: oldConfig.AvroEnableWatermark,
AvroDecimalHandlingMode: oldConfig.AvroDecimalHandlingMode,
AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode,
EncodingFormat: oldConfig.EncodingFormat,
}
}

Expand Down Expand Up @@ -634,6 +635,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
AvroEnableWatermark: oldConfig.AvroEnableWatermark,
AvroDecimalHandlingMode: oldConfig.AvroDecimalHandlingMode,
AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode,
EncodingFormat: oldConfig.EncodingFormat,
}
}

Expand Down Expand Up @@ -1192,9 +1194,10 @@ type Capture struct {
type CodecConfig struct {
EnableTiDBExtension *bool `json:"enable_tidb_extension,omitempty"`
MaxBatchSize *int `json:"max_batch_size,omitempty"`
AvroEnableWatermark *bool `json:"avro_enable_watermark"`
AvroEnableWatermark *bool `json:"avro_enable_watermark,omitempty"`
AvroDecimalHandlingMode *string `json:"avro_decimal_handling_mode,omitempty"`
AvroBigintUnsignedHandlingMode *string `json:"avro_bigint_unsigned_handling_mode,omitempty"`
EncodingFormat *string `json:"encoding_format,omitempty"`
}

// PulsarConfig represents a pulsar sink configuration
Expand Down
105 changes: 33 additions & 72 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,8 @@ type consumerOption struct {
maxMessageBytes int
maxBatchSize int

protocol config.Protocol
enableTiDBExtension bool
enableRowChecksum bool
protocol config.Protocol
codecConfig *common.Config

// the replicaConfig of the changefeed which produce data to the kafka topic
replicaConfig *config.ReplicaConfig
Expand Down Expand Up @@ -159,53 +158,34 @@ func (o *consumerOption) Adjust(upstreamURI *url.URL, configFile string) error {
}

s = upstreamURI.Query().Get("protocol")
if s != "" {
protocol, err := config.ParseSinkProtocolFromString(s)
if err != nil {
log.Panic("invalid protocol", zap.Error(err), zap.String("protocol", s))
}
o.protocol = protocol
}

s = upstreamURI.Query().Get("enable-tidb-extension")
if s != "" {
enableTiDBExtension, err := strconv.ParseBool(s)
if err != nil {
log.Panic("invalid enable-tidb-extension of upstream-uri")
}
if enableTiDBExtension {
if o.protocol != config.ProtocolCanalJSON && o.protocol != config.ProtocolAvro {
log.Panic("enable-tidb-extension only work with canal-json / avro")
}
}
o.enableTiDBExtension = enableTiDBExtension
if s == "" {
log.Panic("cannot found the protocol from the sink url")
}

s = upstreamURI.Query().Get("enable-row-checksum")
if s != "" {
enableRowChecksum, err := strconv.ParseBool(s)
if err != nil {
log.Panic("invalid enable-row-checksum of upstream-uri")
}
if enableRowChecksum {
if o.protocol != config.ProtocolAvro {
log.Panic("enable-row-checksum only work with avro")
}
}
o.enableRowChecksum = enableRowChecksum
protocol, err := config.ParseSinkProtocolFromString(s)
if err != nil {
log.Panic("invalid protocol", zap.Error(err), zap.String("protocol", s))
}
o.protocol = protocol

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = util.AddressOf(protocol.String())
if configFile != "" {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = util.AddressOf(o.protocol.String())
err := cmdUtil.StrictDecodeFile(configFile, "kafka consumer", replicaConfig)
err = cmdUtil.StrictDecodeFile(configFile, "kafka consumer", replicaConfig)
if err != nil {
return cerror.Trace(err)
}
if _, err := filter.VerifyTableRules(replicaConfig.Filter); err != nil {
if _, err = filter.VerifyTableRules(replicaConfig.Filter); err != nil {
return cerror.Trace(err)
}
o.replicaConfig = replicaConfig
}
o.replicaConfig = replicaConfig

o.codecConfig = common.NewConfig(protocol)
if err = o.codecConfig.Apply(upstreamURI, o.replicaConfig); err != nil {
return cerror.Trace(err)
}
if protocol == config.ProtocolAvro {
o.codecConfig.AvroEnableWatermark = true
}

log.Info("consumer option adjusted",
Expand All @@ -217,10 +197,7 @@ func (o *consumerOption) Adjust(upstreamURI *url.URL, configFile string) error {
zap.String("groupID", o.groupID),
zap.Int("maxMessageBytes", o.maxMessageBytes),
zap.Int("maxBatchSize", o.maxBatchSize),
zap.Any("protocol", o.protocol),
zap.Bool("enableTiDBExtension", o.enableTiDBExtension),
zap.Bool("enableRowChecksum", o.enableRowChecksum))

zap.String("upstreamURI", upstreamURI.String()))
return nil
}

Expand Down Expand Up @@ -463,8 +440,6 @@ type Consumer struct {

tz *time.Location

codecConfig *common.Config

option *consumerOption

upstreamTiDB *sql.DB
Expand All @@ -490,33 +465,19 @@ func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) {
tableIDs: make(map[string]int64),
}

c.codecConfig = common.NewConfig(o.protocol)
c.codecConfig.EnableTiDBExtension = o.enableTiDBExtension
c.codecConfig.EnableRowChecksum = o.enableRowChecksum
if c.codecConfig.Protocol == config.ProtocolAvro {
c.codecConfig.AvroEnableWatermark = true
}

if o.replicaConfig != nil && o.replicaConfig.Sink != nil && o.replicaConfig.Sink.KafkaConfig != nil {
c.codecConfig.LargeMessageHandle = o.replicaConfig.Sink.KafkaConfig.LargeMessageHandle
}

if c.codecConfig.LargeMessageHandle.HandleKeyOnly() {
if o.codecConfig.LargeMessageHandle.HandleKeyOnly() {
db, err := openDB(ctx, c.option.upstreamTiDBDSN)
if err != nil {
return nil, err
}
c.upstreamTiDB = db
}

if o.replicaConfig != nil {
eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, "kafka")
if err != nil {
return nil, cerror.Trace(err)
}
c.eventRouter = eventRouter
c.codecConfig.EnableRowChecksum = o.replicaConfig.Integrity.Enabled()
eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, "kafka")
if err != nil {
return nil, cerror.Trace(err)
}
c.eventRouter = eventRouter

c.sinks = make([]*partitionSinks, o.partitionNum)
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -616,11 +577,11 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
err error
)

switch c.codecConfig.Protocol {
switch c.option.protocol {
case config.ProtocolOpen, config.ProtocolDefault:
decoder, err = open.NewBatchDecoder(ctx, c.codecConfig, c.upstreamTiDB)
decoder, err = open.NewBatchDecoder(ctx, c.option.codecConfig, c.upstreamTiDB)
case config.ProtocolCanalJSON:
decoder, err = canal.NewBatchDecoder(ctx, c.codecConfig, c.upstreamTiDB)
decoder, err = canal.NewBatchDecoder(ctx, c.option.codecConfig, c.upstreamTiDB)
if err != nil {
return err
}
Expand All @@ -629,11 +590,11 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
if err != nil {
return cerror.Trace(err)
}
decoder = avro.NewDecoder(c.codecConfig, schemaM, c.option.topic, c.tz)
decoder = avro.NewDecoder(c.option.codecConfig, schemaM, c.option.topic, c.tz)
case config.ProtocolSimple:
decoder, err = simple.NewDecoder(ctx, c.codecConfig, c.upstreamTiDB)
decoder, err = simple.NewDecoder(ctx, c.option.codecConfig, c.upstreamTiDB)
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.codecConfig.Protocol))
log.Panic("Protocol not supported", zap.Any("Protocol", c.option.protocol))
}
if err != nil {
return cerror.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ dispatcher failed

["CDC:ErrEncodeFailed"]
error = '''
encode failed: %s
encode failed
'''

["CDC:ErrEtcdIgnore"]
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ const (
"max-batch-size": 100000,
"avro-enable-watermark": true,
"avro-decimal-handling-mode": "string",
"avro-bigint-unsigned-handling-mode": "string"
"avro-bigint-unsigned-handling-mode": "string",
"encoding-format": "json"
},
"large-message-handle": {
"large-message-handle-option": "handle-key-only",
Expand Down Expand Up @@ -445,7 +446,8 @@ const (
"max-batch-size": 100000,
"avro-enable-watermark": true,
"avro-decimal-handling-mode": "string",
"avro-bigint-unsigned-handling-mode": "string"
"avro-bigint-unsigned-handling-mode": "string",
"encoding-format": "json"
},
"large-message-handle": {
"large-message-handle-option": "handle-key-only",
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func TestReplicaConfigMarshal(t *testing.T) {
AvroEnableWatermark: aws.Bool(true),
AvroDecimalHandlingMode: aws.String("string"),
AvroBigintUnsignedHandlingMode: aws.String("string"),
EncodingFormat: aws.String("json"),
},
LargeMessageHandle: &LargeMessageHandleConfig{
LargeMessageHandleOption: LargeMessageHandleOptionHandleKeyOnly,
Expand Down
1 change: 1 addition & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ type CodecConfig struct {
AvroEnableWatermark *bool `toml:"avro-enable-watermark" json:"avro-enable-watermark"`
AvroDecimalHandlingMode *string `toml:"avro-decimal-handling-mode" json:"avro-decimal-handling-mode,omitempty"`
AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"`
EncodingFormat *string `toml:"encoding-format" json:"encoding-format,omitempty"`
}

// KafkaConfig represents a kafka sink configuration
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ var (

// codec related errors
ErrEncodeFailed = errors.Normalize(
"encode failed: %s",
"encode failed",
errors.RFCCodeText("CDC:ErrEncodeFailed"),
)
ErrDecodeFailed = errors.Normalize(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/avro/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
}

if found {
if err := common.VerifyChecksum(event.Columns, expectedChecksum); err != nil {
if err := common.VerifyChecksum(event.Columns, uint32(expectedChecksum)); err != nil {
return nil, errors.Trace(err)
}
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,26 @@ type Config struct {
// for open protocol
OnlyOutputUpdatedColumns bool

// for the simple protocol, can be "json" or "avro", defaults to "json"
EncodingFormat EncodingFormatType

// Currently only Debezium protocol is aware of the time zone
TimeZone *time.Location

// Debezium only. Whether schema should be excluded in the output.
DebeziumDisableSchema bool
}

// EncodingFormatType is the type of encoding format.
// It is a string-backed type used by Config.EncodingFormat to select how
// the simple protocol encodes messages. NewConfig initialises the field to
// EncodingFormatJSON, and Config.Apply accepts only the values declared
// below when the `encoding-format` sink URI parameter is set.
type EncodingFormatType string

const (
// EncodingFormatJSON is the json format, spelled "json" in configuration.
EncodingFormatJSON EncodingFormatType = "json"
// EncodingFormatAvro is the avro format, spelled "avro" in configuration.
EncodingFormatAvro EncodingFormatType = "avro"
)

// NewConfig return a Config for codec
func NewConfig(protocol config.Protocol) *Config {
return &Config{
Expand All @@ -103,6 +116,8 @@ func NewConfig(protocol config.Protocol) *Config {
DeleteOnlyHandleKeyColumns: false,
LargeMessageHandle: config.NewDefaultLargeMessageHandleConfig(),

EncodingFormat: EncodingFormatJSON,

TimeZone: time.Local,
}
}
Expand Down Expand Up @@ -143,6 +158,9 @@ type urlConfig struct {
ContentCompatible *bool `form:"content-compatible"`

DebeziumDisableSchema *bool `form:"debezium-disable-schema"`
// EncodingFormatType only works for the simple protocol,
// can be `json` or `avro`, defaults to `json`.
EncodingFormatType *string `form:"encoding-format"`
}

// Apply fills the Config
Expand Down Expand Up @@ -235,6 +253,19 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er
}
}

if c.Protocol == config.ProtocolSimple {
s := util.GetOrZero(urlParameter.EncodingFormatType)
if s != "" {
encodingFormat := EncodingFormatType(s)
switch encodingFormat {
case EncodingFormatJSON, EncodingFormatAvro:
c.EncodingFormat = encodingFormat
default:
return cerror.ErrCodecInvalidConfig.GenWithStack(
"unsupported encoding format type: %s for the simple protocol", encodingFormat)
}
}
}
if urlParameter.DebeziumDisableSchema != nil {
c.DebeziumDisableSchema = *urlParameter.DebeziumDisableSchema
}
Expand Down Expand Up @@ -263,6 +294,7 @@ func mergeConfig(
dest.AvroEnableWatermark = codecConfig.AvroEnableWatermark
dest.AvroDecimalHandlingMode = codecConfig.AvroDecimalHandlingMode
dest.AvroBigintUnsignedHandlingMode = codecConfig.AvroBigintUnsignedHandlingMode
dest.EncodingFormatType = codecConfig.EncodingFormat
}
}
if replicaConfig.Sink.DebeziumDisableSchema != nil {
Expand Down
30 changes: 30 additions & 0 deletions pkg/sink/codec/common/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func TestMergeConfig(t *testing.T) {
AvroEnableWatermark: aws.Bool(true),
AvroBigintUnsignedHandlingMode: aws.String("ab"),
AvroDecimalHandlingMode: aws.String("cd"),
EncodingFormat: aws.String("json"),
},
LargeMessageHandle: &config.LargeMessageHandleConfig{
LargeMessageHandleOption: config.LargeMessageHandleOptionHandleKeyOnly,
Expand Down Expand Up @@ -440,6 +441,7 @@ func TestMergeConfig(t *testing.T) {
AvroEnableWatermark: aws.Bool(false),
AvroBigintUnsignedHandlingMode: aws.String("adb"),
AvroDecimalHandlingMode: aws.String("cde"),
EncodingFormat: aws.String("avro"),
},
LargeMessageHandle: &config.LargeMessageHandleConfig{
LargeMessageHandleOption: config.LargeMessageHandleOptionClaimCheck,
Expand Down Expand Up @@ -483,3 +485,31 @@ func TestApplyConfig4CanalJSON(t *testing.T) {
require.True(t, codecConfig.ContentCompatible)
require.True(t, codecConfig.OnlyOutputUpdatedColumns)
}

func TestConfig4Simple(t *testing.T) {
uri := "kafka://127.0.0.1:9092/abc?protocol=simple"
sinkURL, err := url.Parse(uri)
require.NoError(t, err)

codecConfig := NewConfig(config.ProtocolSimple)
err = codecConfig.Apply(sinkURL, config.GetDefaultReplicaConfig())
require.NoError(t, err)
require.Equal(t, EncodingFormatJSON, codecConfig.EncodingFormat)

uri = "kafka://127.0.0.1:9092/abc?protocol=simple&encoding-format=avro"
sinkURL, err = url.Parse(uri)
require.NoError(t, err)

codecConfig = NewConfig(config.ProtocolSimple)
err = codecConfig.Apply(sinkURL, config.GetDefaultReplicaConfig())
require.NoError(t, err)
require.Equal(t, EncodingFormatAvro, codecConfig.EncodingFormat)

uri = "kafka://127.0.0.1:9092/abc?protocol=simple&encoding-format=xxx"
sinkURL, err = url.Parse(uri)
require.NoError(t, err)

codecConfig = NewConfig(config.ProtocolSimple)
err = codecConfig.Apply(sinkURL, config.GetDefaultReplicaConfig())
require.ErrorIs(t, err, cerror.ErrCodecInvalidConfig)
}
Loading

0 comments on commit 1372b29

Please sign in to comment.