diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 2b9b08917700..f103396389e5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -32,10 +32,12 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.connector.source.Source; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -53,6 +55,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Base {@link Action} for table/database synchronizing job. */ public abstract class SynchronizationActionBase extends ActionBase { @@ -137,6 +140,13 @@ protected CdcTimestampExtractor createCdcTimestampExtractor() { "Unsupported timestamp extractor for current cdc source."); } + protected void validateRuntimeExecutionMode() { + checkArgument( + env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) + == RuntimeExecutionMode.STREAMING, + "It's only support STREAMING mode for flink-cdc sync table action."); + } + private DataStreamSource buildDataStreamSource(Object source) { if (source instanceof Source) { boolean isAutomaticWatermarkCreationEnabled = diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 01be020f7405..f8ea8cdc4438 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -188,6 +188,7 @@ protected CdcTimestampExtractor createCdcTimestampExtractor() { @Override protected MySqlSource buildSource() { + validateRuntimeExecutionMode(); return MySqlActionUtils.buildMySqlSource( cdcSourceConfig, tableList( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index a05832b1d033..d73d9702f1e1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -102,6 +102,7 @@ protected Schema retrieveSchema() throws Exception { @Override protected MySqlSource buildSource() { + validateRuntimeExecutionMode(); String tableList = String.format( "(%s)\\.(%s)", diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index 08289569086a..00a8b236173b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -38,6 +38,7 @@ import org.apache.paimon.types.RowType; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.AfterEach; @@ -214,6 +215,15 @@ protected List nullableToArgs(String argKey, @Nullable T nullable) { } public JobClient runActionWithDefaultEnv(ActionBase action) throws Exception { + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + action.withStreamExecutionEnvironment(env).build(); + JobClient client = env.executeAsync(); + waitJobRunning(client); + return client; + } + + public JobClient runActionWithBatchEnv(ActionBase action) throws Exception { + env.setRuntimeMode(RuntimeExecutionMode.BATCH); action.withStreamExecutionEnvironment(env).build(); JobClient client = env.executeAsync(); waitJobRunning(client); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index ce18d0b1f0e8..861dec33d782 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1497,4 +1497,39 @@ public void testUnknowMysqlScanStartupMode() { + scanStartupMode + "'. Valid scan.startup.mode for MySQL CDC are [initial, earliest-offset, latest-offset, specific-offset, timestamp, snapshot]")); } + + @Test + @Timeout(60) + public void testRuntimeExecutionModeCheckForCdcSync() throws Exception { + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", "check_cdc_sync_runtime_execution_mode"); + mySqlConfig.put("table-name", "t"); + + Map tableConfig = getBasicTableConfig(); + tableConfig.put(CoreOptions.WRITE_ONLY.key(), "true"); + + MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build(); + + assertThatThrownBy(() -> runActionWithBatchEnv(action)) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "It's only support STREAMING mode for flink-cdc sync table action")); + + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + + try (Statement statement = getStatement()) { + statement.executeUpdate("USE check_cdc_sync_runtime_execution_mode"); + statement.executeUpdate("INSERT INTO t VALUES (1, 'one'), (2, 'two')"); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k", "v1"}); + List primaryKeys = Collections.singletonList("k"); + List expected = Arrays.asList("+I[1, one]", "+I[2, two]"); + waitForResult(expected, table, rowType, primaryKeys); + } + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 965f884ec680..10a0f20d45aa 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -445,3 +445,14 @@ CREATE TABLE t ( k INT PRIMARY KEY, v1 VARCHAR(10) ); + +-- ################################################################################ +-- testRuntimeExecutionModeCheckForCdcSync +-- ################################################################################ + +CREATE DATABASE check_cdc_sync_runtime_execution_mode; +USE check_cdc_sync_runtime_execution_mode; +CREATE TABLE t ( + k INT PRIMARY KEY, + v1 VARCHAR(10) +); \ No newline at end of file