[Feature][cdc] Handle Kafka CDC Debezium/Canal JSON from Flink SQL #2706

Open
1 of 2 tasks
JingsongLi opened this issue Jan 16, 2024 · 3 comments
Labels
enhancement New feature or request


@JingsongLi

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Currently, some CDC data in Kafka is produced by Flink SQL, and that data lacks much of the information Paimon Kafka CDC relies on (even data produced through Flink CDC lacks primary key information): there is no schema and no primary key. Paimon Kafka CDC should be able to handle such records as well.
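To make the gap concrete, here is a rough sketch that inspects one Debezium or Canal JSON value and reports which metadata it carries. The helper name `describe_cdc_record` is hypothetical and not part of Paimon, and the format detection is a simplifying assumption: a record with a `data` list and a `type` field is treated as Canal JSON, anything else as Debezium JSON.

```python
import json
from typing import Any


def describe_cdc_record(raw: str) -> dict[str, Any]:
    """Report which metadata a Debezium or Canal JSON value carries.

    Hypothetical helper for illustration only; not a Paimon API.
    """
    record = json.loads(raw)

    if isinstance(record.get("data"), list) and "type" in record:
        # Canal JSON: key columns are listed in "pkNames" when the producer fills it in.
        return {
            "format": "canal-json",
            "has_schema": "mysqlType" in record or "sqlType" in record,
            "has_primary_key": bool(record.get("pkNames")),
            "has_table": "table" in record,
        }

    # Debezium JSON, optionally wrapped in a {"schema": ..., "payload": ...} envelope.
    has_schema = "schema" in record and "payload" in record
    payload = record["payload"] if has_schema else record
    source = payload.get("source") or {}
    return {
        "format": "debezium-json",
        "has_schema": has_schema,
        # The Debezium value record does not mark which columns form the primary key.
        "has_primary_key": False,
        "has_table": "table" in source,
    }
```

A value containing only `before`, `after` and `op` reports no schema, no table and no primary key, which is the situation the motivation describes.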

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@JingsongLi added the enhancement (New feature or request) label on Jan 16, 2024
@Pandas886

I have the same problem. First, I use MySQL CDC to write the data into a Kafka topic.


Then I use the Kafka CDC action to sync the topic into Paimon. I found that the Paimon table it creates automatically does not correctly identify the primary key.

A record in the topic looks like this:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "user_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "user_password"
          },
          {
            "type": "int16",
            "optional": true,
            "field": "user_type"
          },
          {
            "type": "string",
            "optional": true,
            "field": "email"
          },
          {
            "type": "string",
            "optional": true,
            "field": "phone"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "tenant_id"
          },
          {
            "type": "string",
            "optional": true,
            "name": "com.darcytech.debezium.datetime.string",
            "field": "create_time"
          },
          {
            "type": "string",
            "optional": true,
            "name": "com.darcytech.debezium.datetime.string",
            "field": "update_time"
          },
          {
            "type": "string",
            "optional": true,
            "field": "queue"
          },
          {
            "type": "int16",
            "optional": true,
            "field": "state"
          },
          {
            "type": "string",
            "optional": true,
            "field": "time_zone"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "ttt"
          }
        ],
        "optional": true,
        "name": "mysql_binlog_source.dolphinscheduler3.t_ds_user.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "user_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "user_password"
          },
          {
            "type": "int16",
            "optional": true,
            "field": "user_type"
          },
          {
            "type": "string",
            "optional": true,
            "field": "email"
          },
          {
            "type": "string",
            "optional": true,
            "field": "phone"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "tenant_id"
          },
          {
            "type": "string",
            "optional": true,
            "name": "com.darcytech.debezium.datetime.string",
            "field": "create_time"
          },
          {
            "type": "string",
            "optional": true,
            "name": "com.darcytech.debezium.datetime.string",
            "field": "update_time"
          },
          {
            "type": "string",
            "optional": true,
            "field": "queue"
          },
          {
            "type": "int16",
            "optional": true,
            "field": "state"
          },
          {
            "type": "string",
            "optional": true,
            "field": "time_zone"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "ttt"
          }
        ],
        "optional": true,
        "name": "mysql_binlog_source.dolphinscheduler3.t_ds_user.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "mysql_binlog_source.dolphinscheduler3.t_ds_user.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "user_name": "admin",
      "user_password": "7ad2410b2f4c074479a8937a28a22b8f",
      "user_type": 0,
      "email": "[email protected]",
      "phone": "",
      "tenant_id": 1,
      "create_time": "2018-03-27 15:48:50",
      "update_time": "2022-11-30 02:46:58",
      "queue": "",
      "state": 1,
      "time_zone": null,
      "ttt": 2
    },
    "source": {
      "version": "1.6.4.Final",
      "connector": "mysql",
      "name": "mysql_binlog_source",
      "ts_ms": 0,
      "snapshot": "false",
      "db": "dolphinscheduler3",
      "sequence": null,
      "table": "t_ds_user",
      "server_id": 0,
      "gtid": null,
      "file": "",
      "pos": 0,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "r",
    "ts_ms": 1705541903980,
    "transaction": null
  }
}

The id column is the primary key of my table.
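One reading of the record above: the Kafka Connect schema marks `id` as `"optional": false`, but that only means NOT NULL and says nothing about keys. With a stock Debezium connector the primary-key columns travel in the Kafka message key instead; whether this producer wrote a message key at all is not shown here. The sketch below (hypothetical helper names, assuming a JSON-encoded key) looks for key columns in the message key and, as a weaker hint only, lists the non-optional columns of the `after` struct.

```python
import json
from typing import Optional


def primary_key_from_message_key(raw_key: Optional[bytes]) -> Optional[list[str]]:
    """Return key column names from a Debezium-style JSON message key.

    Assumes the key is either {"schema": ..., "payload": {...}} or a bare
    object such as {"id": 1}. Returns None when no usable key was written.
    """
    if not raw_key:
        return None
    key = json.loads(raw_key)
    if isinstance(key, dict) and "schema" in key and "payload" in key:
        key = key["payload"]
    if not isinstance(key, dict) or not key:
        return None
    # By default Debezium puts the table's primary-key columns into the message key.
    return list(key.keys())


def not_null_columns(raw_value: str) -> list[str]:
    """List columns of the "after" struct that the value schema declares non-optional.

    Only a hint: NOT NULL columns are not necessarily primary-key columns.
    """
    record = json.loads(raw_value)
    for field in record.get("schema", {}).get("fields", []):
        if field.get("field") == "after":
            return [f["field"] for f in field.get("fields", []) if not f.get("optional", True)]
    return []
```

For the record above the NOT NULL hint happens to match the real key (`id`), but a table with other NOT NULL columns would yield false positives, so the hint should not be used to create the key automatically.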

@yuzelin

yuzelin commented Jan 25, 2024

@Pandas886 In your case, maybe you can specify --primary_key?

@Pandas886

> @Pandas886 In your case, maybe you can specify --primary_key?

The topic contains all the tables of a database, so a single primary key setting does not fit.
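Because one topic carries every table of the database, a single primary-key list cannot fit all of them. One option, sketched here purely as an illustration, is a per-table mapping looked up by the record's `source.db` and `source.table`. The `db.table=col1,col2;...` syntax and both function names are invented for this sketch; they are not existing Paimon action options.

```python
from typing import Optional


def parse_pk_mapping(spec: str) -> dict[str, list[str]]:
    """Parse a hypothetical per-table primary-key spec: "db.table=col1,col2;db.other=id"."""
    mapping: dict[str, list[str]] = {}
    for entry in filter(None, (part.strip() for part in spec.split(";"))):
        table, _, columns = entry.partition("=")
        cols = [c.strip() for c in columns.split(",") if c.strip()]
        if not table.strip() or not cols:
            raise ValueError(f"invalid primary-key entry: {entry!r}")
        mapping[table.strip()] = cols
    return mapping


def resolve_primary_key(
    db: str,
    table: str,
    mapping: dict[str, list[str]],
    from_record: Optional[list[str]] = None,
) -> list[str]:
    """Prefer the explicit per-table setting, then keys found in the record, else none."""
    return mapping.get(f"{db}.{table}") or from_record or []
```

Explicit settings win over keys discovered in the record, and tables with neither are left without a primary key rather than guessed.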
