
[cdc] Add flink execution mode check for cdc table sync action. (#4378)
LinMingQiang authored Oct 29, 2024
1 parent 8ca7619 commit a0727bd
Showing 6 changed files with 68 additions and 0 deletions.
File 1 of 6
@@ -32,10 +32,12 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand All @@ -53,6 +55,7 @@
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Base {@link Action} for table/database synchronizing job. */
public abstract class SynchronizationActionBase extends ActionBase {
@@ -137,6 +140,13 @@ protected CdcTimestampExtractor createCdcTimestampExtractor() {
"Unsupported timestamp extractor for current cdc source.");
}

protected void validateRuntimeExecutionMode() {
checkArgument(
env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING,
"It's only support STREAMING mode for flink-cdc sync table action.");
}

private DataStreamSource<CdcSourceRecord> buildDataStreamSource(Object source) {
if (source instanceof Source) {
boolean isAutomaticWatermarkCreationEnabled =
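For readers less familiar with the Flink API used above, the following is a minimal, self-contained sketch of what the new guard checks. It mirrors the condition in validateRuntimeExecutionMode() and assumes a Flink version where StreamExecutionEnvironment#getConfiguration() is available (as in the production code); the class and main method are illustrative only, not part of this commit.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeGuardSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Simulate a misconfigured job: batch execution mode.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Same condition as validateRuntimeExecutionMode() above.
        boolean streaming =
                env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                        == RuntimeExecutionMode.STREAMING;
        if (!streaming) {
            throw new IllegalArgumentException(
                    "Only STREAMING mode is supported for flink-cdc sync table action.");
        }
    }
}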
File 2 of 6
@@ -188,6 +188,7 @@ protected CdcTimestampExtractor createCdcTimestampExtractor() {

@Override
protected MySqlSource<CdcSourceRecord> buildSource() {
validateRuntimeExecutionMode();
return MySqlActionUtils.buildMySqlSource(
cdcSourceConfig,
tableList(
File 3 of 6
@@ -102,6 +102,7 @@ protected Schema retrieveSchema() throws Exception {

@Override
protected MySqlSource<CdcSourceRecord> buildSource() {
validateRuntimeExecutionMode();
String tableList =
String.format(
"(%s)\\.(%s)",
File 4 of 6
@@ -38,6 +38,7 @@
import org.apache.paimon.types.RowType;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.AfterEach;
@@ -214,6 +215,15 @@ protected <T> List<String> nullableToArgs(String argKey, @Nullable T nullable) {
}

public JobClient runActionWithDefaultEnv(ActionBase action) throws Exception {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
action.withStreamExecutionEnvironment(env).build();
JobClient client = env.executeAsync();
waitJobRunning(client);
return client;
}

public JobClient runActionWithBatchEnv(ActionBase action) throws Exception {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
action.withStreamExecutionEnvironment(env).build();
JobClient client = env.executeAsync();
waitJobRunning(client);
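The two helpers above differ only in the RuntimeExecutionMode they set before building and submitting the action. An equivalent single-parameter helper could look like the sketch below; it is illustrative only, assumes it lives in the same test base class (so env, waitJobRunning, and ActionBase are in scope), and is not part of the commit, which keeps two separate helpers for readability at call sites.

    // Hypothetical consolidation of runActionWithDefaultEnv / runActionWithBatchEnv.
    private JobClient runActionWith(ActionBase action, RuntimeExecutionMode mode) throws Exception {
        env.setRuntimeMode(mode);
        action.withStreamExecutionEnvironment(env).build();
        JobClient client = env.executeAsync();
        waitJobRunning(client);
        return client;
    }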
File 5 of 6
@@ -1497,4 +1497,39 @@ public void testUnknowMysqlScanStartupMode() {
+ scanStartupMode
+ "'. Valid scan.startup.mode for MySQL CDC are [initial, earliest-offset, latest-offset, specific-offset, timestamp, snapshot]"));
}

@Test
@Timeout(60)
public void testRuntimeExecutionModeCheckForCdcSync() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "check_cdc_sync_runtime_execution_mode");
mySqlConfig.put("table-name", "t");

Map<String, String> tableConfig = getBasicTableConfig();
tableConfig.put(CoreOptions.WRITE_ONLY.key(), "true");

MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build();

assertThatThrownBy(() -> runActionWithBatchEnv(action))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"It's only support STREAMING mode for flink-cdc sync table action"));

runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable();

try (Statement statement = getStatement()) {
statement.executeUpdate("USE check_cdc_sync_runtime_execution_mode");
statement.executeUpdate("INSERT INTO t VALUES (1, 'one'), (2, 'two')");
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
new String[] {"k", "v1"});
List<String> primaryKeys = Collections.singletonList("k");
List<String> expected = Arrays.asList("+I[1, one]", "+I[2, two]");
waitForResult(expected, table, rowType, primaryKeys);
}
}
}
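Conversely, a job that should pass the new validation only needs Flink's default streaming mode. Below is a minimal sketch of configuring the environment explicitly; the configuration option and factory method are standard Flink API, while the class name and the trailing comment are illustrative.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingModeSetupSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // STREAMING is Flink's default, but setting it explicitly documents the intent
        // and satisfies validateRuntimeExecutionMode() in the sync actions.
        conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the CDC sync action against this environment and execute it.
    }
}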
File 6 of 6
@@ -445,3 +445,14 @@ CREATE TABLE t (
k INT PRIMARY KEY,
v1 VARCHAR(10)
);

-- ################################################################################
-- testRuntimeExecutionModeCheckForCdcSync
-- ################################################################################

CREATE DATABASE check_cdc_sync_runtime_execution_mode;
USE check_cdc_sync_runtime_execution_mode;
CREATE TABLE t (
k INT PRIMARY KEY,
v1 VARCHAR(10)
);
