diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
index 28ad5b52aec5..9c629b5a516f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
+import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
 import org.apache.paimon.schema.Schema;
 
 import java.util.Map;
@@ -69,6 +71,11 @@ protected Schema retrieveSchema() throws Exception {
         }
     }
 
+    @Override
+    protected CdcTimestampExtractor createCdcTimestampExtractor() {
+        return new MessageQueueCdcTimestampExtractor();
+    }
+
     @Override
     protected Schema buildPaimonSchema(Schema retrievedSchema) {
         return CdcActionCommonUtils.buildPaimonSchema(
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index b42366d2699a..21f6c6b3e7ed 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
 import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -52,7 +53,6 @@
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
-import static org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.createExtractor;
 
 /** Base {@link Action} for table/database synchronizing job. */
 public abstract class SynchronizationActionBase extends ActionBase {
@@ -132,6 +132,11 @@ protected Object buildSource() {
         return syncJobHandler.provideSource();
     }
 
+    protected CdcTimestampExtractor createCdcTimestampExtractor() {
+        throw new IllegalArgumentException(
+                "Unsupported timestamp extractor for current cdc source.");
+    }
+
     private DataStreamSource<CdcSourceRecord> buildDataStreamSource(Object source) {
         if (source instanceof Source) {
             boolean isAutomaticWatermarkCreationEnabled =
@@ -146,13 +151,13 @@ private DataStreamSource<CdcSourceRecord> buildDataStreamSource(Object source) {
             WatermarkStrategy<CdcSourceRecord> watermarkStrategy =
                     isAutomaticWatermarkCreationEnabled
                             ? watermarkAlignGroup != null
-                                    ? new CdcWatermarkStrategy(createExtractor(source))
+                                    ? new CdcWatermarkStrategy(createCdcTimestampExtractor())
                                             .withWatermarkAlignment(
                                                     watermarkAlignGroup,
                                                     options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                                                     options.get(
                                                             SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL))
-                                    : new CdcWatermarkStrategy(createExtractor(source))
+                                    : new CdcWatermarkStrategy(createCdcTimestampExtractor())
                             : WatermarkStrategy.noWatermarks();
             if (idleTimeout != null) {
                 watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 4c87307e3261..cce4c4e686c4 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -20,6 +20,8 @@
 
 import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
 import org.apache.paimon.flink.action.cdc.SyncJobHandler;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
+import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
 
 import java.util.Map;
 
@@ -33,4 +35,9 @@ public KafkaSyncDatabaseAction(
             Map<String, String> kafkaConfig) {
         super(warehouse, database, catalogConfig, kafkaConfig, SyncJobHandler.SourceType.KAFKA);
     }
+
+    @Override
+    protected CdcTimestampExtractor createCdcTimestampExtractor() {
+        return new MessageQueueCdcTimestampExtractor();
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index c86307fe7015..848e96c6960b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
 
 import org.apache.flink.cdc.connectors.base.options.SourceOptions;
 import org.apache.flink.cdc.connectors.base.options.StartupOptions;
@@ -139,4 +140,8 @@ public static MongoDBSource<CdcSourceRecord> buildMongodbSource(
 
         return sourceBuilder.deserializer(schema).build();
     }
+
+    public static CdcTimestampExtractor createCdcTimestampExtractor() {
+        return new MongoDBCdcTimestampExtractor();
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBCdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBCdcTimestampExtractor.java
new file mode 100644
index 000000000000..08ca8ce79cb5
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBCdcTimestampExtractor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.watermark.CdcDebeziumTimestampExtractor;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/** Timestamp extractor for MongoDB sources in CDC applications. */
+public class MongoDBCdcTimestampExtractor extends CdcDebeziumTimestampExtractor {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException {
+        JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), JsonNode.class);
+        // If the record is a schema-change event return Long.MIN_VALUE as result.
+        return JsonSerdeUtil.extractValueOrDefault(json, Long.class, Long.MIN_VALUE, "ts_ms");
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 4120aa1ba06a..5895139609e0 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
 import org.apache.paimon.flink.action.cdc.SyncJobHandler;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
 
 import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
 import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
@@ -58,6 +59,11 @@ public MongoDBSyncDatabaseAction(
         super(warehouse, database, catalogConfig, mongodbConfig, SyncJobHandler.SourceType.MONGODB);
     }
 
+    @Override
+    protected CdcTimestampExtractor createCdcTimestampExtractor() {
+        return MongoDBActionUtils.createCdcTimestampExtractor();
+    }
+
     @Override
     protected MongoDBSource<CdcSourceRecord> buildSource() {
         return MongoDBActionUtils.buildMongodbSource(
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index c1bf21b5ba2a..16dbbadfd776 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.SyncJobHandler;
 import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
 import org.apache.paimon.schema.Schema;
 
 import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
@@ -69,6 +70,11 @@ protected Schema retrieveSchema() {
         return MongodbSchemaUtils.getMongodbSchema(cdcSourceConfig);
     }
 
+    @Override
+    protected CdcTimestampExtractor createCdcTimestampExtractor() {
+        return MongoDBActionUtils.createCdcTimestampExtractor();
+    }
+
     @Override
     protected MongoDBSource<CdcSourceRecord> buildSource() {
         String tableList =
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 792b763117f9..6888f672e089 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemaUtils;
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
 import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
 import org.apache.paimon.schema.Schema;
 
 import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
@@ -275,4 +276,8 @@ public static void registerJdbcDriver() {
             }
         }
     }
+
+    public static CdcTimestampExtractor createCdcTimestampExtractor() {
+        return new MysqlCdcTimestampExtractor();
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index ecdb6c65f482..316168e1ed4c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -29,6 +29,7 @@
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
 import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
@@ -180,6 +181,11 @@ protected void beforeBuildingSourceSink() throws Exception {
                         + "MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
     }
 
+    @Override
+    protected CdcTimestampExtractor createCdcTimestampExtractor() {
+        return MySqlActionUtils.createCdcTimestampExtractor();
+    }
+
     @Override
     protected MySqlSource<CdcSourceRecord> buildSource() {
         return MySqlActionUtils.buildMySqlSource(
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index eeb273265772..a05832b1d033 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -25,6 +25,7 @@
 import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
 import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
 import org.apache.paimon.schema.Schema;
 
 import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
@@ -109,6 +110,11 @@ protected MySqlSource<CdcSourceRecord> buildSource() {
         return MySqlActionUtils.buildMySqlSource(cdcSourceConfig, tableList, typeMapping);
     }
 
+    @Override
+    protected CdcTimestampExtractor createCdcTimestampExtractor() {
+        return MySqlActionUtils.createCdcTimestampExtractor();
+    }
+
     private void validateMySqlTableInfos(JdbcSchemasInfo mySqlSchemasInfo) {
         List<Identifier> nonPkTables = mySqlSchemasInfo.nonPkTables();
         checkArgument(
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MysqlCdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MysqlCdcTimestampExtractor.java
new file mode 100644
index 000000000000..cb11120fd3af
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MysqlCdcTimestampExtractor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.watermark.CdcDebeziumTimestampExtractor;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/** Timestamp extractor for MySQL sources in CDC applications. */
+public class MysqlCdcTimestampExtractor extends CdcDebeziumTimestampExtractor {
+
+    @Override
+    public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException {
+        JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), JsonNode.class);
+
+        return JsonSerdeUtil.extractValueOrDefault(
+                json, Long.class, Long.MIN_VALUE, "payload", "ts_ms");
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java
index fa7b7353e914..d097feec67c2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java
@@ -20,6 +20,8 @@
 
 import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
 import org.apache.paimon.flink.action.cdc.SyncJobHandler;
+import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
+import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
 
 import java.util.Map;
 
@@ -33,4 +35,9 @@ public PulsarSyncDatabaseAction(
             Map<String, String> pulsarConfig) {
         super(warehouse, database, catalogConfig, pulsarConfig, SyncJobHandler.SourceType.PULSAR);
     }
+
+    @Override
+    protected CdcTimestampExtractor createCdcTimestampExtractor() {
+        return new MessageQueueCdcTimestampExtractor();
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcDebeziumTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcDebeziumTimestampExtractor.java
new file mode 100644
index 000000000000..515b74bd43fc
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcDebeziumTimestampExtractor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.watermark;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/** Timestamp extractor for Cdc debezium deserialization. */
+public abstract class CdcDebeziumTimestampExtractor implements CdcTimestampExtractor {
+
+    protected final ObjectMapper objectMapper = new ObjectMapper();
+
+    public CdcDebeziumTimestampExtractor() {
+        objectMapper
+                .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractor.java
new file mode 100644
index 000000000000..647fd01154c8
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.watermark;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.Serializable;
+
+/** Interface defining the contract for CDC timestamp extraction. */
+public interface CdcTimestampExtractor extends Serializable {
+
+    long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException;
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
deleted file mode 100644
index 26733465a054..000000000000
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.watermark;
-
-import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
-import org.apache.paimon.utils.JsonSerdeUtil;
-
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
-import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-
-import java.io.Serializable;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Supplier;
-
-/** Factory for creating CDC timestamp extractors based on different source types. */
-public class CdcTimestampExtractorFactory implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Map<Class<?>, Supplier<CdcTimestampExtractor>> extractorMap =
-            new HashMap<>();
-
-    static {
-        extractorMap.put(MongoDBSource.class, MongoDBCdcTimestampExtractor::new);
-        extractorMap.put(MySqlSource.class, MysqlCdcTimestampExtractor::new);
-        extractorMap.put(PulsarSource.class, MessageQueueCdcTimestampExtractor::new);
-        extractorMap.put(KafkaSource.class, MessageQueueCdcTimestampExtractor::new);
-    }
-
-    public static CdcTimestampExtractor createExtractor(Object source) {
-        Supplier<CdcTimestampExtractor> extractorSupplier = extractorMap.get(source.getClass());
-        if (extractorSupplier != null) {
-            return extractorSupplier.get();
-        }
-        throw new IllegalArgumentException(
-                "Unsupported source type: " + source.getClass().getName());
-    }
-
-    /** Timestamp extractor for MongoDB sources in CDC applications. */
-    public static class MongoDBCdcTimestampExtractor extends CdcDebeziumTimestampExtractor {
-
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException {
-            JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), JsonNode.class);
-            // If the record is a schema-change event return Long.MIN_VALUE as result.
-            return JsonSerdeUtil.extractValueOrDefault(json, Long.class, Long.MIN_VALUE, "ts_ms");
-        }
-    }
-
-    /** Timestamp extractor for Kafka/Pulsar sources in CDC applications. */
-    public static class MessageQueueCdcTimestampExtractor implements CdcTimestampExtractor {
-
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public long extractTimestamp(CdcSourceRecord cdcSourceRecord)
-                throws JsonProcessingException {
-            JsonNode record = (JsonNode) cdcSourceRecord.getValue();
-            if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) {
-                // Canal json
-                return JsonSerdeUtil.extractValue(record, Long.class, "ts");
-            } else if (JsonSerdeUtil.isNodeExists(record, "pos")) {
-                // Ogg json
-                String dateTimeString = JsonSerdeUtil.extractValue(record, String.class, "op_ts");
-                DateTimeFormatter formatter =
-                        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
-                LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
-                return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
-            } else if (JsonSerdeUtil.isNodeExists(record, "xid")) {
-                // Maxwell json
-                return JsonSerdeUtil.extractValue(record, Long.class, "ts") * 1000;
-            } else if (JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector")) {
-                // Dbz json
-                return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
-            } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) {
-                // Dbz json
-                return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
-            }
-            throw new RuntimeException(
-                    String.format(
-                            "Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s",
-                            record));
-        }
-    }
-
-    /** Timestamp extractor for MySQL sources in CDC applications. */
-    public static class MysqlCdcTimestampExtractor extends CdcDebeziumTimestampExtractor {
-
-        @Override
-        public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException {
-            JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), JsonNode.class);
-
-            return JsonSerdeUtil.extractValueOrDefault(
-                    json, Long.class, Long.MIN_VALUE, "payload", "ts_ms");
-        }
-    }
-
-    /** Timestamp extractor for Cdc debezium deserialization. */
-    public abstract static class CdcDebeziumTimestampExtractor implements CdcTimestampExtractor {
-
-        protected final ObjectMapper objectMapper = new ObjectMapper();
-
-        public CdcDebeziumTimestampExtractor() {
-            objectMapper
-                    .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
-                    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-        }
-    }
-
-    /** Interface defining the contract for CDC timestamp extraction. */
-    public interface CdcTimestampExtractor extends Serializable {
-
-        long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException;
-    }
-}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
index 964699a47bb5..f7f8680970e0 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.watermark;
 
 import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
-import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.CdcTimestampExtractor;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java
new file mode 100644
index 000000000000..8a9a28453bad
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.watermark;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+/** Timestamp extractor for Kafka/Pulsar sources in CDC applications. */
+public class MessageQueueCdcTimestampExtractor implements CdcTimestampExtractor {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public long extractTimestamp(CdcSourceRecord cdcSourceRecord) throws JsonProcessingException {
+        JsonNode record = (JsonNode) cdcSourceRecord.getValue();
+        if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) {
+            // Canal json
+            return JsonSerdeUtil.extractValue(record, Long.class, "ts");
+        } else if (JsonSerdeUtil.isNodeExists(record, "pos")) {
+            // Ogg json
+            String dateTimeString = JsonSerdeUtil.extractValue(record, String.class, "op_ts");
+            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+            LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
+            return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+        } else if (JsonSerdeUtil.isNodeExists(record, "xid")) {
+            // Maxwell json
+            return JsonSerdeUtil.extractValue(record, Long.class, "ts") * 1000;
+        } else if (JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector")) {
+            // Dbz json
+            return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
+        } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) {
+            // Dbz json
+            return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
+        }
+        throw new RuntimeException(
+                String.format(
+                        "Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s",
+                        record));
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java
index ea5bcfe12c40..71cb0edd0e43 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java
@@ -18,8 +18,10 @@
 
 package org.apache.paimon.flink.action.cdc;
 
+import org.apache.paimon.flink.action.cdc.mongodb.MongoDBCdcTimestampExtractor;
 import org.apache.paimon.flink.action.cdc.mysql.DebeziumEventTest;
-import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory;
+import org.apache.paimon.flink.action.cdc.mysql.MysqlCdcTimestampExtractor;
+import org.apache.paimon.flink.action.cdc.watermark.CdcDebeziumTimestampExtractor;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -33,7 +35,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** IT cases for {@link CdcTimestampExtractorFactory.CdcDebeziumTimestampExtractor}. */
+/** IT cases for {@link CdcDebeziumTimestampExtractor}. */
 public class CdcDebeziumTimestampExtractorITCase {
 
     private ObjectMapper objectMapper;
@@ -49,8 +51,7 @@ public void before() {
 
     @Test
     public void testMysqlCdcTimestampExtractor() throws Exception {
-        CdcTimestampExtractorFactory.MysqlCdcTimestampExtractor extractor =
-                new CdcTimestampExtractorFactory.MysqlCdcTimestampExtractor();
+        MysqlCdcTimestampExtractor extractor = new MysqlCdcTimestampExtractor();
 
         JsonNode data = objectMapper.readValue("{\"payload\" : {\"ts_ms\": 1}}", JsonNode.class);
         CdcSourceRecord record = new CdcSourceRecord(data.toString());
@@ -69,8 +70,7 @@ record = new CdcSourceRecord(schemaChangeEvent.toString());
 
     @Test
     public void testMongodbCdcTimestampExtractor() throws Exception {
-        CdcTimestampExtractorFactory.MongoDBCdcTimestampExtractor extractor =
-                new CdcTimestampExtractorFactory.MongoDBCdcTimestampExtractor();
+        MongoDBCdcTimestampExtractor extractor = new MongoDBCdcTimestampExtractor();
 
         JsonNode data = objectMapper.readValue("{\"ts_ms\": 1}", JsonNode.class);
         CdcSourceRecord record = new CdcSourceRecord(data.toString());