[Feature] Kafka debezium json supports automatic discovery of primary keys #2815

Closed

Conversation

sunxiaojian
Contributor

@sunxiaojian sunxiaojian commented Jan 29, 2024

Purpose

Linked issue: close #2802

Tests

API and Format

Documentation

@sunxiaojian sunxiaojian force-pushed the auto-discovery-primary-key branch 3 times, most recently from c341796 to 5ba8031 Compare January 30, 2024 04:17
@sunxiaojian
Contributor Author

sunxiaojian commented Jan 30, 2024

@sunxiaojian sunxiaojian force-pushed the auto-discovery-primary-key branch 3 times, most recently from 1d483f3 to 6b0aa0d Compare January 31, 2024 06:16
@sunxiaojian sunxiaojian force-pushed the auto-discovery-primary-key branch 2 times, most recently from dd5d186 to 138732c Compare March 27, 2024 09:30
@jiangjin-f

Please provide a demo of CDC to Kafka that automatically retrieves the table's primary key from MySQL and adds the pkNames node to the binlog.

@sunxiaojian
Contributor Author

> Please provide a demo of CDC to Kafka that automatically retrieves the table's primary key from MySQL and adds the pkNames node to the binlog.

@jiangjin-f When Debezium writes data to Kafka, the primary key is automatically stored in the message key. When Paimon parses the Kafka messages, the fields in the key are attached to the 'pkNames' field in the value. There are some demos in the unit tests.
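
A minimal sketch of the mechanism described in the reply above, assuming the Kafka message key and value have already been parsed into maps. The class and method names (`PkNamesSketch`, `pkNames`, `attachPkNames`) are hypothetical and only illustrate the idea; this is not Paimon's actual parsing code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not taken from the Paimon code base.
class PkNamesSketch {

    /** Primary key column names = field names of the parsed key payload, in order. */
    public static List<String> pkNames(Map<String, Object> keyPayload) {
        return new ArrayList<>(keyPayload.keySet());
    }

    /** Returns a copy of the value payload with "pkNames" added if it is not already present. */
    public static Map<String, Object> attachPkNames(
            Map<String, Object> keyPayload, Map<String, Object> valuePayload) {
        Map<String, Object> result = new LinkedHashMap<>(valuePayload);
        result.putIfAbsent("pkNames", pkNames(keyPayload));
        return result;
    }

    public static void main(String[] args) {
        // Assumed shape of a Debezium key payload for a table whose primary key is "id".
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("id", 1);

        // Assumed (simplified) value payload of an insert event.
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("op", "c");

        Map<String, Object> enriched = attachPkNames(key, value);
        System.out.println(enriched.get("pkNames")); // prints [id]
    }
}
```

The original value map is left untouched, so the enriched copy can be handed to whatever parser expects a `pkNames` field.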

@sunxiaojian sunxiaojian force-pushed the auto-discovery-primary-key branch 3 times, most recently from 0944bbe to 71890b5 Compare March 29, 2024 09:27
@sunxiaojian sunxiaojian force-pushed the auto-discovery-primary-key branch from 71890b5 to 2232472 Compare March 29, 2024 09:41
@medivh511

> > Please provide a demo of CDC to Kafka that automatically retrieves the table's primary key from MySQL and adds the pkNames node to the binlog.
>
> @jiangjin-f When Debezium writes data to Kafka, the primary key is automatically stored in the message key. When Paimon parses the Kafka messages, the fields in the key are attached to the 'pkNames' field in the value. There are some demos in the unit tests.

It seems this still has not been merged into the master branch. I hope it can get the primary key from the Kafka key rather than from the Kafka value, because the Debezium format does not contain pkNames.

@lalalaYu

lalalaYu commented Jul 13, 2024

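I tried implementing DebeziumDeserializationSchema: in the overridden deserialize method I read the keys and, following the Canal format, embed pkNames in the payload. With that, Paimon's kafka_sync_database can create tables with primary keys obtained from Debezium JSON.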
```java
@Override
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
    if (this.jsonConverter == null) {
        this.initializeJsonConverter();
    }
    byte[] bytes = this.jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    out.collect(initJsonString(record.key(), new String(bytes, java.nio.charset.StandardCharsets.UTF_8)));
}

/**
 * With reference to JsonDebeziumDeserializationSchema, parses the key and converts its
 * field names into pkNames in the value JSON.
 * This rewrite is based on the structure of CDC data with schema enabled.
 *
 * @param key   the record key, a Struct
 * @param value the record value, which becomes part of the returned JSON string
 * @return a JSON string
 */
private String initJsonString(Object key, String value) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode element = (ObjectNode) mapper.readTree(value);
    ObjectNode pNode = (ObjectNode) element.get("payload");
    if (key != null) {
        Struct struct = (Struct) key;
        ArrayNode arrayNode = mapper.createArrayNode();
        // Each field of the key Struct is one primary key column.
        struct.schema().fields().forEach(field -> arrayNode.add(field.name()));
        pNode.putIfAbsent("pkNames", arrayNode);
    } else {
        // For now a null key raises an error; later, with more experience,
        // a default value for null keys could be considered.
        throw new Exception("Conversion error-YSJ: key is null that is required and has no default value");
    }
    return element.toString();
}
```
