From e6cb26cb0455d09abbf97a78c26a9f882f1618e2 Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Mon, 14 Oct 2024 11:41:59 +0800 Subject: [PATCH 1/3] [cdc] Add flink execution mode check for cdc table sync action. --- .../action/cdc/SynchronizationActionBase.java | 10 ++++++ .../cdc/mysql/MySqlSyncDatabaseAction.java | 1 + .../cdc/mysql/MySqlSyncTableAction.java | 1 + .../flink/action/cdc/CdcActionITCaseBase.java | 10 ++++++ .../cdc/mysql/MySqlSyncTableActionITCase.java | 35 +++++++++++++++++++ .../test/resources/mysql/sync_table_setup.sql | 11 ++++++ 6 files changed, 68 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 2b9b08917700..cb8cff8b7e1d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -32,10 +32,12 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.connector.source.Source; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -53,6 +55,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL; import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Base {@link Action} for table/database synchronizing job. */ public abstract class SynchronizationActionBase extends ActionBase { @@ -137,6 +140,13 @@ protected CdcTimestampExtractor createCdcTimestampExtractor() { "Unsupported timestamp extractor for current cdc source."); } + protected void validateRuntimeExecutionMode() { + checkArgument( + env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) + != RuntimeExecutionMode.BATCH, + "Don't support batch mode for flink-cdc sync table action."); + } + private DataStreamSource buildDataStreamSource(Object source) { if (source instanceof Source) { boolean isAutomaticWatermarkCreationEnabled = diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 01be020f7405..f8ea8cdc4438 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -188,6 +188,7 @@ protected CdcTimestampExtractor createCdcTimestampExtractor() { @Override protected MySqlSource buildSource() { + validateRuntimeExecutionMode(); return MySqlActionUtils.buildMySqlSource( cdcSourceConfig, tableList( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index a05832b1d033..d73d9702f1e1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -102,6 +102,7 @@ protected Schema retrieveSchema() throws Exception { @Override protected MySqlSource buildSource() { + validateRuntimeExecutionMode(); String tableList = String.format( "(%s)\\.(%s)", diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index 08289569086a..00a8b236173b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -38,6 +38,7 @@ import org.apache.paimon.types.RowType; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.AfterEach; @@ -214,6 +215,15 @@ protected List nullableToArgs(String argKey, @Nullable T nullable) { } public JobClient runActionWithDefaultEnv(ActionBase action) throws Exception { + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + action.withStreamExecutionEnvironment(env).build(); + JobClient client = env.executeAsync(); + waitJobRunning(client); + return client; + } + + public JobClient runActionWithBatchEnv(ActionBase action) throws Exception { + env.setRuntimeMode(RuntimeExecutionMode.BATCH); action.withStreamExecutionEnvironment(env).build(); JobClient client = env.executeAsync(); waitJobRunning(client); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 
ce18d0b1f0e8..0783fcac3e4e 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1497,4 +1497,39 @@ public void testUnknowMysqlScanStartupMode() { + scanStartupMode + "'. Valid scan.startup.mode for MySQL CDC are [initial, earliest-offset, latest-offset, specific-offset, timestamp, snapshot]")); } + + @Test + @Timeout(60) + public void testRuntimeExecutionModeCheckForCdcSync() throws Exception { + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", "check_cdc_sync_runtime_execution_mode"); + mySqlConfig.put("table-name", "t"); + + Map tableConfig = getBasicTableConfig(); + tableConfig.put(CoreOptions.WRITE_ONLY.key(), "true"); + + MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build(); + + assertThatThrownBy(() -> runActionWithBatchEnv(action)) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Don't support batch mode for flink-cdc sync table action")); + + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + + try (Statement statement = getStatement()) { + statement.executeUpdate("USE check_cdc_sync_runtime_execution_mode"); + statement.executeUpdate("INSERT INTO t VALUES (1, 'one'), (2, 'two')"); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k", "v1"}); + List primaryKeys = Collections.singletonList("k"); + List expected = Arrays.asList("+I[1, one]", "+I[2, two]"); + waitForResult(expected, table, rowType, primaryKeys); + } + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 965f884ec680..10a0f20d45aa 100644 --- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -445,3 +445,14 @@ CREATE TABLE t ( k INT PRIMARY KEY, v1 VARCHAR(10) ); + +-- ################################################################################ +-- testRuntimeExecutionModeCheckForCdcSync +-- ################################################################################ + +CREATE DATABASE check_cdc_sync_runtime_execution_mode; +USE check_cdc_sync_runtime_execution_mode; +CREATE TABLE t ( + k INT PRIMARY KEY, + v1 VARCHAR(10) +); \ No newline at end of file From 0662b5181cd73a1b9d4d67f79b557c4e9d0b0cf6 Mon Sep 17 00:00:00 2001 From: LinMingQiang <1356469429@qq.com> Date: Fri, 25 Oct 2024 22:41:53 +0800 Subject: [PATCH 2/3] [cdc] Add flink execution mode check for cdc table sync action. --- .../paimon/flink/action/cdc/SynchronizationActionBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index cb8cff8b7e1d..f103396389e5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -143,8 +143,8 @@ protected CdcTimestampExtractor createCdcTimestampExtractor() { protected void validateRuntimeExecutionMode() { checkArgument( env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) - != RuntimeExecutionMode.BATCH, - "Don't support batch mode for flink-cdc sync table action."); + == RuntimeExecutionMode.STREAMING, + "It's only support STREAMING mode for flink-cdc sync table action."); } private DataStreamSource buildDataStreamSource(Object source) { From 
4655f61f721795f66868498c1590f15f9d5f4157 Mon Sep 17 00:00:00 2001 From: LinMingQiang <1356469429@qq.com> Date: Fri, 25 Oct 2024 23:38:39 +0800 Subject: [PATCH 3/3] [cdc] Update expected error message in execution mode check test. --- .../flink/action/cdc/mysql/MySqlSyncTableActionITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 0783fcac3e4e..861dec33d782 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1514,7 +1514,7 @@ public void testRuntimeExecutionModeCheckForCdcSync() throws Exception { .satisfies( anyCauseMatches( IllegalArgumentException.class, - "Don't support batch mode for flink-cdc sync table action")); + "It's only support STREAMING mode for flink-cdc sync table action")); runActionWithDefaultEnv(action);