diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md index 4a0f7f5f6c465..1b4f0fc786b14 100644 --- a/docs/content/how-to/cdc-ingestion.md +++ b/docs/content/how-to/cdc-ingestion.md @@ -97,7 +97,7 @@ Example ### Synchronizing Databases -By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize the whole MySQL database into one Paimon database. +By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize one or multiple MySQL database into one or multiple Paimon database, if `sync-to-multiple-db` is false, all the MySQL database will be synchronize to one paimon database, and the table with same name in different database will be merged. if `sync-to-multiple-db` is true, all the MySQL database will be synchronize to multiple paimon database with the same schema. To use this feature through `flink run`, run the following shell command. @@ -107,6 +107,7 @@ To use this feature through `flink run`, run the following shell command. mysql-sync-database --warehouse \ --database \ + [--sync-to-multiple-db ] \ [--ignore-incompatible ] \ [--table-prefix ] \ [--table-suffix ] \ diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html index 1c1dc789c0da3..028e2275b009f 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_database.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html @@ -33,6 +33,10 @@
--database
The database name in Paimon catalog. + +
--sync-to-multiple-db
+ It is default false, in this case, all the MySQL database will be synchronized to one paimon database, and the table with same name in different database will be merged. if it is true, all the MySQL database will be synchronize to multiple paimon database with the same schema. +
--ignore-incompatible
It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible,an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception. @@ -47,7 +51,7 @@
--including-tables
- It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'. + It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'db1.a|db2.b|db2.c'.Regular expression is supported, for example, specifying "--including-tables db1.test|db2.paimon.*" means to synchronize table 'db1.test' and all tables start with 'db2.paimon'.
--excluding-tables
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java index dc6815f2c3ebd..aa183b61fe16d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java @@ -181,7 +181,7 @@ static MySqlSource buildMySqlSource(Configuration mySqlConfig) { .username(mySqlConfig.get(MySqlSourceOptions.USERNAME)) .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD)) .databaseList(databaseName) - .tableList(databaseName + "." + tableName); + .tableList(tableName); mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId); mySqlConfig diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java index 691d59a0fbfc6..61fe0bcf10328 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java @@ -63,6 +63,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser { private final ObjectMapper objectMapper = new ObjectMapper(); private final ZoneId serverTimeZone; private final boolean caseSensitive; + private final boolean syncToMultipleDB; private final TableNameConverter tableNameConverter; private final List computedColumns; @@ -72,23 +73,38 @@ public class MySqlDebeziumJsonEventParser implements EventParser { public MySqlDebeziumJsonEventParser( ZoneId serverTimeZone, boolean caseSensitive, List computedColumns) { - this(serverTimeZone, caseSensitive, computedColumns, new TableNameConverter(caseSensitive)); + this( + serverTimeZone, + caseSensitive, + computedColumns, + new TableNameConverter(caseSensitive), + false); } public MySqlDebeziumJsonEventParser( - ZoneId serverTimeZone, boolean caseSensitive, TableNameConverter tableNameConverter) { - this(serverTimeZone, caseSensitive, Collections.emptyList(), tableNameConverter); + ZoneId serverTimeZone, + boolean caseSensitive, + TableNameConverter tableNameConverter, + boolean syncToMultipleDB) { + this( + serverTimeZone, + caseSensitive, + Collections.emptyList(), + tableNameConverter, + syncToMultipleDB); } public MySqlDebeziumJsonEventParser( ZoneId serverTimeZone, boolean caseSensitive, List computedColumns, - TableNameConverter tableNameConverter) { + TableNameConverter tableNameConverter, + Boolean syncToMultipleDB) { this.serverTimeZone = serverTimeZone; this.caseSensitive = caseSensitive; this.computedColumns = computedColumns; this.tableNameConverter = tableNameConverter; + this.syncToMultipleDB = syncToMultipleDB; } @Override @@ -114,7 +130,12 @@ public void setRawEvent(String rawEvent) { @Override public String parseTableName() { String tableName = payload.get("source").get("table").asText(); - return tableNameConverter.convert(tableName); + if (syncToMultipleDB) { + String dbName = payload.get("source").get("db").asText(); + return dbName + "." + tableNameConverter.convert(tableName); + } else { + return tableNameConverter.convert(tableName); + } } private void updateFieldTypes(JsonNode schema) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index b057070cc9db8..4497c0246e372 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -31,6 +31,9 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.shade.guava30.com.google.common.collect.HashMultimap; +import org.apache.paimon.shade.guava30.com.google.common.collect.Multimap; + import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -47,12 +50,17 @@ import java.sql.ResultSet; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; +import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.paimon.flink.action.Action.optionalConfigMap; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -99,6 +107,7 @@ public class MySqlSyncDatabaseAction extends ActionBase { private final Configuration mySqlConfig; private final String database; + private final boolean syncToMultipleDB; private final boolean ignoreIncompatible; private final String tablePrefix; private final String tableSuffix; @@ -110,6 +119,7 @@ public MySqlSyncDatabaseAction( Map mySqlConfig, String warehouse, String database, + boolean syncToMultipleDB, boolean ignoreIncompatible, Map catalogConfig, Map tableConfig) { @@ -117,6 +127,7 @@ public MySqlSyncDatabaseAction( mySqlConfig, warehouse, database, + syncToMultipleDB, ignoreIncompatible, null, null, @@ -130,6 +141,7 @@ public MySqlSyncDatabaseAction( Map mySqlConfig, String warehouse, String database, + boolean syncToMultipleDB, boolean ignoreIncompatible, @Nullable String tablePrefix, @Nullable String tableSuffix, @@ -140,6 +152,7 @@ public MySqlSyncDatabaseAction( super(warehouse, catalogConfig); this.mySqlConfig = Configuration.fromMap(mySqlConfig); this.database = database; + this.syncToMultipleDB = syncToMultipleDB; this.ignoreIncompatible = ignoreIncompatible; this.tablePrefix = tablePrefix == null ? "" : tablePrefix; this.tableSuffix = tableSuffix == null ? "" : tableSuffix; @@ -157,18 +170,45 @@ public void build(StreamExecutionEnvironment env) throws Exception { + "use mysql-sync-table instead."); boolean caseSensitive = catalog.caseSensitive(); + List mySqlSchemas = getMySqlSchemaList(); + Set databases = + mySqlSchemas.stream().map(MySqlSchema::databaseName).collect(Collectors.toSet()); + if (!caseSensitive) { - validateCaseInsensitive(); + validateCaseInsensitive(databases); } - List mySqlSchemas = getMySqlSchemaList(); checkArgument( mySqlSchemas.size() > 0, "No tables found in MySQL database " + mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME) + ", or MySQL database does not exist."); - catalog.createDatabase(database, true); + Multimap map = HashMultimap.create(); + + if (syncToMultipleDB) { + for (String database : databases) { + catalog.createDatabase(database, true); + } + } else { + catalog.createDatabase(database, true); + mySqlSchemas.forEach(m -> map.put(m.tableName(), m)); + mySqlSchemas.clear(); + for (Map.Entry> entries : map.asMap().entrySet()) { + + MySqlSchema schema = + entries.getValue().stream() + // Ensure that the field order of each merge remains consistent. + .sorted(Comparator.comparing(MySqlSchema::databaseName)) + .reduce(MySqlSchema::merge) + .orElseThrow( + () -> + new RuntimeException( + "No table satisfies the given database name and table name")); + mySqlSchemas.add(schema); + } + } + TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, tablePrefix, tableSuffix); @@ -176,7 +216,13 @@ public void build(StreamExecutionEnvironment env) throws Exception { List monitoredTables = new ArrayList<>(); for (MySqlSchema mySqlSchema : mySqlSchemas) { String paimonTableName = tableNameConverter.convert(mySqlSchema.tableName()); - Identifier identifier = new Identifier(database, paimonTableName); + Identifier identifier; + if (syncToMultipleDB) { + identifier = new Identifier(mySqlSchema.databaseName(), paimonTableName); + } else { + identifier = new Identifier(database, paimonTableName); + } + FileStoreTable table; Schema fromMySql = MySqlActionUtils.buildPaimonSchema( @@ -191,14 +237,15 @@ public void build(StreamExecutionEnvironment env) throws Exception { Supplier errMsg = incompatibleMessage(table.schema(), mySqlSchema, identifier); if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) { - monitoredTables.add(mySqlSchema.tableName()); fileStoreTables.add(table); + addMonitorTable(map, monitoredTables, mySqlSchema); } } catch (Catalog.TableNotExistException e) { catalog.createTable(identifier, fromMySql, false); table = (FileStoreTable) catalog.getTable(identifier); - monitoredTables.add(mySqlSchema.tableName()); + fileStoreTables.add(table); + addMonitorTable(map, monitoredTables, mySqlSchema); } } @@ -213,8 +260,13 @@ public void build(StreamExecutionEnvironment env) throws Exception { String serverTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE); ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(serverTimeZone); + + // if we use syncToMultipleDB directly, it will throw an NotSerializableException. + boolean enableMultipleDB = syncToMultipleDB; EventParser.Factory parserFactory = - () -> new MySqlDebeziumJsonEventParser(zoneId, caseSensitive, tableNameConverter); + () -> + new MySqlDebeziumJsonEventParser( + zoneId, caseSensitive, tableNameConverter, enableMultipleDB); FlinkCdcSyncDatabaseSinkBuilder sinkBuilder = new FlinkCdcSyncDatabaseSinkBuilder() @@ -222,6 +274,7 @@ public void build(StreamExecutionEnvironment env) throws Exception { env.fromSource( source, WatermarkStrategy.noWatermarks(), "MySQL Source")) .withParserFactory(parserFactory) + .withSyncMultipleDB(syncToMultipleDB) .withTables(fileStoreTables); String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key()); if (sinkParallelism != null) { @@ -230,12 +283,35 @@ public void build(StreamExecutionEnvironment env) throws Exception { sinkBuilder.build(); } - private void validateCaseInsensitive() { - checkArgument( - database.equals(database.toLowerCase()), - String.format( - "Database name [%s] cannot contain upper case in case-insensitive catalog.", - database)); + private void addMonitorTable( + Multimap map, + List monitoredTables, + MySqlSchema mySqlSchema) { + if (syncToMultipleDB) { + monitoredTables.add(mySqlSchema.databaseName() + "." + mySqlSchema.tableName()); + } else { + map.get(mySqlSchema.tableName()) + .forEach(m -> monitoredTables.add(m.databaseName() + "." + m.tableName())); + } + } + + private void validateCaseInsensitive(Set databases) { + if (syncToMultipleDB) { + for (String database : databases) { + checkArgument( + database.equals(database.toLowerCase()), + String.format( + "Database name [%s] cannot contain upper case in case-insensitive catalog.", + database)); + } + } else { + checkArgument( + database.equals(database.toLowerCase()), + String.format( + "Database name [%s] cannot contain upper case in case-insensitive catalog.", + database)); + } + checkArgument( tablePrefix.equals(tablePrefix.toLowerCase()), String.format( @@ -249,21 +325,32 @@ private void validateCaseInsensitive() { } private List getMySqlSchemaList() throws Exception { - String databaseName = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME); + Pattern databasePattern = + Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)); List mySqlSchemaList = new ArrayList<>(); try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) { DatabaseMetaData metaData = conn.getMetaData(); - try (ResultSet tables = - metaData.getTables(databaseName, null, "%", new String[] {"TABLE"})) { - while (tables.next()) { - String tableName = tables.getString("TABLE_NAME"); - if (!shouldMonitorTable(tableName)) { - continue; - } - MySqlSchema mySqlSchema = new MySqlSchema(metaData, databaseName, tableName); - if (mySqlSchema.primaryKeys().size() > 0) { - // only tables with primary keys will be considered - mySqlSchemaList.add(mySqlSchema); + try (ResultSet schemas = metaData.getCatalogs()) { + while (schemas.next()) { + String databaseName = schemas.getString("TABLE_CAT"); + Matcher databaseMatcher = databasePattern.matcher(databaseName); + if (databaseMatcher.matches()) { + try (ResultSet tables = metaData.getTables(databaseName, null, "%", null)) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + + if (!shouldMonitorTable(databaseName + "." + tableName)) { + continue; + } + + MySqlSchema mySqlSchema = + new MySqlSchema(metaData, databaseName, tableName); + if (mySqlSchema.primaryKeys().size() > 0) { + // only tables with primary keys will be considered + mySqlSchemaList.add(mySqlSchema); + } + } + } } } } @@ -326,6 +413,7 @@ public static Optional create(String[] args) { String warehouse = params.get("warehouse"); String database = params.get("database"); boolean ignoreIncompatible = Boolean.parseBoolean(params.get("ignore-incompatible")); + boolean syncToMultipleDB = Boolean.parseBoolean(params.get("sync-to-multiple-db")); String tablePrefix = params.get("table-prefix"); String tableSuffix = params.get("table-suffix"); String includingTables = params.get("including-tables"); @@ -343,6 +431,7 @@ public static Optional create(String[] args) { mySqlConfig, warehouse, database, + syncToMultipleDB, ignoreIncompatible, tablePrefix, tableSuffix, @@ -356,7 +445,7 @@ private static void printHelp() { System.out.println( "Action \"mysql-sync-database\" creates a streaming job " + "with a Flink MySQL CDC source and multiple Paimon table sinks " - + "to synchronize a whole MySQL database into one Paimon database.\n" + + "to synchronize one or multiple MySQL database into one or multiple Paimon database.\n" + "Only MySQL tables with primary keys will be considered. " + "Newly created MySQL tables after the job starts will not be included."); System.out.println(); @@ -364,6 +453,7 @@ private static void printHelp() { System.out.println("Syntax:"); System.out.println( " mysql-sync-database --warehouse --database " + + "[--sync-to-multiple-db ] " + "[--ignore-incompatible ] " + "[--table-prefix ] " + "[--table-suffix ] " @@ -374,6 +464,12 @@ private static void printHelp() { + "[--table-conf [--table-conf ...]]"); System.out.println(); + System.out.println( + "--sync-to-multiple-db is default false, in this case, all the MySQL database will be synchronized to " + + "one paimon database, and the table with same name in different database will be merged. " + + "if it is true, all the MySQL database will be synchronize to multiple paimon database with the same schema."); + System.out.println(); + System.out.println( "--ignore-incompatible is default false, in this case, if MySQL table name exists in Paimon " + "and their schema is incompatible, an exception will be thrown. " diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index 17466a9478162..27330acd54828 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -137,7 +137,6 @@ public MySqlSyncTableAction( } public void build(StreamExecutionEnvironment env) throws Exception { - MySqlSource source = MySqlActionUtils.buildMySqlSource(mySqlConfig); boolean caseSensitive = catalog.caseSensitive(); @@ -145,14 +144,24 @@ public void build(StreamExecutionEnvironment env) throws Exception { validateCaseInsensitive(); } + List mySqlSchemaList = getMySqlSchemaList(); + + String tableNames = + mySqlSchemaList.stream() + .map(m -> m.databaseName() + "." + m.tableName()) + .collect(Collectors.joining("|")); + MySqlSchema mySqlSchema = - getMySqlSchemaList().stream() + mySqlSchemaList.stream() .reduce(MySqlSchema::merge) .orElseThrow( () -> new RuntimeException( "No table satisfies the given database name and table name")); + mySqlConfig.set(MySqlSourceOptions.TABLE_NAME, "(" + tableNames + ")"); + MySqlSource source = MySqlActionUtils.buildMySqlSource(mySqlConfig); + catalog.createDatabase(database, true); Identifier identifier = new Identifier(database, table); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index b62992d08537a..0fb8c0ddc9b05 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -52,6 +52,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder { private Lock.Factory lockFactory = Lock.emptyFactory(); @Nullable private Integer parallelism; + private boolean syncToMultipleDB = false; public FlinkCdcSyncDatabaseSinkBuilder withInput(DataStream input) { this.input = input; @@ -64,6 +65,11 @@ public FlinkCdcSyncDatabaseSinkBuilder withParserFactory( return this; } + public FlinkCdcSyncDatabaseSinkBuilder withSyncMultipleDB(Boolean syncToMultipleDB) { + this.syncToMultipleDB = syncToMultipleDB; + return this; + } + public FlinkCdcSyncDatabaseSinkBuilder withTables(List tables) { this.tables = tables; return this; @@ -91,11 +97,19 @@ public void build() { .setParallelism(input.getParallelism()); for (FileStoreTable table : tables) { + String tableName; + if (syncToMultipleDB) { + String db = table.location().getParent().getName().replace(".db", ""); + tableName = db + "." + table.name(); + } else { + tableName = table.name(); + } + DataStream schemaChangeProcessFunction = SingleOutputStreamOperatorUtils.getSideOutput( parsed, CdcMultiTableParsingProcessFunction - .createUpdatedDataFieldsOutputTag(table.name())) + .createUpdatedDataFieldsOutputTag(tableName)) .process( new UpdatedDataFieldsProcessFunction( new SchemaManager(table.fileIO(), table.location()))); @@ -109,7 +123,7 @@ public void build() { SingleOutputStreamOperatorUtils.getSideOutput( parsed, CdcMultiTableParsingProcessFunction - .createRecordOutputTag(table.name())) + .createRecordOutputTag(tableName)) .getTransformation(), partitioner); if (parallelism != null) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index c4f960925b5ed..b48b140c30069 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -74,6 +74,7 @@ public void testSchemaEvolution() throws Exception { warehouse, database, false, + false, Collections.emptyMap(), tableConfig); action.build(env); @@ -204,6 +205,110 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception { waitForResult(expected, table2, rowType2, primaryKeys2); } + private void testSyncToMultipleDatabaseImpl( + boolean enableMultipleDB, String includingTables, String excludingTables) + throws Exception { + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put( + "database-name", "paimon_sync_database_multiple1|paimon_sync_database_multiple2"); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + MySqlSyncDatabaseAction action = + new MySqlSyncDatabaseAction( + mySqlConfig, + warehouse, + database, + enableMultipleDB, + false, + null, + null, + includingTables, + excludingTables, + Collections.emptyMap(), + Collections.emptyMap()); + + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + } + + @Test + @Timeout(60) + public void testEnableSyncToMultipleDB() throws Exception { + testSyncToMultipleDatabaseImpl(true, null, null); + FileStoreTable table1 = getFileStoreTable("t1", "paimon_sync_database_multiple1"); + List primaryKeys1 = Collections.singletonList("k1"); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k1", "v0"}); + List expected = Collections.singletonList("+I[1, flink]"); + waitForResult(expected, table1, rowType, primaryKeys1); + + FileStoreTable table2 = getFileStoreTable("t2", "paimon_sync_database_multiple1"); + List primaryKeys2 = Collections.singletonList("k2"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k2", "v2"}); + expected = Collections.singletonList("+I[2, paimon]"); + waitForResult(expected, table2, rowType, primaryKeys2); + + FileStoreTable table3 = getFileStoreTable("t1", "paimon_sync_database_multiple2"); + List primaryKeys3 = Collections.singletonList("k1"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k1", "v1"}); + expected = Collections.singletonList("+I[3, three]"); + waitForResult(expected, table3, rowType, primaryKeys3); + + FileStoreTable table4 = getFileStoreTable("t3", "paimon_sync_database_multiple2"); + List primaryKeys4 = Collections.singletonList("k3"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k3", "v3"}); + expected = Collections.singletonList("+I[4, four]"); + waitForResult(expected, table4, rowType, primaryKeys4); + } + + @Test + @Timeout(60) + public void testDisableSyncToMultipleDB() throws Exception { + testSyncToMultipleDatabaseImpl(false, null, null); + FileStoreTable table1 = getFileStoreTable("t1"); + List primaryKeys1 = Collections.singletonList("k1"); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(10) + }, + new String[] {"k1", "v0", "v1"}); + List expected = Arrays.asList("+I[1, flink, NULL]", "+I[3, NULL, three]"); + waitForResult(expected, table1, rowType, primaryKeys1); + + FileStoreTable table2 = getFileStoreTable("t2"); + List primaryKeys2 = Collections.singletonList("k2"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k2", "v2"}); + expected = Collections.singletonList("+I[2, paimon]"); + waitForResult(expected, table2, rowType, primaryKeys2); + + FileStoreTable table3 = getFileStoreTable("t3"); + List primaryKeys3 = Collections.singletonList("k3"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k3", "v3"}); + expected = Collections.singletonList("+I[4, four]"); + waitForResult(expected, table3, rowType, primaryKeys3); + } + @Test public void testSpecifiedMySqlTable() { Map mySqlConfig = getBasicMySqlConfig(); @@ -217,6 +322,7 @@ public void testSpecifiedMySqlTable() { warehouse, database, false, + false, Collections.emptyMap(), Collections.emptyMap()); @@ -244,6 +350,7 @@ public void testInvalidDatabase() { warehouse, database, false, + false, Collections.emptyMap(), Collections.emptyMap()); @@ -287,6 +394,7 @@ public void testIgnoreIncompatibleTables() throws Exception { mySqlConfig, warehouse, database, + false, true, Collections.emptyMap(), tableConfig); @@ -353,6 +461,7 @@ public void testTableAffix() throws Exception { warehouse, database, false, + false, "test_prefix_", "_test_suffix", null, @@ -476,7 +585,7 @@ private void testTableAffixImpl(Statement statement) throws Exception { public void testIncludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_including", - "flink|paimon.+", + "paimon_sync_database_including.flink|paimon_sync_database_including.paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignored")); @@ -488,7 +597,7 @@ public void testExcludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_excluding", null, - "flink|paimon.+", + "paimon_sync_database_excluding.flink|paimon_sync_database_excluding.paimon.+", Collections.singletonList("sync"), Arrays.asList("flink", "paimon_1", "paimon_2")); } @@ -498,8 +607,8 @@ public void testExcludingTables() throws Exception { public void testIncludingAndExcludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_in_excluding", - "flink|paimon.+", - "paimon_1", + "paimon_sync_database_in_excluding.flink|paimon_sync_database_in_excluding.paimon.+", + "paimon_sync_database_in_excluding.paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "test")); } @@ -527,6 +636,7 @@ private void includingAndExcludingTablesImpl( warehouse, database, false, + false, null, null, includingTables, @@ -560,7 +670,7 @@ public void testIgnoreCase() throws Exception { MySqlSyncDatabaseAction action = new MySqlSyncDatabaseAction( - mySqlConfig, warehouse, database, false, catalogConfig, tableConfig); + mySqlConfig, warehouse, database, false, false, catalogConfig, tableConfig); action.build(env); JobClient client = env.executeAsync(); waitJobRunning(client); @@ -619,9 +729,11 @@ public void testIgnoreCase() throws Exception { } } - private FileStoreTable getFileStoreTable(String tableName) throws Exception { + private FileStoreTable getFileStoreTable(String tableName, String... databaseName) + throws Exception { Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); - Identifier identifier = Identifier.create(database, tableName); + Identifier identifier = + Identifier.create(databaseName.length > 0 ? databaseName[0] : database, tableName); return (FileStoreTable) catalog.getTable(identifier); } diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql index 839c0ab796b4c..403e896beb4bb 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql +++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql @@ -456,3 +456,44 @@ CREATE TABLE T ( UPPERCASE_V0 VARCHAR(20), PRIMARY KEY (k) ); + +-- ################################################################################ +-- MySqlSyncDatabaseActionITCase#testEnableSyncToMultipleDB +-- MySqlSyncDatabaseActionITCase#testDisableSyncToMultipleDB +-- ################################################################################ + +CREATE DATABASE paimon_sync_database_multiple1; +USE paimon_sync_database_multiple1; + +CREATE TABLE t1 ( + k1 INT, + v0 VARCHAR(10), + PRIMARY KEY (k1) +); + +CREATE TABLE t2 ( + k2 INT, + v2 VARCHAR(10), + PRIMARY KEY (k2) +); + +INSERT INTO t1 VALUES(1, 'flink'); +INSERT INTO t2 VALUES(2, 'paimon'); + +CREATE DATABASE paimon_sync_database_multiple2; +USE paimon_sync_database_multiple2; + +CREATE TABLE t1 ( + k1 INT, + v1 VARCHAR(10), + PRIMARY KEY (k1) +); + +CREATE TABLE t3 ( + k3 INT, + v3 VARCHAR(10), + PRIMARY KEY (k3) +); + +INSERT INTO t1 VALUES(3, 'three'); +INSERT INTO t3 VALUES(4, 'four');