From b754fdb13c1c2a4f67f5d27a595e80986666ffc9 Mon Sep 17 00:00:00 2001 From: HunterXHunter Date: Tue, 10 Sep 2024 11:53:52 +0800 Subject: [PATCH] [cdc] Refactor codes cdcTimestampExtractor to separate different cdc dataSources. (#4144) --- .../cdc/MessageQueueSyncTableActionBase.java | 7 + .../action/cdc/SynchronizationActionBase.java | 11 +- .../cdc/kafka/KafkaSyncDatabaseAction.java | 7 + .../cdc/mongodb/MongoDBActionUtils.java | 5 + .../mongodb/MongoDBCdcTimestampExtractor.java | 39 +++++ .../mongodb/MongoDBSyncDatabaseAction.java | 6 + .../cdc/mongodb/MongoDBSyncTableAction.java | 6 + .../action/cdc/mysql/MySqlActionUtils.java | 5 + .../cdc/mysql/MySqlSyncDatabaseAction.java | 6 + .../cdc/mysql/MySqlSyncTableAction.java | 6 + .../cdc/mysql/MysqlCdcTimestampExtractor.java | 38 +++++ .../cdc/pulsar/PulsarSyncDatabaseAction.java | 7 + .../CdcDebeziumTimestampExtractor.java | 34 +++++ .../cdc/watermark/CdcTimestampExtractor.java | 31 ++++ .../CdcTimestampExtractorFactory.java | 144 ------------------ .../cdc/watermark/CdcWatermarkStrategy.java | 1 - .../MessageQueueCdcTimestampExtractor.java | 63 ++++++++ .../CdcDebeziumTimestampExtractorITCase.java | 12 +- 18 files changed, 274 insertions(+), 154 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBCdcTimestampExtractor.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MysqlCdcTimestampExtractor.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcDebeziumTimestampExtractor.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractor.java delete mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java index 28ad5b52aec5..9c629b5a516f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java @@ -19,6 +19,8 @@ package org.apache.paimon.flink.action.cdc; import org.apache.paimon.flink.action.Action; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; +import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor; import org.apache.paimon.schema.Schema; import java.util.Map; @@ -69,6 +71,11 @@ protected Schema retrieveSchema() throws Exception { } } + @Override + protected CdcTimestampExtractor createCdcTimestampExtractor() { + return new MessageQueueCdcTimestampExtractor(); + } + @Override protected Schema buildPaimonSchema(Schema retrievedSchema) { return CdcActionCommonUtils.buildPaimonSchema( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index b42366d2699a..21f6c6b3e7ed 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.Action; import org.apache.paimon.flink.action.ActionBase; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy; import org.apache.paimon.flink.sink.cdc.EventParser; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; @@ -52,7 +53,6 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT; -import static org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.createExtractor; /** Base {@link Action} for table/database synchronizing job. */ public abstract class SynchronizationActionBase extends ActionBase { @@ -132,6 +132,11 @@ protected Object buildSource() { return syncJobHandler.provideSource(); } + protected CdcTimestampExtractor createCdcTimestampExtractor() { + throw new IllegalArgumentException( + "Unsupported timestamp extractor for current cdc source."); + } + private DataStreamSource buildDataStreamSource(Object source) { if (source instanceof Source) { boolean isAutomaticWatermarkCreationEnabled = @@ -146,13 +151,13 @@ private DataStreamSource buildDataStreamSource(Object source) { WatermarkStrategy watermarkStrategy = isAutomaticWatermarkCreationEnabled ? watermarkAlignGroup != null - ? new CdcWatermarkStrategy(createExtractor(source)) + ? new CdcWatermarkStrategy(createCdcTimestampExtractor()) .withWatermarkAlignment( watermarkAlignGroup, options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT), options.get( SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL)) - : new CdcWatermarkStrategy(createExtractor(source)) + : new CdcWatermarkStrategy(createCdcTimestampExtractor()) : WatermarkStrategy.noWatermarks(); if (idleTimeout != null) { watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java index 4c87307e3261..cce4c4e686c4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java @@ -20,6 +20,8 @@ import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase; import org.apache.paimon.flink.action.cdc.SyncJobHandler; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; +import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor; import java.util.Map; @@ -33,4 +35,9 @@ public KafkaSyncDatabaseAction( Map kafkaConfig) { super(warehouse, database, catalogConfig, kafkaConfig, SyncJobHandler.SourceType.KAFKA); } + + @Override + protected CdcTimestampExtractor createCdcTimestampExtractor() { + return new MessageQueueCdcTimestampExtractor(); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java index c86307fe7015..848e96c6960b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java @@ -20,6 +20,7 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord; import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; import org.apache.flink.cdc.connectors.base.options.SourceOptions; import org.apache.flink.cdc.connectors.base.options.StartupOptions; @@ -139,4 +140,8 @@ public static MongoDBSource buildMongodbSource( return sourceBuilder.deserializer(schema).build(); } + + public static CdcTimestampExtractor createCdcTimestampExtractor() { + return new MongoDBCdcTimestampExtractor(); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBCdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBCdcTimestampExtractor.java new file mode 100644 index 000000000000..08ca8ce79cb5 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBCdcTimestampExtractor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.mongodb; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.watermark.CdcDebeziumTimestampExtractor; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; + +/** Timestamp extractor for MongoDB sources in CDC applications. */ +public class MongoDBCdcTimestampExtractor extends CdcDebeziumTimestampExtractor { + + private static final long serialVersionUID = 1L; + + @Override + public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException { + JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), JsonNode.class); + // If the record is a schema-change event return Long.MIN_VALUE as result. + return JsonSerdeUtil.extractValueOrDefault(json, Long.class, Long.MIN_VALUE, "ts_ms"); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java index 4120aa1ba06a..5895139609e0 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java @@ -22,6 +22,7 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord; import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase; import org.apache.paimon.flink.action.cdc.SyncJobHandler; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource; import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; @@ -58,6 +59,11 @@ public MongoDBSyncDatabaseAction( super(warehouse, database, catalogConfig, mongodbConfig, SyncJobHandler.SourceType.MONGODB); } + @Override + protected CdcTimestampExtractor createCdcTimestampExtractor() { + return MongoDBActionUtils.createCdcTimestampExtractor(); + } + @Override protected MongoDBSource buildSource() { return MongoDBActionUtils.buildMongodbSource( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java index c1bf21b5ba2a..16dbbadfd776 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java @@ -21,6 +21,7 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord; import org.apache.paimon.flink.action.cdc.SyncJobHandler; import org.apache.paimon.flink.action.cdc.SyncTableActionBase; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; import org.apache.paimon.schema.Schema; import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource; @@ -69,6 +70,11 @@ protected Schema retrieveSchema() { return MongodbSchemaUtils.getMongodbSchema(cdcSourceConfig); } + @Override + protected CdcTimestampExtractor createCdcTimestampExtractor() { + return MongoDBActionUtils.createCdcTimestampExtractor(); + } + @Override protected MongoDBSource buildSource() { String tableList = diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java index 792b763117f9..6888f672e089 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java @@ -24,6 +24,7 @@ import org.apache.paimon.flink.action.cdc.schema.JdbcSchemaUtils; import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo; import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; import org.apache.paimon.schema.Schema; import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; @@ -275,4 +276,8 @@ public static void registerJdbcDriver() { } } } + + public static CdcTimestampExtractor createCdcTimestampExtractor() { + return new MysqlCdcTimestampExtractor(); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index ecdb6c65f482..316168e1ed4c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -29,6 +29,7 @@ import org.apache.paimon.flink.action.cdc.TableNameConverter; import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo; import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; @@ -180,6 +181,11 @@ protected void beforeBuildingSourceSink() throws Exception { + "MySQL database are not compatible with those of existed Paimon tables. Please check the log."); } + @Override + protected CdcTimestampExtractor createCdcTimestampExtractor() { + return MySqlActionUtils.createCdcTimestampExtractor(); + } + @Override protected MySqlSource buildSource() { return MySqlActionUtils.buildMySqlSource( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index eeb273265772..a05832b1d033 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -25,6 +25,7 @@ import org.apache.paimon.flink.action.cdc.SyncTableActionBase; import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo; import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; import org.apache.paimon.schema.Schema; import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; @@ -109,6 +110,11 @@ protected MySqlSource buildSource() { return MySqlActionUtils.buildMySqlSource(cdcSourceConfig, tableList, typeMapping); } + @Override + protected CdcTimestampExtractor createCdcTimestampExtractor() { + return MySqlActionUtils.createCdcTimestampExtractor(); + } + private void validateMySqlTableInfos(JdbcSchemasInfo mySqlSchemasInfo) { List nonPkTables = mySqlSchemasInfo.nonPkTables(); checkArgument( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MysqlCdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MysqlCdcTimestampExtractor.java new file mode 100644 index 000000000000..cb11120fd3af --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MysqlCdcTimestampExtractor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.mysql; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.watermark.CdcDebeziumTimestampExtractor; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; + +/** Timestamp extractor for MySQL sources in CDC applications. */ +public class MysqlCdcTimestampExtractor extends CdcDebeziumTimestampExtractor { + + @Override + public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException { + JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), JsonNode.class); + + return JsonSerdeUtil.extractValueOrDefault( + json, Long.class, Long.MIN_VALUE, "payload", "ts_ms"); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java index fa7b7353e914..d097feec67c2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java @@ -20,6 +20,8 @@ import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase; import org.apache.paimon.flink.action.cdc.SyncJobHandler; +import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor; +import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor; import java.util.Map; @@ -33,4 +35,9 @@ public PulsarSyncDatabaseAction( Map pulsarConfig) { super(warehouse, database, catalogConfig, pulsarConfig, SyncJobHandler.SourceType.PULSAR); } + + @Override + protected CdcTimestampExtractor createCdcTimestampExtractor() { + return new MessageQueueCdcTimestampExtractor(); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcDebeziumTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcDebeziumTimestampExtractor.java new file mode 100644 index 000000000000..515b74bd43fc --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcDebeziumTimestampExtractor.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.watermark; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +/** Timestamp extractor for Cdc debezium deserialization. */ +public abstract class CdcDebeziumTimestampExtractor implements CdcTimestampExtractor { + + protected final ObjectMapper objectMapper = new ObjectMapper(); + + public CdcDebeziumTimestampExtractor() { + objectMapper + .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractor.java new file mode 100644 index 000000000000..647fd01154c8 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractor.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.watermark; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.Serializable; + +/** Interface defining the contract for CDC timestamp extraction. */ +public interface CdcTimestampExtractor extends Serializable { + + long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException; +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java deleted file mode 100644 index 26733465a054..000000000000 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.action.cdc.watermark; - -import org.apache.paimon.flink.action.cdc.CdcSourceRecord; -import org.apache.paimon.utils.JsonSerdeUtil; - -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource; -import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.pulsar.source.PulsarSource; - -import java.io.Serializable; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; - -/** Factory for creating CDC timestamp extractors based on different source types. */ -public class CdcTimestampExtractorFactory implements Serializable { - - private static final long serialVersionUID = 1L; - - private static final Map, Supplier> extractorMap = - new HashMap<>(); - - static { - extractorMap.put(MongoDBSource.class, MongoDBCdcTimestampExtractor::new); - extractorMap.put(MySqlSource.class, MysqlCdcTimestampExtractor::new); - extractorMap.put(PulsarSource.class, MessageQueueCdcTimestampExtractor::new); - extractorMap.put(KafkaSource.class, MessageQueueCdcTimestampExtractor::new); - } - - public static CdcTimestampExtractor createExtractor(Object source) { - Supplier extractorSupplier = extractorMap.get(source.getClass()); - if (extractorSupplier != null) { - return extractorSupplier.get(); - } - throw new IllegalArgumentException( - "Unsupported source type: " + source.getClass().getName()); - } - - /** Timestamp extractor for MongoDB sources in CDC applications. */ - public static class MongoDBCdcTimestampExtractor extends CdcDebeziumTimestampExtractor { - - private static final long serialVersionUID = 1L; - - @Override - public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException { - JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), JsonNode.class); - // If the record is a schema-change event return Long.MIN_VALUE as result. - return JsonSerdeUtil.extractValueOrDefault(json, Long.class, Long.MIN_VALUE, "ts_ms"); - } - } - - /** Timestamp extractor for Kafka/Pulsar sources in CDC applications. */ - public static class MessageQueueCdcTimestampExtractor implements CdcTimestampExtractor { - - private static final long serialVersionUID = 1L; - - @Override - public long extractTimestamp(CdcSourceRecord cdcSourceRecord) - throws JsonProcessingException { - JsonNode record = (JsonNode) cdcSourceRecord.getValue(); - if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) { - // Canal json - return JsonSerdeUtil.extractValue(record, Long.class, "ts"); - } else if (JsonSerdeUtil.isNodeExists(record, "pos")) { - // Ogg json - String dateTimeString = JsonSerdeUtil.extractValue(record, String.class, "op_ts"); - DateTimeFormatter formatter = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); - LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter); - return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - } else if (JsonSerdeUtil.isNodeExists(record, "xid")) { - // Maxwell json - return JsonSerdeUtil.extractValue(record, Long.class, "ts") * 1000; - } else if (JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector")) { - // Dbz json - return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms"); - } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) { - // Dbz json - return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms"); - } - throw new RuntimeException( - String.format( - "Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s", - record)); - } - } - - /** Timestamp extractor for MySQL sources in CDC applications. */ - public static class MysqlCdcTimestampExtractor extends CdcDebeziumTimestampExtractor { - - @Override - public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException { - JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), JsonNode.class); - - return JsonSerdeUtil.extractValueOrDefault( - json, Long.class, Long.MIN_VALUE, "payload", "ts_ms"); - } - } - - /** Timestamp extractor for Cdc debezium deserialization. */ - public abstract static class CdcDebeziumTimestampExtractor implements CdcTimestampExtractor { - - protected final ObjectMapper objectMapper = new ObjectMapper(); - - public CdcDebeziumTimestampExtractor() { - objectMapper - .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true) - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - } - - /** Interface defining the contract for CDC timestamp extraction. */ - public interface CdcTimestampExtractor extends Serializable { - - long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException; - } -} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java index 964699a47bb5..f7f8680970e0 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java @@ -19,7 +19,6 @@ package org.apache.paimon.flink.action.cdc.watermark; import org.apache.paimon.flink.action.cdc.CdcSourceRecord; -import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.CdcTimestampExtractor; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java new file mode 100644 index 000000000000..8a9a28453bad --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.watermark; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +/** Timestamp extractor for Kafka/Pulsar sources in CDC applications. */ +public class MessageQueueCdcTimestampExtractor implements CdcTimestampExtractor { + + private static final long serialVersionUID = 1L; + + @Override + public long extractTimestamp(CdcSourceRecord cdcSourceRecord) throws JsonProcessingException { + JsonNode record = (JsonNode) cdcSourceRecord.getValue(); + if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) { + // Canal json + return JsonSerdeUtil.extractValue(record, Long.class, "ts"); + } else if (JsonSerdeUtil.isNodeExists(record, "pos")) { + // Ogg json + String dateTimeString = JsonSerdeUtil.extractValue(record, String.class, "op_ts"); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter); + return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + } else if (JsonSerdeUtil.isNodeExists(record, "xid")) { + // Maxwell json + return JsonSerdeUtil.extractValue(record, Long.class, "ts") * 1000; + } else if (JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector")) { + // Dbz json + return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms"); + } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) { + // Dbz json + return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms"); + } + throw new RuntimeException( + String.format( + "Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s", + record)); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java index ea5bcfe12c40..71cb0edd0e43 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java @@ -18,8 +18,10 @@ package org.apache.paimon.flink.action.cdc; +import org.apache.paimon.flink.action.cdc.mongodb.MongoDBCdcTimestampExtractor; import org.apache.paimon.flink.action.cdc.mysql.DebeziumEventTest; -import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory; +import org.apache.paimon.flink.action.cdc.mysql.MysqlCdcTimestampExtractor; +import org.apache.paimon.flink.action.cdc.watermark.CdcDebeziumTimestampExtractor; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; @@ -33,7 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat; -/** IT cases for {@link CdcTimestampExtractorFactory.CdcDebeziumTimestampExtractor}. */ +/** IT cases for {@link CdcDebeziumTimestampExtractor}. */ public class CdcDebeziumTimestampExtractorITCase { private ObjectMapper objectMapper; @@ -49,8 +51,7 @@ public void before() { @Test public void testMysqlCdcTimestampExtractor() throws Exception { - CdcTimestampExtractorFactory.MysqlCdcTimestampExtractor extractor = - new CdcTimestampExtractorFactory.MysqlCdcTimestampExtractor(); + MysqlCdcTimestampExtractor extractor = new MysqlCdcTimestampExtractor(); JsonNode data = objectMapper.readValue("{\"payload\" : {\"ts_ms\": 1}}", JsonNode.class); CdcSourceRecord record = new CdcSourceRecord(data.toString()); @@ -69,8 +70,7 @@ record = new CdcSourceRecord(schemaChangeEvent.toString()); @Test public void testMongodbCdcTimestampExtractor() throws Exception { - CdcTimestampExtractorFactory.MongoDBCdcTimestampExtractor extractor = - new CdcTimestampExtractorFactory.MongoDBCdcTimestampExtractor(); + MongoDBCdcTimestampExtractor extractor = new MongoDBCdcTimestampExtractor(); JsonNode data = objectMapper.readValue("{\"ts_ms\": 1}", JsonNode.class); CdcSourceRecord record = new CdcSourceRecord(data.toString());