diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormat.java
new file mode 100644
index 000000000000..43228fca4554
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormat.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.dms;
+
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
+
+/**
+ * Supports the message queue's AWS DMS json data format and provides definitions for the message
+ * queue's record json deserialization class and parsing class {@link DMSRecordParser}.
+ */
+public class DMSDataFormat extends AbstractJsonDataFormat {
+
+ @Override
+ protected RecordParserFactory parser() {
+ return DMSRecordParser::new;
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormatFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormatFactory.java
new file mode 100644
index 000000000000..0be1270e8341
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSDataFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.dms;
+
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
+
+/** Factory to create {@link DMSDataFormat}. */
+public class DMSDataFormatFactory implements DataFormatFactory {
+
+ public static final String IDENTIFIER = "aws-dms-json";
+
+ @Override
+ public DataFormat create() {
+ return new DMSDataFormat();
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSRecordParser.java
new file mode 100644
index 000000000000..8fc4808dd2d6
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/dms/DMSRecordParser.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.dms;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@code DMSRecordParser} class extends the abstract {@link AbstractJsonRecordParser} and is
+ * designed to parse records from AWS DMS's JSON change data capture (CDC) format. AWS DMS is a CDC
+ * solution for RDMS that captures row-level changes to database tables and outputs them in JSON
+ * format. This parser extracts relevant information from the DMS-JSON format and converts it into a
+ * list of {@link RichCdcMultiplexRecord} objects.
+ *
+ *
The class supports various database operations such as INSERT, UPDATE, and DELETE, and creates
+ * corresponding {@link RichCdcMultiplexRecord} objects to represent these changes.
+ *
+ *
Validation is performed to ensure that the JSON records contain all necessary fields, and the
+ * class also supports schema extraction for the Kafka topic.
+ */
+public class DMSRecordParser extends AbstractJsonRecordParser {
+
+ private static final String FIELD_DATA = "data";
+ private static final String FIELD_METADATA = "metadata";
+ private static final String FIELD_TYPE = "record-type";
+ private static final String FIELD_OP = "operation";
+ private static final String FIELD_DATABASE = "schema-name";
+ private static final String FIELD_TABLE = "table-name";
+
+ private static final String OP_LOAD = "load";
+ private static final String OP_INSERT = "insert";
+ private static final String OP_UPDATE = "update";
+ private static final String OP_DELETE = "delete";
+
+ private static final String BEFORE_PREFIX = "BI_";
+
+ public DMSRecordParser(TypeMapping typeMapping, List computedColumns) {
+ super(typeMapping, computedColumns);
+ }
+
+ @Override
+ protected @Nullable String getTableName() {
+ JsonNode metaNode = getAndCheck(FIELD_METADATA);
+ return metaNode.get(FIELD_TABLE).asText();
+ }
+
+ @Override
+ protected List extractRecords() {
+ if (isDDL()) {
+ return Collections.emptyList();
+ }
+
+ JsonNode dataNode = getAndCheck(dataField());
+ String operation = getAndCheck(FIELD_METADATA).get(FIELD_OP).asText();
+ List records = new ArrayList<>();
+
+ switch (operation) {
+ case OP_LOAD:
+ case OP_INSERT:
+ processRecord(dataNode, RowKind.INSERT, records);
+ break;
+ case OP_UPDATE:
+ Pair dataAndBeforeNodes = splitBeforeAndData(dataNode);
+ processRecord(dataAndBeforeNodes.getRight(), RowKind.DELETE, records);
+ processRecord(dataAndBeforeNodes.getLeft(), RowKind.INSERT, records);
+ break;
+ case OP_DELETE:
+ processRecord(dataNode, RowKind.DELETE, records);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown record operation: " + operation);
+ }
+
+ return records;
+ }
+
+ @Override
+ protected @Nullable String getDatabaseName() {
+ JsonNode metaNode = getAndCheck(FIELD_METADATA);
+ return metaNode.get(FIELD_DATABASE).asText();
+ }
+
+ @Override
+ protected String primaryField() {
+ return null;
+ }
+
+ @Override
+ protected String dataField() {
+ return FIELD_DATA;
+ }
+
+ @Override
+ protected String format() {
+ return "aws-dms-json";
+ }
+
+ @Override
+ protected boolean isDDL() {
+ String recordType = getAndCheck(FIELD_METADATA).get(FIELD_TYPE).asText();
+ return !"data".equals(recordType);
+ }
+
+ private Pair splitBeforeAndData(JsonNode dataNode) {
+ JsonNode newDataNode = dataNode.deepCopy();
+ JsonNode beforeDataNode = dataNode.deepCopy();
+
+ Iterator> newDataFields = newDataNode.fields();
+ while (newDataFields.hasNext()) {
+ Map.Entry next = newDataFields.next();
+ if (next.getKey().startsWith(BEFORE_PREFIX)) {
+ newDataFields.remove();
+ }
+ }
+
+ Iterator> beforeDataFields = beforeDataNode.fields();
+ while (beforeDataFields.hasNext()) {
+ Map.Entry next = beforeDataFields.next();
+ if (next.getKey().startsWith(BEFORE_PREFIX)) {
+ String key = next.getKey().replaceFirst(BEFORE_PREFIX, "");
+ ((ObjectNode) beforeDataNode).set(key, next.getValue());
+ beforeDataFields.remove();
+ }
+ }
+
+ return Pair.of(newDataNode, beforeDataNode);
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 9c4c4d0ac3a2..17b8b29a2009 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -33,3 +33,4 @@ org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonDataFormatFactory
org.apache.paimon.flink.action.cdc.format.json.JsonDataFormatFactory
org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellDataFormatFactory
org.apache.paimon.flink.action.cdc.format.ogg.OggDataFormatFactory
+org.apache.paimon.flink.action.cdc.format.dms.DMSDataFormatFactory
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncDatabaseActionITCase.java
new file mode 100644
index 000000000000..da9f863dc07b
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncDatabaseActionITCase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Map;
+
+/** IT cases for {@link KafkaSyncDatabaseAction}. */
+public class KafkaAWSDMSSyncDatabaseActionITCase extends KafkaSyncDatabaseActionITCase {
+
+ private static final String AWSDMS = "aws-dms";
+
+ @Override
+ protected KafkaSyncDatabaseActionBuilder syncDatabaseActionBuilder(
+ Map kafkaConfig) {
+ KafkaSyncDatabaseActionBuilder builder = new KafkaSyncDatabaseActionBuilder(kafkaConfig);
+ builder.withPrimaryKeys("id");
+ return builder;
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionMultiTopic() throws Exception {
+ testSchemaEvolutionMultiTopic(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionOneTopic() throws Exception {
+ testSchemaEvolutionOneTopic(AWSDMS);
+ }
+
+ @Test
+ public void testTopicIsEmpty() {
+ testTopicIsEmpty(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTableAffixMultiTopic() throws Exception {
+ testTableAffixMultiTopic(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTableAffixOneTopic() throws Exception {
+ testTableAffixOneTopic(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testIncludingTables() throws Exception {
+ testIncludingTables(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testExcludingTables() throws Exception {
+ testExcludingTables(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testIncludingAndExcludingTables() throws Exception {
+ testIncludingAndExcludingTables(AWSDMS);
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java
new file mode 100644
index 000000000000..02ac86cdab69
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaAWSDMSSyncTableActionITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/** IT cases for {@link KafkaSyncTableAction}. */
+public class KafkaAWSDMSSyncTableActionITCase extends KafkaSyncTableActionITCase {
+
+ private static final String AWSDMS = "aws-dms";
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolution() throws Exception {
+ runSingleTableSchemaEvolution("schemaevolution", AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testAssertSchemaCompatible() throws Exception {
+ testAssertSchemaCompatible(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionSpecific() throws Exception {
+ testStarUpOptionSpecific(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionLatest() throws Exception {
+ testStarUpOptionLatest(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionTimestamp() throws Exception {
+ testStarUpOptionTimestamp(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionEarliest() throws Exception {
+ testStarUpOptionEarliest(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testStarUpOptionGroup() throws Exception {
+ testStarUpOptionGroup(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testComputedColumn() throws Exception {
+ testComputedColumn(AWSDMS);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testFieldValNullSyncTable() throws Exception {
+ testTableFiledValNull(AWSDMS);
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/include/topic0/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/include/topic0/aws-dms-data-1.txt
new file mode 100644
index 000000000000..d779b9fb4ad5
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/include/topic0/aws-dms-data-1.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database_affix","table-name":"paimon_1","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database_affix","table-name":"paimon_2","transaction-id":670014899490}}
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database_affix","table-name":"ignore","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database_affix","table-name":"flink","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-1.txt
new file mode 100644
index 000000000000..5ac7c5dbecef
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-2.txt
new file mode 100644
index 000000000000..56e1b53c1017
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic0/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"address":"Beijing"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"address":"Shanghai"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-1.txt
new file mode 100644
index 000000000000..a0351adb7fd6
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-2.txt
new file mode 100644
index 000000000000..e59ef1c9a479
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/prefixsuffix/topic1/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"age":19},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"age":25},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-1.txt
new file mode 100644
index 000000000000..5ac7c5dbecef
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-2.txt
new file mode 100644
index 000000000000..eeb254d71c4e
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic0/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"age":19},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"age":25},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t1","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-1.txt
new file mode 100644
index 000000000000..a0351adb7fd6
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-2.txt
new file mode 100644
index 000000000000..a189a9d85df7
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/database/schemaevolution/topic1/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"address":"Beijing"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"address":"Shanghai"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"paimon_sync_database","table-name":"t2","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/computedcolumn/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/computedcolumn/aws-dms-data-1.txt
new file mode 100644
index 000000000000..cf9112abc4bf
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/computedcolumn/aws-dms-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"_id":101,"_date":"2023-03-23"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-1.txt
new file mode 100644
index 000000000000..42a00afe5b5e
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-2.txt
new file mode 100644
index 000000000000..3ecfdab8b1a4
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"age":18},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"age":24},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-3.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-3.txt
new file mode 100644
index 000000000000..04e18e1db548
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-3.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"address":"Shanghai"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"delete","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"address":"Beijing","BI_id":105,"BI_name":"hammer","BI_description":"14oz carpenter's hammer","BI_weight":0.875,"BI_address":"Shanghai"},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"update","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-4.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-4.txt
new file mode 100644
index 000000000000..e93607aed68d
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/schemaevolution/aws-dms-data-4.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":null},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-1.txt
new file mode 100644
index 000000000000..42a00afe5b5e
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-2.txt
new file mode 100644
index 000000000000..70c0fb1675ea
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/startupmode/aws-dms-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/watermark/aws-dms-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/watermark/aws-dms-data-1.txt
new file mode 100644
index 000000000000..42a00afe5b5e
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aws-dms/table/watermark/aws-dms-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
+{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"metadata":{"timestamp":"2024-10-27T12:39:03.210671Z","record-type":"data","operation":"insert","partition-key-type":"schema-table","schema-name":"test","table-name":"product","transaction-id":670014899490}}
\ No newline at end of file