From 3eb32dc52ac305d4c375aa5c727e8b61b564159f Mon Sep 17 00:00:00 2001 From: zhangjun Date: Sun, 28 May 2023 23:56:57 +0800 Subject: [PATCH] support sync multiple mysql database --- docs/content/how-to/cdc-ingestion.md | 7 +- .../generated/mysql_sync_database.html | 6 +- .../action/cdc/mysql/MySqlActionUtils.java | 2 +- .../mysql/MySqlDebeziumJsonEventParser.java | 36 ++++- .../cdc/mysql/MySqlSyncDatabaseAction.java | 122 ++++++++++----- .../mysql/MySqlSyncDatabaseActionFactory.java | 12 +- .../cdc/mysql/MySqlSyncTableAction.java | 13 +- ...CdcDynamicTableParsingProcessFunction.java | 32 ++-- .../paimon/flink/sink/cdc/EventParser.java | 4 + .../cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 27 +++- .../mysql/MySqlSyncDatabaseActionITCase.java | 139 ++++++++++++++++-- .../resources/mysql/sync_database_setup.sql | 42 ++++++ 12 files changed, 365 insertions(+), 77 deletions(-) diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md index 57d3e06a4154..e4c0c27055e7 100644 --- a/docs/content/how-to/cdc-ingestion.md +++ b/docs/content/how-to/cdc-ingestion.md @@ -101,7 +101,7 @@ the regular expressions. ### Synchronizing Databases -By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize the whole MySQL database into one Paimon database. +By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize one or more MySQL databases into one or more Paimon databases. If `sync-to-multiple-db` is false, all the MySQL databases will be synchronized into one Paimon database, and tables with the same name in different databases will be merged. 
If `sync-to-multiple-db` is true, each MySQL database will be synchronized into its own Paimon database with the same schema as in MySQL. To use this feature through `flink run`, run the following shell command. @@ -111,6 +111,7 @@ To use this feature through `flink run`, run the following shell command. mysql-sync-database --warehouse \ --database \ + [--sync-to-multiple-db ] \ [--ignore-incompatible ] \ [--table-prefix ] \ [--table-suffix ] \ @@ -130,6 +131,10 @@ For each MySQL table to be synchronized, if the corresponding Paimon table does Example 1: synchronize entire database +You can synchronize one or more MySQL databases to Paimon. To synchronize a single MySQL database, +set the `database-name` parameter to that database's name; to synchronize multiple MySQL databases, +set the `database-name` parameter to a regular expression that matches them. + ```bash /bin/flink run \ /path/to/paimon-flink-action-{{< version >}}.jar \ diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html index 8336fe6c4877..f59a48182573 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_database.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html @@ -33,6 +33,10 @@
--database
The database name in Paimon catalog. + +
--sync-to-multiple-db
+ It is default false, in which case all the MySQL databases will be synchronized into one Paimon database, and tables with the same name in different databases will be merged; this is suitable for database sharding scenarios. If it is true, the parameter "--database" will be ignored, and each MySQL database will be synchronized into its own Paimon database with the same schema as in MySQL; this is suitable for scenarios with a large number of databases under one database instance, and it can save resources. +
--ignore-incompatible
It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible,an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception. @@ -47,7 +51,7 @@
--including-tables
- It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'. + It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables. Because '|' is a special character, the value must be quoted, for example: 'db1.a|db2.b|db2.c'. Regular expressions are supported; for example, specifying "--including-tables db1.test|db2.paimon.*" means to synchronize table 'db1.test' and all tables whose names start with 'db2.paimon'.
--excluding-tables
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java index 756b94279113..56bb7d97f5b1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java @@ -211,7 +211,7 @@ static MySqlSource buildMySqlSource(Configuration mySqlConfig) { .username(mySqlConfig.get(MySqlSourceOptions.USERNAME)) .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD)) .databaseList(databaseName) - .tableList(databaseName + "." + tableName); + .tableList(tableName); mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId); mySqlConfig diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java index dfde301c367d..3af98aeaa227 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java @@ -72,6 +72,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser { private final ObjectMapper objectMapper = new ObjectMapper(); private final ZoneId serverTimeZone; private final boolean caseSensitive; + private final boolean syncToMultipleDB; private final TableNameConverter tableNameConverter; private final List computedColumns; private final NewTableSchemaBuilder schemaBuilder; @@ -100,7 +101,8 @@ public MySqlDebeziumJsonEventParser( ddl -> Optional.empty(), null, null, - convertTinyint1ToBool); + 
convertTinyint1ToBool, + false); } public MySqlDebeziumJsonEventParser( @@ -110,7 +112,8 @@ public MySqlDebeziumJsonEventParser( NewTableSchemaBuilder schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, - boolean convertTinyint1ToBool) { + boolean convertTinyint1ToBool, + boolean syncToMultipleDB) { this( serverTimeZone, caseSensitive, @@ -119,7 +122,8 @@ public MySqlDebeziumJsonEventParser( schemaBuilder, includingPattern, excludingPattern, - convertTinyint1ToBool); + convertTinyint1ToBool, + syncToMultipleDB); } public MySqlDebeziumJsonEventParser( @@ -130,7 +134,8 @@ public MySqlDebeziumJsonEventParser( NewTableSchemaBuilder schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, - boolean convertTinyint1ToBool) { + boolean convertTinyint1ToBool, + Boolean syncToMultipleDB) { this.serverTimeZone = serverTimeZone; this.caseSensitive = caseSensitive; this.computedColumns = computedColumns; @@ -139,6 +144,7 @@ public MySqlDebeziumJsonEventParser( this.includingPattern = includingPattern; this.excludingPattern = excludingPattern; this.convertTinyint1ToBool = convertTinyint1ToBool; + this.syncToMultipleDB = syncToMultipleDB; } @Override @@ -146,16 +152,30 @@ public void setRawEvent(String rawEvent) { try { root = objectMapper.readValue(rawEvent, JsonNode.class); payload = root.get("payload"); - currentTable = payload.get("source").get("table").asText(); + currentTable = + payload.get("source").get("db").asText() + + "." 
+ + payload.get("source").get("table").asText(); shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable(); } catch (Exception e) { throw new RuntimeException(e); } } + @Override + public String parseDatabaseName() { + return payload.get("source").get("db").asText(); + } + @Override public String parseTableName() { - return tableNameConverter.convert(currentTable); + String tableName = payload.get("source").get("table").asText(); + if (syncToMultipleDB) { + String dbName = payload.get("source").get("db").asText(); + return dbName + "." + tableNameConverter.convert(tableName); + } else { + return tableNameConverter.convert(tableName); + } } private boolean isSchemaChange() { @@ -244,11 +264,13 @@ public Optional parseNewTable() { JsonNode primaryKeyColumnNames = tableChange.get("table").get("primaryKeyColumnNames"); if (primaryKeyColumnNames.size() == 0) { + String id = tableChange.get("id").asText(); + String tableName = id.replaceAll("\"", ""); LOG.debug( "Didn't find primary keys from MySQL DDL for table '{}'. 
" + "This table won't be synchronized.", currentTable); - excludedTables.add(currentTable); + excludedTables.add(tableName); shouldSynchronizeCurrentTable = false; return Optional.empty(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index c931fca31958..d92873004b2c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -51,7 +51,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; +import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -102,6 +104,7 @@ public class MySqlSyncDatabaseAction extends ActionBase { private final Configuration mySqlConfig; private final String database; + private final boolean syncToMultipleDB; private final boolean ignoreIncompatible; private final String tablePrefix; private final String tableSuffix; @@ -115,6 +118,7 @@ public MySqlSyncDatabaseAction( Map mySqlConfig, String warehouse, String database, + boolean syncToMultipleDB, boolean ignoreIncompatible, Map catalogConfig, Map tableConfig) { @@ -122,6 +126,7 @@ public MySqlSyncDatabaseAction( mySqlConfig, warehouse, database, + syncToMultipleDB, ignoreIncompatible, null, null, @@ -136,6 +141,7 @@ public MySqlSyncDatabaseAction( Map mySqlConfig, String warehouse, String database, + boolean syncToMultipleDB, boolean ignoreIncompatible, @Nullable String tablePrefix, @Nullable String tableSuffix, @@ -147,6 +153,7 @@ public MySqlSyncDatabaseAction( super(warehouse, catalogConfig); this.mySqlConfig = Configuration.fromMap(mySqlConfig); this.database = 
database; + this.syncToMultipleDB = syncToMultipleDB; this.ignoreIncompatible = ignoreIncompatible; this.tablePrefix = tablePrefix == null ? "" : tablePrefix; this.tableSuffix = tableSuffix == null ? "" : tableSuffix; @@ -165,13 +172,15 @@ public void build(StreamExecutionEnvironment env) throws Exception { + "If you want to sync several MySQL tables into one Paimon table, " + "use mysql-sync-table instead."); boolean caseSensitive = catalog.caseSensitive(); + List excludedTables = new LinkedList<>(); + List mySqlSchemas = getMySqlSchemaList(excludedTables); + Set databases = + mySqlSchemas.stream().map(MySqlSchema::databaseName).collect(Collectors.toSet()); if (!caseSensitive) { - validateCaseInsensitive(); + validateCaseInsensitive(databases); } - List excludedTables = new LinkedList<>(); - List mySqlSchemas = getMySqlSchemaList(excludedTables); String mySqlDatabase = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME); checkArgument( mySqlSchemas.size() > 0, @@ -179,7 +188,14 @@ public void build(StreamExecutionEnvironment env) throws Exception { + mySqlDatabase + ", or MySQL database does not exist."); - catalog.createDatabase(database, true); + if (syncToMultipleDB) { + for (String database : databases) { + catalog.createDatabase(database, true); + } + } else { + catalog.createDatabase(database, true); + } + TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, tablePrefix, tableSuffix); @@ -187,7 +203,13 @@ public void build(StreamExecutionEnvironment env) throws Exception { List monitoredTables = new ArrayList<>(); for (MySqlSchema mySqlSchema : mySqlSchemas) { String paimonTableName = tableNameConverter.convert(mySqlSchema.tableName()); - Identifier identifier = new Identifier(database, paimonTableName); + Identifier identifier; + if (syncToMultipleDB) { + identifier = new Identifier(mySqlSchema.databaseName(), paimonTableName); + } else { + identifier = new Identifier(database, paimonTableName); + } + FileStoreTable table; Schema 
fromMySql = MySqlActionUtils.buildPaimonSchema( @@ -202,13 +224,13 @@ public void build(StreamExecutionEnvironment env) throws Exception { Supplier errMsg = incompatibleMessage(table.schema(), mySqlSchema, identifier); if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) { - monitoredTables.add(mySqlSchema.tableName()); + monitoredTables.add(mySqlSchema.databaseName() + "." + mySqlSchema.tableName()); fileStoreTables.add(table); } } catch (Catalog.TableNotExistException e) { catalog.createTable(identifier, fromMySql, false); table = (FileStoreTable) catalog.getTable(identifier); - monitoredTables.add(mySqlSchema.tableName()); + monitoredTables.add(mySqlSchema.databaseName() + "." + mySqlSchema.tableName()); fileStoreTables.add(table); } } @@ -241,6 +263,9 @@ public void build(StreamExecutionEnvironment env) throws Exception { Pattern includingPattern = this.includingPattern; Pattern excludingPattern = this.excludingPattern; Boolean convertTinyint1ToBool = mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL); + + // if we use syncToMultipleDB variable directly, it will throw an NotSerializableException. 
+ boolean enableMultipleDB = syncToMultipleDB; EventParser.Factory parserFactory = () -> new MySqlDebeziumJsonEventParser( @@ -250,10 +275,12 @@ public void build(StreamExecutionEnvironment env) throws Exception { schemaBuilder, includingPattern, excludingPattern, - convertTinyint1ToBool); + convertTinyint1ToBool, + enableMultipleDB); String database = this.database; DatabaseSyncMode mode = this.mode; + FlinkCdcSyncDatabaseSinkBuilder sinkBuilder = new FlinkCdcSyncDatabaseSinkBuilder() .withInput( @@ -263,8 +290,9 @@ public void build(StreamExecutionEnvironment env) throws Exception { .withDatabase(database) .withCatalogLoader(catalogLoader()) .withTables(fileStoreTables) - .withMode(mode); - + .withMode(mode) + .withSyncMultipleDB(syncToMultipleDB) + .withTables(fileStoreTables); String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key()); if (sinkParallelism != null) { sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism)); @@ -273,12 +301,23 @@ public void build(StreamExecutionEnvironment env) throws Exception { sinkBuilder.build(); } - private void validateCaseInsensitive() { - checkArgument( - database.equals(database.toLowerCase()), - String.format( - "Database name [%s] cannot contain upper case in case-insensitive catalog.", - database)); + private void validateCaseInsensitive(Set databases) { + if (syncToMultipleDB) { + for (String database : databases) { + checkArgument( + database.equals(database.toLowerCase()), + String.format( + "Database name [%s] cannot contain upper case in case-insensitive catalog.", + database)); + } + } else { + checkArgument( + database.equals(database.toLowerCase()), + String.format( + "Database name [%s] cannot contain upper case in case-insensitive catalog.", + database)); + } + checkArgument( tablePrefix.equals(tablePrefix.toLowerCase()), String.format( @@ -292,29 +331,40 @@ private void validateCaseInsensitive() { } private List getMySqlSchemaList(List excludedTables) throws Exception { - 
String databaseName = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME); + Pattern databasePattern = + Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)); List mySqlSchemaList = new ArrayList<>(); try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) { DatabaseMetaData metaData = conn.getMetaData(); - try (ResultSet tables = - metaData.getTables(databaseName, null, "%", new String[] {"TABLE"})) { - while (tables.next()) { - String tableName = tables.getString("TABLE_NAME"); - if (!shouldMonitorTable(tableName)) { - excludedTables.add(tableName); - continue; - } - MySqlSchema mySqlSchema = - new MySqlSchema( - metaData, - databaseName, - tableName, - mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL)); - if (mySqlSchema.primaryKeys().size() > 0) { - // only tables with primary keys will be considered - mySqlSchemaList.add(mySqlSchema); - } else { - excludedTables.add(tableName); + try (ResultSet schemas = metaData.getCatalogs()) { + while (schemas.next()) { + String databaseName = schemas.getString("TABLE_CAT"); + Matcher databaseMatcher = databasePattern.matcher(databaseName); + if (databaseMatcher.matches()) { + try (ResultSet tables = metaData.getTables(databaseName, null, "%", null)) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + String fullMysqlTableName = databaseName + "." 
+ tableName; + if (!shouldMonitorTable(fullMysqlTableName)) { + excludedTables.add(fullMysqlTableName); + continue; + } + + MySqlSchema mySqlSchema = + new MySqlSchema( + metaData, + databaseName, + tableName, + mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL)); + if (mySqlSchema.primaryKeys().size() > 0) { + // only tables with primary keys will be considered + mySqlSchemaList.add(mySqlSchema); + } else { + + excludedTables.add(fullMysqlTableName); + } + } + } } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index 0a6723bf7fef..d308a3c8b551 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -49,6 +49,7 @@ public Optional create(MultipleParameterTool params) { String warehouse = params.get("warehouse"); String database = params.get("database"); boolean ignoreIncompatible = Boolean.parseBoolean(params.get("ignore-incompatible")); + boolean syncToMultipleDB = Boolean.parseBoolean(params.get("sync-to-multiple-db")); String tablePrefix = params.get("table-prefix"); String tableSuffix = params.get("table-suffix"); String includingTables = params.get("including-tables"); @@ -79,6 +80,7 @@ public Optional create(MultipleParameterTool params) { mySqlConfig, warehouse, database, + syncToMultipleDB, ignoreIncompatible, tablePrefix, tableSuffix, @@ -94,7 +96,7 @@ public void printHelp() { System.out.println( "Action \"mysql-sync-database\" creates a streaming job " + "with a Flink MySQL CDC source and multiple Paimon table sinks " - + "to synchronize a whole MySQL database into one Paimon database.\n" + + "to synchronize one or multiple MySQL 
database into one or multiple Paimon database.\n" + "Only MySQL tables with primary keys will be considered. " + "Newly created MySQL tables after the job starts will not be included."); System.out.println(); @@ -102,6 +104,7 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( " mysql-sync-database --warehouse --database " + + "[--sync-to-multiple-db ] " + "[--ignore-incompatible ] " + "[--table-prefix ] " + "[--table-suffix ] " @@ -113,6 +116,13 @@ public void printHelp() { + "[--table-conf [--table-conf ...]]"); System.out.println(); + System.out.println( + "--sync-to-multiple-db is default false, in this case, all the MySQL database will be synchronized to one paimon database, " + + "and the table with same name in different database will be merged, it is suitable for database sharding scenarios. " + + "if it is true, the parameter \"--database\" will be ignored, all the MySQL database will be synchronize to multiple " + + "paimon database with the same schema as MySQL, it is suitable for scenarios with a large number of databases under a database instance, which can save resources."); + System.out.println(); + System.out.println( "--ignore-incompatible is default false, in this case, if MySQL table name exists in Paimon " + "and their schema is incompatible, an exception will be thrown. 
" diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index e85d1121ace4..221a8bb34649 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -133,7 +133,6 @@ public MySqlSyncTableAction( } public void build(StreamExecutionEnvironment env) throws Exception { - MySqlSource source = MySqlActionUtils.buildMySqlSource(mySqlConfig); boolean caseSensitive = catalog.caseSensitive(); @@ -141,14 +140,24 @@ public void build(StreamExecutionEnvironment env) throws Exception { validateCaseInsensitive(); } + List mySqlSchemaList = getMySqlSchemaList(); + + String tableNames = + mySqlSchemaList.stream() + .map(m -> m.databaseName() + "." 
+ m.tableName()) + .collect(Collectors.joining("|")); + MySqlSchema mySqlSchema = - getMySqlSchemaList().stream() + mySqlSchemaList.stream() .reduce(MySqlSchema::merge) .orElseThrow( () -> new RuntimeException( "No table satisfies the given database name and table name")); + mySqlConfig.set(MySqlSourceOptions.TABLE_NAME, "(" + tableNames + ")"); + MySqlSource source = MySqlActionUtils.buildMySqlSource(mySqlConfig); + catalog.createDatabase(database, true); Identifier identifier = new Identifier(database, table); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java index 0d9da6df1ae4..b36af24e5bbd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java @@ -62,16 +62,20 @@ public class CdcDynamicTableParsingProcessFunction extends ProcessFunction parserFactory; private final String database; private final Catalog.Loader catalogLoader; + private final boolean syncToMultipleDB; private transient EventParser parser; private transient Catalog catalog; public CdcDynamicTableParsingProcessFunction( - String database, Catalog.Loader catalogLoader, EventParser.Factory parserFactory) { - // for now, only support single database + String database, + Catalog.Loader catalogLoader, + EventParser.Factory parserFactory, + boolean syncToMultipleDB) { this.database = database; this.catalogLoader = catalogLoader; this.parserFactory = parserFactory; + this.syncToMultipleDB = syncToMultipleDB; } @Override @@ -84,20 +88,26 @@ public void open(Configuration parameters) throws Exception { public void processElement(T raw, Context context, Collector collector) throws Exception { 
parser.setRawEvent(raw); - // CDC Ingestion only supports single database at this time being. - // In the future, there will be a mapping between source databases - // and target paimon databases - // TODO: support multiple databases - // String databaseName = parser.parseDatabaseName(); - String tableName = parser.parseTableName(); + String finalDatabaseName; + String finalTableName; + if (syncToMultipleDB) { + finalDatabaseName = parser.parseDatabaseName(); + finalTableName = parser.parseTableName().split("\\.")[1]; + } else { + finalDatabaseName = database; + finalTableName = parser.parseTableName(); + } // check for newly added table parser.parseNewTable() .ifPresent( schema -> { Identifier identifier = - new Identifier(database, parser.parseTableName()); + new Identifier(finalDatabaseName, finalTableName); try { + if (!catalog.databaseExists(finalDatabaseName)) { + catalog.createDatabase(finalDatabaseName, true); + } catalog.createTable(identifier, schema, true); } catch (Exception e) { LOG.error("create newly added paimon table error.", e); @@ -108,7 +118,7 @@ public void processElement(T raw, Context context, Collector collector) th if (schemaChange.size() > 0) { context.output( DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG, - Tuple2.of(Identifier.create(database, tableName), schemaChange)); + Tuple2.of(Identifier.create(finalDatabaseName, finalTableName), schemaChange)); } parser.parseRecords() @@ -116,7 +126,7 @@ public void processElement(T raw, Context context, Collector collector) th record -> context.output( DYNAMIC_OUTPUT_TAG, - wrapRecord(database, tableName, record))); + wrapRecord(finalDatabaseName, finalTableName, record))); } private CdcMultiplexRecord wrapRecord(String databaseName, String tableName, CdcRecord record) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java index c0e481012854..e60216dc3fd7 
100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java @@ -42,6 +42,10 @@ default String parseTableName() { throw new UnsupportedOperationException("Table name is not supported in this parser."); } + default String parseDatabaseName() { + throw new UnsupportedOperationException("Database name is not supported in this parser."); + } + /** * Parse new schema if this event contains schema change. * diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index 0070f3ae99af..e51c9f19e843 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -66,9 +66,9 @@ public class FlinkCdcSyncDatabaseSinkBuilder { // Paimon tables. 2) in multiplex sink where it is used to // initialize different writers to multiple tables. 
private Catalog.Loader catalogLoader; - // database to sync, currently only support single database private String database; private DatabaseSyncMode mode; + private boolean syncToMultipleDB = false; public FlinkCdcSyncDatabaseSinkBuilder withInput(DataStream input) { this.input = input; @@ -81,6 +81,11 @@ public FlinkCdcSyncDatabaseSinkBuilder withParserFactory( return this; } + public FlinkCdcSyncDatabaseSinkBuilder withSyncMultipleDB(Boolean syncToMultipleDB) { + this.syncToMultipleDB = syncToMultipleDB; + return this; + } + public FlinkCdcSyncDatabaseSinkBuilder withTables(List tables) { this.tables = tables; return this; @@ -124,7 +129,7 @@ private void buildCombinedCdcSink() { input.forward() .process( new CdcDynamicTableParsingProcessFunction<>( - database, catalogLoader, parserFactory)) + database, catalogLoader, parserFactory, syncToMultipleDB)) .setParallelism(input.getParallelism()); // for newly-added tables, create a multiplexing operator that handles all their records @@ -171,15 +176,26 @@ private void buildDividedCdcSink() { .setParallelism(input.getParallelism()); for (FileStoreTable table : tables) { + String tableName; + Identifier identifier; + if (syncToMultipleDB) { + String db = table.location().getParent().getName().replace(".db", ""); + tableName = db + "." 
+ table.name(); + identifier = Identifier.create(db, table.name()); + } else { + tableName = table.name(); + identifier = Identifier.create(database, tableName); + } + DataStream schemaChangeProcessFunction = SingleOutputStreamOperatorUtils.getSideOutput( parsed, CdcMultiTableParsingProcessFunction - .createUpdatedDataFieldsOutputTag(table.name())) + .createUpdatedDataFieldsOutputTag(tableName)) .process( new UpdatedDataFieldsProcessFunction( new SchemaManager(table.fileIO(), table.location()), - Identifier.create(database, table.name()), + identifier, catalogLoader)); schemaChangeProcessFunction.getTransformation().setParallelism(1); schemaChangeProcessFunction.getTransformation().setMaxParallelism(1); @@ -187,8 +203,7 @@ private void buildDividedCdcSink() { DataStream parsedForTable = SingleOutputStreamOperatorUtils.getSideOutput( parsed, - CdcMultiTableParsingProcessFunction.createRecordOutputTag( - table.name())); + CdcMultiTableParsingProcessFunction.createRecordOutputTag(tableName)); BucketMode bucketMode = table.bucketMode(); switch (bucketMode) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index b1bcd88bd487..d5fc76624771 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -100,6 +100,7 @@ public void testSchemaEvolution() throws Exception { warehouse, database, false, + false, Collections.emptyMap(), tableConfig); action.build(env); @@ -230,6 +231,109 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception { waitForResult(expected, table2, rowType2, primaryKeys2); } + private void 
testSyncToMultipleDatabaseImpl( + boolean enableMultipleDB, String includingTables, String excludingTables) + throws Exception { + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put( + "database-name", "paimon_sync_database_multiple1|paimon_sync_database_multiple2"); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + MySqlSyncDatabaseAction action = + new MySqlSyncDatabaseAction( + mySqlConfig, + warehouse, + database, + enableMultipleDB, + false, + null, + null, + includingTables, + excludingTables, + Collections.emptyMap(), + Collections.emptyMap(), + DIVIDED); + + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + } + + @Test + @Timeout(60) + public void testEnableSyncToMultipleDB() throws Exception { + testSyncToMultipleDatabaseImpl(true, null, null); + FileStoreTable table1 = getFileStoreTable("t1", "paimon_sync_database_multiple1"); + List primaryKeys1 = Collections.singletonList("k1"); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k1", "v1"}); + List expected = Collections.singletonList("+I[1, flink]"); + waitForResult(expected, table1, rowType, primaryKeys1); + + FileStoreTable table2 = getFileStoreTable("t2", "paimon_sync_database_multiple1"); + List primaryKeys2 = Collections.singletonList("k2"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k2", "v2"}); + expected = Collections.singletonList("+I[2, paimon]"); + waitForResult(expected, table2, rowType, primaryKeys2); + + FileStoreTable table3 = getFileStoreTable("t1", "paimon_sync_database_multiple2"); + List primaryKeys3 = Collections.singletonList("k1"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k1", 
"v1"}); + expected = Collections.singletonList("+I[3, three]"); + waitForResult(expected, table3, rowType, primaryKeys3); + + FileStoreTable table4 = getFileStoreTable("t3", "paimon_sync_database_multiple2"); + List primaryKeys4 = Collections.singletonList("k3"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k3", "v3"}); + expected = Collections.singletonList("+I[4, four]"); + waitForResult(expected, table4, rowType, primaryKeys4); + } + + @Test + @Timeout(60) + public void testDisableSyncToMultipleDB() throws Exception { + testSyncToMultipleDatabaseImpl(false, null, null); + FileStoreTable table1 = getFileStoreTable("t1"); + List primaryKeys1 = Collections.singletonList("k1"); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k1", "v1"}); + List expected = Arrays.asList("+I[1, flink]", "+I[3, three]"); + waitForResult(expected, table1, rowType, primaryKeys1); + + FileStoreTable table2 = getFileStoreTable("t2"); + List primaryKeys2 = Collections.singletonList("k2"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k2", "v2"}); + expected = Collections.singletonList("+I[2, paimon]"); + waitForResult(expected, table2, rowType, primaryKeys2); + + FileStoreTable table3 = getFileStoreTable("t3"); + List primaryKeys3 = Collections.singletonList("k3"); + rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k3", "v3"}); + expected = Collections.singletonList("+I[4, four]"); + waitForResult(expected, table3, rowType, primaryKeys3); + } + @Test @Timeout(60) public void testSchemaEvolutionWithTinyInt1Convert() throws Exception { @@ -249,6 +353,7 @@ public void testSchemaEvolutionWithTinyInt1Convert() throws Exception { warehouse, database, false, + false, Collections.emptyMap(), tableConfig); action.build(env); @@ -340,6 
+445,7 @@ public void testSpecifiedMySqlTable() { warehouse, database, false, + false, Collections.emptyMap(), Collections.emptyMap()); @@ -367,6 +473,7 @@ public void testInvalidDatabase() { warehouse, database, false, + false, Collections.emptyMap(), Collections.emptyMap()); @@ -410,6 +517,7 @@ public void testIgnoreIncompatibleTables() throws Exception { mySqlConfig, warehouse, database, + false, true, Collections.emptyMap(), tableConfig); @@ -476,6 +584,7 @@ public void testTableAffix() throws Exception { warehouse, database, false, + false, "test_prefix_", "_test_suffix", null, @@ -600,7 +709,7 @@ private void testTableAffixImpl(Statement statement) throws Exception { public void testIncludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_including", - "flink|paimon.+", + "paimon_sync_database_including.flink|paimon_sync_database_including.paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignored")); @@ -612,7 +721,7 @@ public void testExcludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_excluding", null, - "flink|paimon.+", + "paimon_sync_database_excluding.flink|paimon_sync_database_excluding.paimon.+", Collections.singletonList("sync"), Arrays.asList("flink", "paimon_1", "paimon_2")); } @@ -622,8 +731,8 @@ public void testExcludingTables() throws Exception { public void testIncludingAndExcludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_in_excluding", - "flink|paimon.+", - "paimon_1", + "paimon_sync_database_in_excluding.flink|paimon_sync_database_in_excluding.paimon.+", + "paimon_sync_database_in_excluding.paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "test")); } @@ -651,6 +760,7 @@ private void includingAndExcludingTablesImpl( warehouse, database, false, + false, null, null, includingTables, @@ -685,7 +795,7 @@ public void testIgnoreCase() throws Exception { 
MySqlSyncDatabaseAction action = new MySqlSyncDatabaseAction( - mySqlConfig, warehouse, database, false, catalogConfig, tableConfig); + mySqlConfig, warehouse, database, false, false, catalogConfig, tableConfig); action.build(env); JobClient client = env.executeAsync(); waitJobRunning(client); @@ -793,10 +903,11 @@ public void testAddIgnoredTable() throws Exception { warehouse, database, false, + false, null, null, - "t.+", - ".*a$", + "paimon_sync_database_add_ignored_table.t.+", + "paimon_sync_database_add_ignored_table.*a$", Collections.emptyMap(), tableConfig, COMBINED); @@ -1105,9 +1216,10 @@ private JobClient buildSyncDatabaseActionWithNewlyAddedTables( warehouse, database, false, + false, null, null, - "t.+", + databaseName + ".t.+", null, catalogConfig, tableConfig, @@ -1143,6 +1255,7 @@ public void testTinyInt1Convert() throws Exception { warehouse, database, false, + false, Collections.emptyMap(), tableConfig); action.build(env); @@ -1211,6 +1324,7 @@ public void testSyncManyTableWithLimitedMemory() throws Exception { warehouse, database, false, + false, null, null, null, @@ -1257,9 +1371,12 @@ public void testSyncManyTableWithLimitedMemory() throws Exception { } } - private FileStoreTable getFileStoreTable(String tableName) throws Exception { - Identifier identifier = Identifier.create(database, tableName); - return (FileStoreTable) catalog().getTable(identifier); + private FileStoreTable getFileStoreTable(String tableName, String... databaseName) + throws Exception { + Catalog catalog = catalog(); + Identifier identifier = + Identifier.create(databaseName.length > 0 ? 
databaseName[0] : database, tableName); + return (FileStoreTable) catalog.getTable(identifier); } private void assertTableExists(List tableNames) { diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql index 52e3a4c16007..9182fce4a69c 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql +++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_database_setup.sql @@ -346,3 +346,45 @@ CREATE TABLE a ( v VARCHAR(10), PRIMARY KEY (k) ); + + +-- ################################################################################ +-- MySqlSyncDatabaseActionITCase#testEnableSyncToMultipleDB +-- MySqlSyncDatabaseActionITCase#testDisableSyncToMultipleDB +-- ################################################################################ + +CREATE DATABASE paimon_sync_database_multiple1; +USE paimon_sync_database_multiple1; + +CREATE TABLE t1 ( + k1 INT, + v1 VARCHAR(10), + PRIMARY KEY (k1) +); + +CREATE TABLE t2 ( + k2 INT, + v2 VARCHAR(10), + PRIMARY KEY (k2) +); + +INSERT INTO t1 VALUES(1, 'flink'); +INSERT INTO t2 VALUES(2, 'paimon'); + +CREATE DATABASE paimon_sync_database_multiple2; +USE paimon_sync_database_multiple2; + +CREATE TABLE t1 ( + k1 INT, + v1 VARCHAR(10), + PRIMARY KEY (k1) +); + +CREATE TABLE t3 ( + k3 INT, + v3 VARCHAR(10), + PRIMARY KEY (k3) +); + +INSERT INTO t1 VALUES(3, 'three'); +INSERT INTO t3 VALUES(4, 'four'); \ No newline at end of file