Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): add event key output for debezium (#11649) #11965

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 13 additions & 55 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,61 +445,19 @@ func mysqlTypeFromTiDBType(tidbType string) byte {
return result
}

const (
replacementChar = "_"
numberPrefix = "_"
)

// sanitizeName rewrites name so that it only contains characters permitted in
// an avro name. A leading ASCII digit is kept but prefixed with numberPrefix;
// every other rune that is not '_', an ASCII letter or an ASCII digit is
// swapped for replacementChar. A warning is logged when the name was changed.
// debezium-core/src/main/java/io/debezium/schema/FieldNameSelector.java
// https://avro.apache.org/docs/current/spec.html#names
func sanitizeName(name string) string {
	var (
		out      strings.Builder
		modified bool
	)
	for idx, r := range name {
		isDigit := '0' <= r && r <= '9'
		isLetter := ('a' <= r && r <= 'z') || ('A' <= r && r <= 'Z')
		switch {
		case idx == 0 && isDigit:
			// A name must not start with a digit: keep it, but behind the prefix.
			out.WriteString(numberPrefix)
			out.WriteRune(r)
			modified = true
		case r == '_' || isLetter || isDigit:
			out.WriteRune(r)
		default:
			out.WriteString(replacementChar)
			modified = true
		}
	}

	result := out.String()
	if !modified {
		return result
	}
	log.Warn(
		"Name is potentially not safe for serialization, replace it",
		zap.String("name", name),
		zap.String("replacedName", result),
	)
	return result
}

// sanitizeTopic escapes ".", it may have special meanings for sink connectors
func sanitizeTopic(name string) string {
return strings.ReplaceAll(name, ".", replacementChar)
}

// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
func escapeEnumAndSetOptions(option string) string {
option = strings.ReplaceAll(option, ",", "\\,")
option = strings.ReplaceAll(option, "\\'", "'")
option = strings.ReplaceAll(option, "''", "'")
return option
return strings.ReplaceAll(name, ".", "_")
}

// <empty> | <name>[(<dot><name>)*]
func getAvroNamespace(namespace string, schema string) string {
return sanitizeName(namespace) + "." + sanitizeName(schema)
ns := common.SanitizeName(namespace)
s := common.SanitizeName(schema)
if s != "" {
return ns + "." + s
}
return ns
}

type avroSchema struct {
Expand Down Expand Up @@ -564,7 +522,7 @@ func (a *BatchEncoder) columns2AvroSchema(
) (*avroSchemaTop, error) {
top := &avroSchemaTop{
Tp: "record",
Name: sanitizeName(tableName.Table),
Name: common.SanitizeName(tableName.Table),
Namespace: getAvroNamespace(a.namespace, tableName.Schema),
Fields: nil,
}
Expand All @@ -577,7 +535,7 @@ func (a *BatchEncoder) columns2AvroSchema(
return nil, err
}
field := make(map[string]interface{})
field["name"] = sanitizeName(col.Name)
field["name"] = common.SanitizeName(col.Name)

copied := *col
copied.Value = copied.Default
Expand Down Expand Up @@ -676,9 +634,9 @@ func (a *BatchEncoder) columns2AvroData(

// https://pkg.go.dev/github.com/linkedin/goavro/v2#Union
if col.Flag.IsNullable() {
ret[sanitizeName(col.Name)] = goavro.Union(str, data)
ret[common.SanitizeName(col.Name)] = goavro.Union(str, data)
} else {
ret[sanitizeName(col.Name)] = data
ret[common.SanitizeName(col.Name)] = data
}
}

Expand Down Expand Up @@ -787,7 +745,7 @@ func (a *BatchEncoder) columnToAvroSchema(
case mysql.TypeEnum, mysql.TypeSet:
es := make([]string, 0, len(ft.GetElems()))
for _, e := range ft.GetElems() {
e = escapeEnumAndSetOptions(e)
e = common.EscapeEnumAndSetOptions(e)
es = append(es, e)
}
return avroSchema{
Expand Down
14 changes: 10 additions & 4 deletions pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,14 +905,14 @@ func TestAvroEnvelope(t *testing.T) {
// TestSanitizeName checks the sanitized output for an already-valid name, a
// name starting with a digit, a name containing "." and a name containing
// non-ASCII characters.
func TestSanitizeName(t *testing.T) {
t.Parallel()

// A name made only of ASCII letters and digits is expected back unchanged.
require.Equal(t, "normalColumnName123", sanitizeName("normalColumnName123"))
require.Equal(t, "normalColumnName123", common.SanitizeName("normalColumnName123"))
// A leading digit is expected to be kept and prefixed with "_".
require.Equal(
t,
"_1ColumnNameStartWithNumber",
sanitizeName("1ColumnNameStartWithNumber"),
common.SanitizeName("1ColumnNameStartWithNumber"),
)
// "." is expected to be replaced with "_".
require.Equal(t, "A_B", sanitizeName("A.B"))
// The two trailing non-ASCII characters are expected to yield two "_" from
// sanitizeName and six "_" from common.SanitizeName.
require.Equal(t, "columnNameWith__", sanitizeName("columnNameWith中文"))
require.Equal(t, "A_B", common.SanitizeName("A.B"))
require.Equal(t, "columnNameWith______", common.SanitizeName("columnNameWith中文"))
}

func TestGetAvroNamespace(t *testing.T) {
Expand All @@ -933,6 +933,12 @@ func TestGetAvroNamespace(t *testing.T) {
"N_amespace.S_chema",
getAvroNamespace("N-amespace", "S.chema"),
)

require.Equal(
t,
"normalNamespace",
getAvroNamespace("normalNamespace", ""),
)
}

func TestArvoAppendRowChangedEventWithCallback(t *testing.T) {
Expand Down
92 changes: 92 additions & 0 deletions pkg/sink/codec/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"database/sql"
"fmt"
"math"
"strings"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/log"
Expand Down Expand Up @@ -180,3 +181,94 @@
}
return bytes[pos:]
}

const (
replacementChar = "_"
numberPrefix = 'x'
)

// EscapeEnumAndSetOptions normalizes a single ENUM/SET option string.
// Every "," is escaped as "\,", and the quote escapes "\'" and "''" are both
// collapsed to a plain "'". The three replacements run in that order, each
// over the result of the previous one.
// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
func EscapeEnumAndSetOptions(option string) string {
	option = strings.ReplaceAll(option, ",", "\\,")
	option = strings.ReplaceAll(option, "\\'", "'")
	option = strings.ReplaceAll(option, "''", "'")
	return option
}

// isValidFirstCharacter reports whether c is an ASCII letter or '_', the
// characters accepted at the start of a sanitized name.
func isValidFirstCharacter(c rune) bool {
	return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_'
}

func isValidNonFirstCharacter(c rune) bool {
return isValidFirstCharacter(c) || (c >= '0' && c <= '9')

Check warning on line 206 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L205-L206

Added lines #L205 - L206 were not covered by tests
}

func isValidNonFirstCharacterForTopicName(c rune) bool {
return isValidNonFirstCharacter(c) || c == '.'

Check warning on line 210 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L209-L210

Added lines #L209 - L210 were not covered by tests
}

// SanitizeName escapes not permitted chars
// https://avro.apache.org/docs/1.12.0/specification/#names
// see https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/schema/SchemaNameAdjuster.java
func SanitizeName(name string) string {
changed := false
var sb strings.Builder
for i, c := range name {
if i == 0 && !isValidFirstCharacter(c) {
sb.WriteString(replacementChar)
if c >= '0' && c <= '9' {
sb.WriteRune(c)
}
changed = true
} else if !isValidNonFirstCharacter(c) {
b := []byte(string(c))
for k := 0; k < len(b); k++ {
sb.WriteString(replacementChar)
}
changed = true
} else {
sb.WriteRune(c)
}

Check warning on line 234 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L216-L234

Added lines #L216 - L234 were not covered by tests
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Name is potentially not safe for serialization, replace it",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName

Check warning on line 245 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L237-L245

Added lines #L237 - L245 were not covered by tests
}

// SanitizeTopicName escapes not permitted chars for topic name
// https://github.com/debezium/debezium/blob/main/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java
func SanitizeTopicName(name string) string {
changed := false
var sb strings.Builder
for _, c := range name {
if !isValidNonFirstCharacterForTopicName(c) {
b := []byte(string(c))
for k := 0; k < len(b); k++ {
sb.WriteString(replacementChar)
}
changed = true
} else {
sb.WriteRune(c)
}

Check warning on line 262 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L250-L262

Added lines #L250 - L262 were not covered by tests
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Table name sanitize",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName

Check warning on line 273 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L265-L273

Added lines #L265 - L273 were not covered by tests
}
Loading
Loading