diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index 9c49ea2856849..8c19c206c41cf 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -98,7 +98,7 @@ Example
 ### Synchronizing Databases
-By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize the whole MySQL database into one Paimon database.
+By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize one or multiple MySQL databases into one or multiple Paimon databases. If `sync-to-multiple-db` is false, all matched MySQL databases are synchronized into one Paimon database, and tables with the same name in different databases are merged. If `sync-to-multiple-db` is true, each MySQL database is synchronized into its own Paimon database with the same schema.
 To use this feature through `flink run`, run the following shell command.
@@ -108,6 +108,7 @@ To use this feature through `flink run`, run the following shell command.
     mysql-sync-database
     --warehouse \
     --database \
+    [--sync-to-multiple-db ] \
     [--ignore-incompatible ] \
     [--table-prefix ] \
     [--table-suffix ] \
@@ -127,6 +128,10 @@ For each MySQL table to be synchronized, if the corresponding Paimon table does
 Example 1: synchronize entire database
+You can synchronize one or multiple MySQL databases into Paimon. To synchronize a single MySQL database,
+set the `database-name` parameter to a specific database name. To synchronize multiple MySQL databases,
+set the `database-name` parameter to a regular expression; tables with the same name in different databases will be merged into the same Paimon table (this requires setting `sync-to-multiple-db` to `false`).
+
 ```bash
 /bin/flink run \
     /path/to/paimon-flink-action-{{< version >}}.jar \
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index 8336fe6c4877d..4e86e20d9c3a1 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -33,6 +33,10 @@
--database
The database name in Paimon catalog. + +
--sync-to-multiple-db
+It is default false. In this case, all matched MySQL databases are synchronized into one Paimon database, and tables with the same name in different databases are merged; this is suitable for database-sharding scenarios. If it is true, the "--database" parameter is ignored and each MySQL database is synchronized into a Paimon database with the same name and schema as in MySQL; this is suitable for instances with a large number of databases and can save resources.
+
--ignore-incompatible
It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible, an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.
@@ -47,7 +51,7 @@
--including-tables
- It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.
+ It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables. Because '|' is a special character, a comma is required, for example: 'db1.a|db2.b|db2.c'. Regular expression is supported, for example, specifying "--including-tables db1.test|db2.paimon.*" means to synchronize table 'db1.test' and all tables starting with 'db2.paimon'.
--excluding-tables
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java index 756b942791133..56bb7d97f5b1e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java @@ -211,7 +211,7 @@ static MySqlSource buildMySqlSource(Configuration mySqlConfig) { .username(mySqlConfig.get(MySqlSourceOptions.USERNAME)) .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD)) .databaseList(databaseName) - .tableList(databaseName + "." + tableName); + .tableList(tableName); mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId); mySqlConfig diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java index 4d17a6a8fff99..a9379a6dee2b4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java @@ -72,6 +72,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser { private final ObjectMapper objectMapper = new ObjectMapper(); private final ZoneId serverTimeZone; private final boolean caseSensitive; + private final boolean syncToMultipleDB; private final TableNameConverter tableNameConverter; private final List computedColumns; private final NewTableSchemaBuilder schemaBuilder; @@ -100,7 +101,8 @@ public MySqlDebeziumJsonEventParser( ddl -> Optional.empty(), null, null, - convertTinyint1ToBool); + convertTinyint1ToBool, + false); } public MySqlDebeziumJsonEventParser( @@ -110,7 +112,8 @@ public MySqlDebeziumJsonEventParser( NewTableSchemaBuilder schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, - boolean convertTinyint1ToBool) { + boolean convertTinyint1ToBool, + boolean syncToMultipleDB) { this( serverTimeZone, caseSensitive, @@ -119,7 +122,8 @@ public MySqlDebeziumJsonEventParser( schemaBuilder, includingPattern, excludingPattern, - convertTinyint1ToBool); + convertTinyint1ToBool, + syncToMultipleDB); } public MySqlDebeziumJsonEventParser( @@ -130,7 +134,8 @@ public MySqlDebeziumJsonEventParser( NewTableSchemaBuilder schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, - boolean convertTinyint1ToBool) { + boolean convertTinyint1ToBool, + Boolean syncToMultipleDB) { this.serverTimeZone = serverTimeZone; this.caseSensitive = caseSensitive; this.computedColumns = computedColumns; @@ -139,6 +144,7 @@ public MySqlDebeziumJsonEventParser( this.includingPattern = includingPattern; this.excludingPattern = excludingPattern; this.convertTinyint1ToBool = convertTinyint1ToBool; + this.syncToMultipleDB = syncToMultipleDB; } @Override @@ -146,16 +152,30 @@ public void setRawEvent(String rawEvent) { try { root = objectMapper.readValue(rawEvent, JsonNode.class); payload = root.get("payload"); - currentTable = payload.get("source").get("table").asText(); + currentTable = + payload.get("source").get("db").asText() + + "." 
+ + payload.get("source").get("table").asText(); shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable(); } catch (Exception e) { throw new RuntimeException(e); } } + @Override + public String parseDatabaseName() { + return payload.get("source").get("db").asText(); + } + @Override public String parseTableName() { - return tableNameConverter.convert(currentTable); + String tableName = payload.get("source").get("table").asText(); + if (syncToMultipleDB) { + String dbName = payload.get("source").get("db").asText(); + return dbName + "." + tableNameConverter.convert(tableName); + } else { + return tableNameConverter.convert(tableName); + } } private boolean isSchemaChange() { @@ -245,8 +265,7 @@ public Optional parseNewTable() { JsonNode primaryKeyColumnNames = tableChanges.get(0).get("table").get("primaryKeyColumnNames"); if (primaryKeyColumnNames.size() == 0) { - String id = tableChanges.get(0).get("id").asText(); - String tableName = id.replaceAll("\"", "").split("\\.")[1]; + String tableName = tableChanges.get(0).get("id").asText().replaceAll("\"", ""); LOG.debug( "Didn't find primary keys from MySQL DDL for table '{}'. " + "This table won't be synchronized.", diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index c931fca319584..ec80443c5092b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -32,6 +32,9 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.shade.guava30.com.google.common.collect.HashMultimap; +import org.apache.paimon.shade.guava30.com.google.common.collect.Multimap; + import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -47,11 +50,15 @@ import java.sql.ResultSet; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; +import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -102,6 +109,7 @@ public class MySqlSyncDatabaseAction extends ActionBase { private final Configuration mySqlConfig; private final String database; + private final boolean syncToMultipleDB; private final boolean ignoreIncompatible; private final String tablePrefix; private final String tableSuffix; @@ -115,6 +123,7 @@ public MySqlSyncDatabaseAction( Map mySqlConfig, String warehouse, String database, + boolean syncToMultipleDB, boolean ignoreIncompatible, Map catalogConfig, Map tableConfig) { @@ -122,6 +131,7 @@ public MySqlSyncDatabaseAction( mySqlConfig, warehouse, database, + syncToMultipleDB, ignoreIncompatible, null, null, @@ -136,6 +146,7 @@ public MySqlSyncDatabaseAction( Map mySqlConfig, String warehouse, String database, + boolean syncToMultipleDB, boolean ignoreIncompatible, @Nullable String tablePrefix, @Nullable String tableSuffix, @@ -147,6 +158,7 @@ public MySqlSyncDatabaseAction( super(warehouse, 
catalogConfig); this.mySqlConfig = Configuration.fromMap(mySqlConfig); this.database = database; + this.syncToMultipleDB = syncToMultipleDB; this.ignoreIncompatible = ignoreIncompatible; this.tablePrefix = tablePrefix == null ? "" : tablePrefix; this.tableSuffix = tableSuffix == null ? "" : tableSuffix; @@ -165,13 +177,15 @@ public void build(StreamExecutionEnvironment env) throws Exception { + "If you want to sync several MySQL tables into one Paimon table, " + "use mysql-sync-table instead."); boolean caseSensitive = catalog.caseSensitive(); + List excludedTables = new LinkedList<>(); + List mySqlSchemas = getMySqlSchemaList(excludedTables); + Set databases = + mySqlSchemas.stream().map(MySqlSchema::databaseName).collect(Collectors.toSet()); if (!caseSensitive) { - validateCaseInsensitive(); + validateCaseInsensitive(databases); } - List excludedTables = new LinkedList<>(); - List mySqlSchemas = getMySqlSchemaList(excludedTables); String mySqlDatabase = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME); checkArgument( mySqlSchemas.size() > 0, @@ -179,7 +193,31 @@ public void build(StreamExecutionEnvironment env) throws Exception { + mySqlDatabase + ", or MySQL database does not exist."); - catalog.createDatabase(database, true); + Multimap map = HashMultimap.create(); + + if (syncToMultipleDB) { + for (String database : databases) { + catalog.createDatabase(database, true); + } + } else { + catalog.createDatabase(database, true); + mySqlSchemas.forEach(m -> map.put(m.tableName(), m)); + mySqlSchemas.clear(); + for (Map.Entry> entries : map.asMap().entrySet()) { + + MySqlSchema schema = + entries.getValue().stream() + // Ensure that the field order of each merge remains consistent. + .sorted(Comparator.comparing(MySqlSchema::databaseName)) + .reduce(MySqlSchema::merge) + .orElseThrow( + () -> + new RuntimeException( + "No table satisfies the given database name and table name")); + mySqlSchemas.add(schema); + } + } + TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, tablePrefix, tableSuffix); @@ -187,7 +225,13 @@ public void build(StreamExecutionEnvironment env) throws Exception { List monitoredTables = new ArrayList<>(); for (MySqlSchema mySqlSchema : mySqlSchemas) { String paimonTableName = tableNameConverter.convert(mySqlSchema.tableName()); - Identifier identifier = new Identifier(database, paimonTableName); + Identifier identifier; + if (syncToMultipleDB) { + identifier = new Identifier(mySqlSchema.databaseName(), paimonTableName); + } else { + identifier = new Identifier(database, paimonTableName); + } + FileStoreTable table; Schema fromMySql = MySqlActionUtils.buildPaimonSchema( @@ -202,14 +246,15 @@ public void build(StreamExecutionEnvironment env) throws Exception { Supplier errMsg = incompatibleMessage(table.schema(), mySqlSchema, identifier); if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) { - monitoredTables.add(mySqlSchema.tableName()); fileStoreTables.add(table); + addMonitorTable(map, monitoredTables, mySqlSchema); } } catch (Catalog.TableNotExistException e) { catalog.createTable(identifier, fromMySql, false); table = (FileStoreTable) catalog.getTable(identifier); - monitoredTables.add(mySqlSchema.tableName()); + fileStoreTables.add(table); + addMonitorTable(map, monitoredTables, mySqlSchema); } } @@ -241,6 +286,9 @@ public void build(StreamExecutionEnvironment env) throws Exception { Pattern includingPattern = this.includingPattern; Pattern excludingPattern = this.excludingPattern; Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL); + + // if we use syncToMultipleDB variable directly, it will throw an NotSerializableException. + boolean enableMultipleDB = syncToMultipleDB; EventParser.Factory parserFactory = () -> new MySqlDebeziumJsonEventParser( @@ -250,10 +298,12 @@ public void build(StreamExecutionEnvironment env) throws Exception { schemaBuilder, includingPattern, excludingPattern, - convertTinyint1ToBool); + convertTinyint1ToBool, + enableMultipleDB); String database = this.database; DatabaseSyncMode mode = this.mode; + FlinkCdcSyncDatabaseSinkBuilder sinkBuilder = new FlinkCdcSyncDatabaseSinkBuilder() .withInput( @@ -263,8 +313,9 @@ public void build(StreamExecutionEnvironment env) throws Exception { .withDatabase(database) .withCatalogLoader(catalogLoader()) .withTables(fileStoreTables) - .withMode(mode); - + .withMode(mode) + .withSyncMultipleDB(syncToMultipleDB) + .withTables(fileStoreTables); String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key()); if (sinkParallelism != null) { sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism)); @@ -273,12 +324,35 @@ public void build(StreamExecutionEnvironment env) throws Exception { sinkBuilder.build(); } - private void validateCaseInsensitive() { - checkArgument( - database.equals(database.toLowerCase()), - String.format( - "Database name [%s] cannot contain upper case in case-insensitive catalog.", - database)); + private void addMonitorTable( + Multimap map, + List monitoredTables, + MySqlSchema mySqlSchema) { + if (syncToMultipleDB) { + monitoredTables.add(mySqlSchema.databaseName() + "." + mySqlSchema.tableName()); + } else { + map.get(mySqlSchema.tableName()) + .forEach(m -> monitoredTables.add(m.databaseName() + "." + m.tableName())); + } + } + + private void validateCaseInsensitive(Set databases) { + if (syncToMultipleDB) { + for (String database : databases) { + checkArgument( + database.equals(database.toLowerCase()), + String.format( + "Database name [%s] cannot contain upper case in case-insensitive catalog.", + database)); + } + } else { + checkArgument( + database.equals(database.toLowerCase()), + String.format( + "Database name [%s] cannot contain upper case in case-insensitive catalog.", + database)); + } + checkArgument( tablePrefix.equals(tablePrefix.toLowerCase()), String.format( @@ -292,29 +366,40 @@ private void validateCaseInsensitive() { } private List getMySqlSchemaList(List excludedTables) throws Exception { - String databaseName = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME); + Pattern databasePattern = + Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)); List mySqlSchemaList = new ArrayList<>(); try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) { DatabaseMetaData metaData = conn.getMetaData(); - try (ResultSet tables = - metaData.getTables(databaseName, null, "%", new String[] {"TABLE"})) { - while (tables.next()) { - String tableName = tables.getString("TABLE_NAME"); - if (!shouldMonitorTable(tableName)) { - excludedTables.add(tableName); - continue; - } - MySqlSchema mySqlSchema = - new MySqlSchema( - metaData, - databaseName, - tableName, - mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL)); - if (mySqlSchema.primaryKeys().size() > 0) { - // only tables with primary keys will be considered - mySqlSchemaList.add(mySqlSchema); - } else { - excludedTables.add(tableName); + try (ResultSet schemas = metaData.getCatalogs()) { + while (schemas.next()) { + String databaseName = schemas.getString("TABLE_CAT"); + Matcher 
databaseMatcher = databasePattern.matcher(databaseName); + if (databaseMatcher.matches()) { + try (ResultSet tables = metaData.getTables(databaseName, null, "%", null)) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + String fullMysqlTableName = databaseName + "." + tableName; + if (!shouldMonitorTable(fullMysqlTableName)) { + excludedTables.add(fullMysqlTableName); + continue; + } + + MySqlSchema mySqlSchema = + new MySqlSchema( + metaData, + databaseName, + tableName, + mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL)); + if (mySqlSchema.primaryKeys().size() > 0) { + // only tables with primary keys will be considered + mySqlSchemaList.add(mySqlSchema); + } else { + + excludedTables.add(fullMysqlTableName); + } + } + } } } } @@ -323,6 +408,7 @@ private List getMySqlSchemaList(List excludedTables) throws } private boolean shouldMonitorTable(String mySqlTableName) { + boolean shouldMonitor = includingPattern.matcher(mySqlTableName).matches(); if (excludingPattern != null) { shouldMonitor = shouldMonitor && !excludingPattern.matcher(mySqlTableName).matches(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index 0a6723bf7fefd..b2ac370f38f43 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -49,6 +49,7 @@ public Optional create(MultipleParameterTool params) { String warehouse = params.get("warehouse"); String database = params.get("database"); boolean ignoreIncompatible = Boolean.parseBoolean(params.get("ignore-incompatible")); + boolean syncToMultipleDB = Boolean.parseBoolean(params.get("sync-to-multiple-db")); String tablePrefix = params.get("table-prefix"); String tableSuffix = params.get("table-suffix"); String includingTables = params.get("including-tables"); @@ -79,6 +80,7 @@ public Optional create(MultipleParameterTool params) { mySqlConfig, warehouse, database, + syncToMultipleDB, ignoreIncompatible, tablePrefix, tableSuffix, @@ -94,7 +96,7 @@ public void printHelp() { System.out.println( "Action \"mysql-sync-database\" creates a streaming job " + "with a Flink MySQL CDC source and multiple Paimon table sinks " - + "to synchronize a whole MySQL database into one Paimon database.\n" + + "to synchronize one or multiple MySQL databases into one or multiple Paimon databases.\n" + "Only MySQL tables with primary keys will be considered. " + "Newly created MySQL tables after the job starts will not be included."); System.out.println(); @@ -102,6 +104,7 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( " mysql-sync-database --warehouse --database " + + "[--sync-to-multiple-db ] " + "[--ignore-incompatible ] " + "[--table-prefix ] " + "[--table-suffix ] " @@ -113,6 +116,13 @@ public void printHelp() { + "[--table-conf [--table-conf ...]]"); System.out.println(); + System.out.println( + "--sync-to-multiple-db is default false, in this case, all the MySQL database will be synchronized to one paimon database, " + + "and the table with same name in different database will be merged, it is suitable for database sharding scenarios. 
" + + "if it is true, the parameter \"--database\" will be ignored, all the MySQL database will be synchronize to multiple " + + "paimon database with the same schema as MySQL before, it is suitable for scenarios with a large number of databases under a database instance, which can save resources."); + System.out.println(); + System.out.println( "--ignore-incompatible is default false, in this case, if MySQL table name exists in Paimon " + "and their schema is incompatible, an exception will be thrown. " diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index fef0bdb727ab1..4f02c4623a170 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -132,7 +133,6 @@ public MySqlSyncTableAction( } public void build(StreamExecutionEnvironment env) throws Exception { - MySqlSource source = MySqlActionUtils.buildMySqlSource(mySqlConfig); boolean caseSensitive = catalog.caseSensitive(); @@ -140,14 +140,24 @@ public void build(StreamExecutionEnvironment env) throws Exception { validateCaseInsensitive(); } + List mySqlSchemaList = getMySqlSchemaList(); + + String tableNames = + mySqlSchemaList.stream() + .map(m -> m.databaseName() + "." 
+ m.tableName()) + .collect(Collectors.joining("|")); + MySqlSchema mySqlSchema = - getMySqlSchemaList().stream() + mySqlSchemaList.stream() .reduce(MySqlSchema::merge) .orElseThrow( () -> new RuntimeException( "No table satisfies the given database name and table name")); + mySqlConfig.set(MySqlSourceOptions.TABLE_NAME, "(" + tableNames + ")"); + MySqlSource source = MySqlActionUtils.buildMySqlSource(mySqlConfig); + catalog.createDatabase(database, true); Identifier identifier = new Identifier(database, table); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java index 4f2e4a3b91419..e21928feef9dc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java @@ -57,16 +57,20 @@ public class CdcDynamicTableParsingProcessFunction extends ProcessFunction parserFactory; private final String database; private final Catalog.Loader catalogLoader; + private final boolean syncToMultipleDB; private transient EventParser parser; private transient Catalog catalog; public CdcDynamicTableParsingProcessFunction( - String database, Catalog.Loader catalogLoader, EventParser.Factory parserFactory) { - // for now, only support single database + String database, + Catalog.Loader catalogLoader, + EventParser.Factory parserFactory, + boolean syncToMultipleDB) { this.database = database; this.catalogLoader = catalogLoader; this.parserFactory = parserFactory; + this.syncToMultipleDB = syncToMultipleDB; } @Override @@ -79,19 +83,22 @@ public void open(Configuration parameters) throws Exception { public void processElement(T raw, Context context, Collector collector) throws Exception { parser.setRawEvent(raw); - // CDC Ingestion only supports single database at this time being. 
- // In the future, there will be a mapping between source databases - // and target paimon databases - // TODO: support multiple databases - // String databaseName = parser.parseDatabaseName(); - String tableName = parser.parseTableName(); + String finalDatabaseName; + String finalTableName; + if (syncToMultipleDB) { + finalDatabaseName = parser.parseDatabaseName(); + finalTableName = parser.parseTableName().split("\\.")[1]; + } else { + finalDatabaseName = database; + finalTableName = parser.parseTableName(); + } // check for newly added table parser.parseNewTable() .ifPresent( schema -> { Identifier identifier = - new Identifier(database, parser.parseTableName()); + new Identifier(finalDatabaseName, finalTableName); try { catalog.createTable(identifier, schema, true); } catch (Throwable ignored) { @@ -102,7 +109,7 @@ public void processElement(T raw, Context context, Collector collector) th if (schemaChange.size() > 0) { context.output( DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG, - Tuple2.of(Identifier.create(database, tableName), schemaChange)); + Tuple2.of(Identifier.create(finalDatabaseName, finalTableName), schemaChange)); } parser.parseRecords() @@ -110,7 +117,7 @@ public void processElement(T raw, Context context, Collector collector) th record -> context.output( DYNAMIC_OUTPUT_TAG, - wrapRecord(database, tableName, record))); + wrapRecord(finalDatabaseName, finalTableName, record))); } private CdcMultiplexRecord wrapRecord(String databaseName, String tableName, CdcRecord record) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java index c0e481012854f..e60216dc3fd70 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java @@ -42,6 +42,10 @@ default String parseTableName() { throw new UnsupportedOperationException("Table name is not supported in this parser."); } + default String parseDatabaseName() { + throw new UnsupportedOperationException("Database name is not supported in this parser."); + } + /** * Parse new schema if this event contains schema change. * diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index 0070f3ae99afa..e51c9f19e843f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -66,9 +66,9 @@ public class FlinkCdcSyncDatabaseSinkBuilder { // Paimon tables. 2) in multiplex sink where it is used to // initialize different writers to multiple tables. 
private Catalog.Loader catalogLoader; - // database to sync, currently only support single database private String database; private DatabaseSyncMode mode; + private boolean syncToMultipleDB = false; public FlinkCdcSyncDatabaseSinkBuilder withInput(DataStream input) { this.input = input; @@ -81,6 +81,11 @@ public FlinkCdcSyncDatabaseSinkBuilder withParserFactory( return this; } + public FlinkCdcSyncDatabaseSinkBuilder withSyncMultipleDB(Boolean syncToMultipleDB) { + this.syncToMultipleDB = syncToMultipleDB; + return this; + } + public FlinkCdcSyncDatabaseSinkBuilder withTables(List tables) { this.tables = tables; return this; @@ -124,7 +129,7 @@ private void buildCombinedCdcSink() { input.forward() .process( new CdcDynamicTableParsingProcessFunction<>( - database, catalogLoader, parserFactory)) + database, catalogLoader, parserFactory, syncToMultipleDB)) .setParallelism(input.getParallelism()); // for newly-added tables, create a multiplexing operator that handles all their records @@ -171,15 +176,26 @@ private void buildDividedCdcSink() { .setParallelism(input.getParallelism()); for (FileStoreTable table : tables) { + String tableName; + Identifier identifier; + if (syncToMultipleDB) { + String db = table.location().getParent().getName().replace(".db", ""); + tableName = db + "." + table.name(); + identifier = Identifier.create(db, table.name()); + } else { + tableName = table.name(); + identifier = Identifier.create(database, tableName); + } + DataStream schemaChangeProcessFunction = SingleOutputStreamOperatorUtils.getSideOutput( parsed, CdcMultiTableParsingProcessFunction - .createUpdatedDataFieldsOutputTag(table.name())) + .createUpdatedDataFieldsOutputTag(tableName)) .process( new UpdatedDataFieldsProcessFunction( new SchemaManager(table.fileIO(), table.location()), - Identifier.create(database, table.name()), + identifier, catalogLoader)); schemaChangeProcessFunction.getTransformation().setParallelism(1); schemaChangeProcessFunction.getTransformation().setMaxParallelism(1); @@ -187,8 +203,7 @@ private void buildDividedCdcSink() { DataStream parsedForTable = SingleOutputStreamOperatorUtils.getSideOutput( parsed, - CdcMultiTableParsingProcessFunction.createRecordOutputTag( - table.name())); + CdcMultiTableParsingProcessFunction.createRecordOutputTag(tableName)); BucketMode bucketMode = table.bucketMode(); switch (bucketMode) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index 0595d6fa04da3..fb36a31fe102d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -94,6 +94,7 @@ public void testSchemaEvolution() throws Exception { warehouse, database, false, + false, Collections.emptyMap(), tableConfig); action.build(env); @@ -224,6 +225,111 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception { waitForResult(expected, table2, rowType2, primaryKeys2); } + private void testSyncToMultipleDatabaseImpl( + boolean enableMultipleDB, String includingTables, String excludingTables) + throws Exception { + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put( + "database-name", "paimon_sync_database_multiple1|paimon_sync_database_multiple2"); + 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + MySqlSyncDatabaseAction action = + new MySqlSyncDatabaseAction( + mySqlConfig, + warehouse, + database, + enableMultipleDB, + false, + null, + null, + includingTables, + excludingTables, + Collections.emptyMap(), + Collections.emptyMap(), + DIVIDED); + + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + } + + @Test + @Timeout(60) + public void testEnableSyncToMultipleDB() throws Exception { + testSyncToMultipleDatabaseImpl(true, null, null); + FileStoreTable table1 = getFileStoreTable("t1", "paimon_sync_database_multiple1"); + List primaryKeys1 = Collections.singletonList("k1"); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k1", "v0"}); + List expected = Collections.singletonList("+I[1, flink]"); + waitForResult(expected, table1, rowType, primaryKeys1); + + FileStoreTable table2 = getFileStoreTable("t2", "paimon_sync_database_multiple1"); + List primaryKeys2 = Collections.singletonList("k2"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k2", "v2"}); + expected = Collections.singletonList("+I[2, paimon]"); + waitForResult(expected, table2, rowType, primaryKeys2); + + FileStoreTable table3 = getFileStoreTable("t1", "paimon_sync_database_multiple2"); + List primaryKeys3 = Collections.singletonList("k1"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k1", "v1"}); + expected = Collections.singletonList("+I[3, three]"); + waitForResult(expected, table3, rowType, primaryKeys3); + + FileStoreTable table4 = getFileStoreTable("t3", "paimon_sync_database_multiple2"); + List primaryKeys4 = Collections.singletonList("k3"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k3", "v3"}); + expected = Collections.singletonList("+I[4, four]"); + waitForResult(expected, table4, rowType, primaryKeys4); + } + + @Test + @Timeout(60) + public void testDisableSyncToMultipleDB() throws Exception { + testSyncToMultipleDatabaseImpl(false, null, null); + FileStoreTable table1 = getFileStoreTable("t1"); + List primaryKeys1 = Collections.singletonList("k1"); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(10) + }, + new String[] {"k1", "v0", "v1"}); + List expected = Arrays.asList("+I[1, flink, NULL]", "+I[3, NULL, three]"); + waitForResult(expected, table1, rowType, primaryKeys1); + + FileStoreTable table2 = getFileStoreTable("t2"); + List primaryKeys2 = Collections.singletonList("k2"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k2", "v2"}); + expected = Collections.singletonList("+I[2, paimon]"); + waitForResult(expected, table2, rowType, primaryKeys2); + + FileStoreTable table3 = getFileStoreTable("t3"); + List primaryKeys3 = Collections.singletonList("k3"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k3", "v3"}); + expected = Collections.singletonList("+I[4, four]"); + waitForResult(expected, table3, rowType, primaryKeys3); + } + @Test @Timeout(60) public void testSchemaEvolutionWithTinyInt1Convert() 
throws Exception { @@ -243,6 +349,7 @@ public void testSchemaEvolutionWithTinyInt1Convert() throws Exception { warehouse, database, false, + false, Collections.emptyMap(), tableConfig); action.build(env); @@ -334,6 +441,7 @@ public void testSpecifiedMySqlTable() { warehouse, database, false, + false, Collections.emptyMap(), Collections.emptyMap()); @@ -361,6 +469,7 @@ public void testInvalidDatabase() { warehouse, database, false, + false, Collections.emptyMap(), Collections.emptyMap()); @@ -404,6 +513,7 @@ public void testIgnoreIncompatibleTables() throws Exception { mySqlConfig, warehouse, database, + false, true, Collections.emptyMap(), tableConfig); @@ -470,6 +580,7 @@ public void testTableAffix() throws Exception { warehouse, database, false, + false, "test_prefix_", "_test_suffix", null, @@ -594,7 +705,7 @@ private void testTableAffixImpl(Statement statement) throws Exception { public void testIncludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_including", - "flink|paimon.+", + "paimon_sync_database_including.flink|paimon_sync_database_including.paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignored")); @@ -606,7 +717,7 @@ public void testExcludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_excluding", null, - "flink|paimon.+", + "paimon_sync_database_excluding.flink|paimon_sync_database_excluding.paimon.+", Collections.singletonList("sync"), Arrays.asList("flink", "paimon_1", "paimon_2")); } @@ -616,8 +727,8 @@ public void testExcludingTables() throws Exception { public void testIncludingAndExcludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_in_excluding", - "flink|paimon.+", - "paimon_1", + "paimon_sync_database_in_excluding.flink|paimon_sync_database_in_excluding.paimon.+", + "paimon_sync_database_in_excluding.paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "test")); } @@ -645,6 +756,7 @@ private void includingAndExcludingTablesImpl( warehouse, database, false, + false, null, null, includingTables, @@ -679,7 +791,7 @@ public void testIgnoreCase() throws Exception { MySqlSyncDatabaseAction action = new MySqlSyncDatabaseAction( - mySqlConfig, warehouse, database, false, catalogConfig, tableConfig); + mySqlConfig, warehouse, database, false, false, catalogConfig, tableConfig); action.build(env); JobClient client = env.executeAsync(); waitJobRunning(client); @@ -790,10 +902,11 @@ public void testAddIgnoredTable() throws Exception { warehouse, database, false, + false, null, null, - "t.+", - ".*a$", + "paimon_sync_database_add_ignored_table.t.+", + "paimon_sync_database_add_ignored_table.*a$", Collections.emptyMap(), tableConfig, COMBINED); @@ -1105,9 +1218,10 @@ private JobClient buildSyncDatabaseActionWithNewlyAddedTables( warehouse, database, false, + false, null, null, - "t.+", + databaseName + ".t.+", null, catalogConfig, tableConfig, @@ -1143,6 +1257,7 @@ public void testTinyInt1Convert() throws Exception { warehouse, database, false, + false, Collections.emptyMap(), tableConfig); action.build(env); @@ -1183,9 +1298,12 @@ private Catalog catalog() { return CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); } - private FileStoreTable getFileStoreTable(String tableName) throws Exception { - Identifier identifier = Identifier.create(database, tableName); - return (FileStoreTable) catalog().getTable(identifier); + private FileStoreTable getFileStoreTable(String tableName, 
String... databaseName) + throws Exception { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + Identifier identifier = + Identifier.create(databaseName.length > 0 ? databaseName[0] : database, tableName); + return (FileStoreTable) catalog.getTable(identifier); } private void assertTableExists(List tableNames) { diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql index 715a6b66a2ba8..5ac73db076b98 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql +++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql @@ -608,3 +608,44 @@ CREATE TABLE schema_evolution_5 ( v2 TINYINT(1) comment 'tinyint(1)', PRIMARY KEY (_id) ); + +-- ################################################################################ +-- MySqlSyncDatabaseActionITCase#testEnableSyncToMultipleDB +-- MySqlSyncDatabaseActionITCase#testDisableSyncToMultipleDB +-- ################################################################################ + +CREATE DATABASE paimon_sync_database_multiple1; +USE paimon_sync_database_multiple1; + +CREATE TABLE t1 ( + k1 INT, + v0 VARCHAR(10), + PRIMARY KEY (k1) +); + +CREATE TABLE t2 ( + k2 INT, + v2 VARCHAR(10), + PRIMARY KEY (k2) +); + +INSERT INTO t1 VALUES(1, 'flink'); +INSERT INTO t2 VALUES(2, 'paimon'); + +CREATE DATABASE paimon_sync_database_multiple2; +USE paimon_sync_database_multiple2; + +CREATE TABLE t1 ( + k1 INT, + v1 VARCHAR(10), + PRIMARY KEY (k1) +); + +CREATE TABLE t3 ( + k3 INT, + v3 VARCHAR(10), + PRIMARY KEY (k3) +); + +INSERT INTO t1 VALUES(3, 'three'); +INSERT INTO t3 VALUES(4, 'four');