Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support aliyun-json when sinking data from kafka with paimon-flink-action #4564

Closed
wants to merge 0 commits into from

Conversation

JackeyLee007
Copy link
Contributor

[flink]

Purpose

Linked issue: close #4529

To support the json format of aliyun Data Integration, DI in short.
The data in kafka collected by DI from mysql or oracle, are not in standard format. Not canal-json or debezium json. I would like call it aliyun-json.

We want to sink the data to paimon from kafka directly, with the kafka_sync_database/table action. So the aliyun-json must be supported, and parsed into cdc record.

Tests

Supplied with the commit.

API and Format

No.

Documentation

When used to process the data collected by ID, with paimon-flink-action, it just need to specify the json format as the following:

... paimon-flink-action-<version>.jar
kafka_sync_database 
...
--kafka_conf value.format=aliyun-json 
...

@@ -97,7 +97,7 @@ protected void applySchemaChange(
e);
}
}
} else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
// } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why modify here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must be a typo. Correct with later commit.

AliyunRecordParser parser =
new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
for (String json : deleteList) {
// 将json解析为JsonNode对象
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please translate to english

AliyunRecordParser parser =
new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
for (String json : updateList) {
// 将json解析为JsonNode对象
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

AliyunRecordParser parser =
new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
for (String json : insertList) {
// 将json解析为JsonNode对象
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Support the json format which is used by the data integration of Aliyun Dataworks
2 participants