diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java
new file mode 100644
index 000000000000..ccbacdc2af5e
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
+
+/**
+ * Supports the message queue's Aliyun JSON data format and provides definitions for the message
+ * queue's record JSON deserialization class and parsing class {@link AliyunRecordParser}.
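+ *
+ * <p>The format is registered under the identifier {@code aliyun-json}; see {@link
+ * AliyunDataFormatFactory}.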
+ */
+public class AliyunDataFormat extends AbstractJsonDataFormat {
+
+ @Override
+ protected RecordParserFactory parser() {
+ return AliyunRecordParser::new;
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
new file mode 100644
index 000000000000..a07e2f205c90
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
+
+/** Factory to create {@link AliyunDataFormat}. */
+public class AliyunDataFormatFactory implements DataFormatFactory {
+
+ public static final String IDENTIFIER = "aliyun-json";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DataFormat create() {
+ return new AliyunDataFormat();
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java
new file mode 100644
index 000000000000..824ed9145943
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+/** Converts special MySQL types such as ENUM, SET, and GEOMETRY. */
+public class AliyunFieldParser {
+
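+ /**
+ * Converts MySQL's internal GEOMETRY encoding, a 4-byte little-endian SRID followed by
+ * standard WKB bytes, into a WKB/EWKB array: a non-zero SRID is moved behind the byte-order
+ * and type words and the EWKB SRID flag (0x20 on the most significant type byte) is set,
+ * while a zero SRID prefix is simply dropped.
+ */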
+ protected static byte[] convertGeoType2WkbArray(byte[] mysqlGeomBytes) {
+ int sridLength = 4;
+ boolean hasSrid = false;
+ for (int i = 0; i < sridLength; ++i) {
+ if (mysqlGeomBytes[i] != 0) {
+ hasSrid = true;
+ break;
+ }
+ }
+ byte[] wkb;
+ if (hasSrid) {
+ wkb = new byte[mysqlGeomBytes.length];
+ // byte order (1 byte) + geometry type (4 bytes)
+ System.arraycopy(mysqlGeomBytes, 4, wkb, 0, 5);
+ // srid
+ System.arraycopy(mysqlGeomBytes, 0, wkb, 5, 4);
+ // geometry body
+ System.arraycopy(mysqlGeomBytes, 9, wkb, 9, wkb.length - 9);
+
+ // set srid flag
+ if (wkb[0] == 0) {
+ // big endian
+ wkb[1] = (byte) (wkb[1] + 32);
+ } else {
+ wkb[4] = (byte) (wkb[4] + 32);
+ }
+ } else {
+ wkb = new byte[mysqlGeomBytes.length - 4];
+ System.arraycopy(mysqlGeomBytes, 4, wkb, 0, wkb.length);
+ }
+ return wkb;
+ }
+
+ protected static String convertSet(String value, String mysqlType) {
+ // A MySQL SET value may hold several elements at once; the value is the decimal string of a
+ // bitmask in which bit i selects the i-th SET element.
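+ // e.g. for set('a','b','c') and value "5" (binary 101), bits 0 and 2 are set -> "[a,c]"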
+ int indexes = Integer.parseInt(value);
+ return getSetValuesByIndex(mysqlType, indexes);
+ }
+
+ protected static String convertEnum(String value, String mysqlType) {
+ int elementIndex = Integer.parseInt(value);
+ // MySQL ENUM indices are 1-based, e.g. for enum('a','b','c'), value "2" selects 'b'
+ return getEnumValueByIndex(mysqlType, elementIndex);
+ }
+
+ protected static String getEnumValueByIndex(String mysqlType, int elementIndex) {
+ String[] options = extractEnumValueByIndex(mysqlType);
+
+ return options[elementIndex - 1];
+ }
+
+ protected static String getSetValuesByIndex(String mysqlType, int indexes) {
+ String[] options = extractSetValuesByIndex(mysqlType);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ int index = 0;
+ boolean first = true;
+ int optionLen = options.length;
+
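+ // Walk the bitmask from the least significant bit; each set bit selects options[index].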
+ while (indexes != 0L) {
+ if (indexes % 2L != 0) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(',');
+ }
+ if (index < optionLen) {
+ sb.append(options[index]);
+ } else {
+ throw new RuntimeException(
+ String.format(
+ "extractSetValues from mysqlType[%s],index:%d failed",
+ mysqlType, indexes));
+ }
+ }
+ ++index;
+ indexes = indexes >>> 1;
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ private static String[] extractSetValuesByIndex(String mysqlType) {
+ // set('x','y') -> ["x","y"]: strip the leading "set('" (5 chars) and the trailing "')"
+ return mysqlType.substring(5, mysqlType.length() - 2).split("'\\s*,\\s*'");
+ }
+
+ private static String[] extractEnumValueByIndex(String mysqlType) {
+ // enum('x','y') -> ["x","y"]: strip the leading "enum('" (6 chars) and the trailing "')"
+ return mysqlType.substring(6, mysqlType.length() - 2).split("'\\s*,\\s*'");
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
new file mode 100644
index 000000000000..e31b282a76cb
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
+import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
+
+/**
+ * The {@code AliyunRecordParser} class is responsible for parsing records from the Aliyun-JSON
+ * format. Aliyun JSON carries database change records consumed from a message queue and is used
+ * to synchronize data across databases. This parser extracts relevant information from the
+ * Aliyun-JSON format and transforms it into a list of {@link RichCdcMultiplexRecord} objects,
+ * which represent the changes captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
+public class AliyunRecordParser extends AbstractJsonRecordParser {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AliyunRecordParser.class);
+
+ private static final String FIELD_IS_DDL = "isDdl";
+ private static final String FIELD_TYPE = "op";
+
+ private static final String OP_UPDATE_BEFORE = "UPDATE_BEFORE";
+ private static final String OP_UPDATE_AFTER = "UPDATE_AFTER";
+ private static final String OP_INSERT = "INSERT";
+ private static final String OP_DELETE = "DELETE";
+
+ private static final String FIELD_PAYLOAD = "payload";
+ private static final String FIELD_BEFORE = "before";
+ private static final String FIELD_AFTER = "after";
+ private static final String FIELD_COLUMN = "dataColumn";
+
+ private static final String FIELD_SCHEMA = "schema";
+ private static final String FIELD_PK = "primaryKey";
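+
+ // Illustrative (abridged) message shape implied by the field constants above; the actual
+ // Aliyun payload may carry additional fields:
+ // {
+ //   "isDdl": false,
+ //   "schema": { "primaryKey": ["id"] },
+ //   "payload": {
+ //     "op": "INSERT",
+ //     "after": { "dataColumn": { "id": "1", "name": "a" } }
+ //   }
+ // }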
+
+ @Override
+ protected boolean isDDL() {
+ JsonNode node = root.get(FIELD_IS_DDL);
+ return !isNull(node) && node.asBoolean();
+ }
+
+ public AliyunRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+ super(typeMapping, computedColumns);
+ }
+
+ @Override
+ protected String primaryField() {
+ return "schema.primaryKey";
+ }
+
+ @Override
+ protected String dataField() {
+ return "payload.dataColumn";
+ }
+
+ @Override
+ protected List<String> extractPrimaryKeys() {
+ JsonNode schemaNode = root.get(FIELD_SCHEMA);
+ checkNotNull(schemaNode, FIELD_SCHEMA);
+ ArrayNode pkNode = getNodeAs(schemaNode, FIELD_PK, ArrayNode.class);
+ List<String> pkFields = new ArrayList<>();
+ pkNode.forEach(
+ pk -> {
+ if (isNull(pk)) {
+ throw new IllegalArgumentException(
+ String.format("Primary key cannot be null: %s", pk));
+ }
+
+ pkFields.add(pk.asText());
+ });
+ return pkFields;
+ }
+
+ @Override
+ public List<RichCdcMultiplexRecord> extractRecords() {
+ if (isDDL()) {
+ return Collections.emptyList();
+ }
+
+ List<RichCdcMultiplexRecord> records = new ArrayList<>();
+
+ JsonNode payload = root.get(FIELD_PAYLOAD);
+ checkNotNull(payload, FIELD_PAYLOAD);
+
+ String type = payload.get(FIELD_TYPE).asText();
+
+ RowKind rowKind = null;
+ String field = null;
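+ // Map the Aliyun op type onto a Paimon RowKind and pick which payload section ("before" or
+ // "after") carries the row image; updates arrive as separate UPDATE_BEFORE and UPDATE_AFTER
+ // records.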
+ switch (type) {
+ case OP_UPDATE_BEFORE:
+ rowKind = RowKind.UPDATE_BEFORE;
+ field = FIELD_BEFORE;
+ break;
+ case OP_UPDATE_AFTER:
+ rowKind = RowKind.UPDATE_AFTER;
+ field = FIELD_AFTER;
+ break;
+ case OP_INSERT:
+ rowKind = RowKind.INSERT;
+ field = FIELD_AFTER;
+ break;
+ case OP_DELETE:
+ rowKind = RowKind.DELETE;
+ field = FIELD_BEFORE;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown record operation: " + type);
+ }
+
+ JsonNode container = payload.get(field);
+ checkNotNull(container, String.format("%s.%s", FIELD_PAYLOAD, field));
+
+ JsonNode data = getNodeAs(container, FIELD_COLUMN, JsonNode.class);
+ checkNotNull(data, String.format("%s.%s.%s", FIELD_PAYLOAD, field, FIELD_COLUMN));
+
+ processRecord(data, rowKind, records);
+
+ return records;
+ }
+
+ @Override
+ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
+
+ Map<String, Object> recordMap =
+ JsonSerdeUtil.convertValue(record, new TypeReference