Skip to content

Commit

Permalink
[cdc] Complementary flink exec mode check for mongodb and postgres cd…
Browse files Browse the repository at this point in the history
…c table sync. (#4408)
  • Loading branch information
LinMingQiang authored Oct 31, 2024
1 parent 29a89c9 commit 4c1777f
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ protected CdcTimestampExtractor createCdcTimestampExtractor() {

@Override
protected MongoDBSource<CdcSourceRecord> buildSource() {
validateRuntimeExecutionMode();
return MongoDBActionUtils.buildMongodbSource(
cdcSourceConfig,
CdcActionCommonUtils.combinedModeTableList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected CdcTimestampExtractor createCdcTimestampExtractor() {

@Override
protected MongoDBSource<CdcSourceRecord> buildSource() {
validateRuntimeExecutionMode();
String tableList =
cdcSourceConfig.get(MongoDBSourceOptions.DATABASE)
+ "\\."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ protected Schema retrieveSchema() throws Exception {

@Override
protected JdbcIncrementalSource<CdcSourceRecord> buildSource() {
validateRuntimeExecutionMode();
List<JdbcSchemasInfo.JdbcSchemaInfo> pkTables = postgresSchemasInfo.pkTables();
Set<String> schemaList = new HashSet<>();
String[] tableList = new String[pkTables.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import java.util.List;
import java.util.Map;

import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** IT cases for {@link MongoDBSyncTableAction}. */
public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
Expand Down Expand Up @@ -398,4 +400,26 @@ public void testComputedColumnWithCaseInsensitive() throws Exception {
rowType,
Collections.singletonList("_id"));
}

@Test
@Timeout(60)
public void testRuntimeExecutionModeCheckForCdcSync() {
Map<String, String> mongodbConfig = getBasicMongoDBConfig();
mongodbConfig.put("database", database);
mongodbConfig.put("collection", "products");
mongodbConfig.put("field.name", "_id,name,description");
mongodbConfig.put("parser.path", "$._id,$.name,$.description");
mongodbConfig.put("schema.start.mode", "specified");

MongoDBSyncTableAction action =
syncTableActionBuilder(mongodbConfig)
.withTableConfig(getBasicTableConfig())
.build();

assertThatThrownBy(() -> runActionWithBatchEnv(action))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"It's only support STREAMING mode for flink-cdc sync table action"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,7 @@ public void testUnknowMysqlScanStartupMode() {
}

@Test
@Timeout(60)
@Timeout(1000)
public void testRuntimeExecutionModeCheckForCdcSync() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "check_cdc_sync_runtime_execution_mode");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,4 +789,29 @@ public void testCatalogAndTableConfig() {
private FileStoreTable getFileStoreTable() throws Exception {
return getFileStoreTable(tableName);
}

@Test
@Timeout(60)
public void testRuntimeExecutionModeCheckForCdcSync() {
Map<String, String> postgresConfig = getBasicPostgresConfig();
postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "schema_evolution_\\d+");

PostgresSyncTableAction action =
syncTableActionBuilder(postgresConfig)
.withCatalogConfig(
Collections.singletonMap(
CatalogOptions.METASTORE.key(), "test-alter-table"))
.withTableConfig(getBasicTableConfig())
.withPartitionKeys("pt")
.withPrimaryKeys("pt", "_id")
.build();

assertThatThrownBy(() -> runActionWithBatchEnv(action))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"It's only support STREAMING mode for flink-cdc sync table action"));
}
}

0 comments on commit 4c1777f

Please sign in to comment.