Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] support sync multiple mysql database to paimon #1282

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/content/how-to/cdc-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ the regular expressions.

### Synchronizing Databases

By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize the whole MySQL database into one Paimon database.
By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize one or multiple MySQL database into one or multiple Paimon database, if `sync-to-multiple-db` is false, all the MySQL database will be synchronize to one paimon database, and the table with same name in different database will be merged. if `sync-to-multiple-db` is true, all the MySQL database will be synchronize to multiple paimon database with the same schema as MySQL.

To use this feature through `flink run`, run the following shell command.

Expand All @@ -111,6 +111,7 @@ To use this feature through `flink run`, run the following shell command.
mysql-sync-database
--warehouse <warehouse-path> \
--database <database-name> \
[--sync-to-multiple-db <true/false>] \
[--ignore-incompatible <true/false>] \
[--table-prefix <paimon-table-prefix>] \
[--table-suffix <paimon-table-suffix>] \
Expand All @@ -130,6 +131,10 @@ For each MySQL table to be synchronized, if the corresponding Paimon table does

Example 1: synchronize entire database

You can synchronize one or multiple mysql database to paimon, if you want to synchronize one mysql database,
you can set the `database-name` parameter to a specific database name, if you want to synchronize multiple mysql database,
you can set the `database-name` parameter to a regular expression.

```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
Expand Down
6 changes: 5 additions & 1 deletion docs/layouts/shortcodes/generated/mysql_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<td><h5>--database</h5></td>
<td>The database name in Paimon catalog.</td>
</tr>
<tr>
<td><h5>--sync-to-multiple-db</h5></td>
supp <td>It is default false, in this case, all the MySQL database will be synchronized to one paimon database, and the table with same name in different database will be merged, it is suitable for database sharding scenarios. if it is true, the parameter "--database" will be ignored, all the MySQL database will be synchronize to multiple paimon database with the same schema as MySQL, it is suitable for scenarios with a large number of databases under a database instance, which can save resources.</td>
</tr>
<tr>
<td><h5>--ignore-incompatible</h5></td>
<td>It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible,an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.</td>
Expand All @@ -47,7 +51,7 @@
</tr>
<tr>
<td><h5>--including-tables</h5></td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'db1.a|db2.b|db2.c'.Regular expression is supported, for example, specifying "--including-tables db1.test|db2.paimon.*" means to synchronize table 'db1.test' and all tables start with 'db2.paimon'.</td>
</tr>
<tr>
<td><h5>--excluding-tables</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig) {
.username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
.password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
.databaseList(databaseName)
.tableList(databaseName + "." + tableName);
.tableList(tableName);

mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
mySqlConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final ZoneId serverTimeZone;
private final boolean caseSensitive;
private final boolean syncToMultipleDB;
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
Expand Down Expand Up @@ -100,7 +101,8 @@ public MySqlDebeziumJsonEventParser(
ddl -> Optional.empty(),
null,
null,
convertTinyint1ToBool);
convertTinyint1ToBool,
false);
}

public MySqlDebeziumJsonEventParser(
Expand All @@ -110,7 +112,8 @@ public MySqlDebeziumJsonEventParser(
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
boolean convertTinyint1ToBool,
boolean syncToMultipleDB) {
this(
serverTimeZone,
caseSensitive,
Expand All @@ -119,7 +122,8 @@ public MySqlDebeziumJsonEventParser(
schemaBuilder,
includingPattern,
excludingPattern,
convertTinyint1ToBool);
convertTinyint1ToBool,
syncToMultipleDB);
}

public MySqlDebeziumJsonEventParser(
Expand All @@ -130,7 +134,8 @@ public MySqlDebeziumJsonEventParser(
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
boolean convertTinyint1ToBool,
Boolean syncToMultipleDB) {
this.serverTimeZone = serverTimeZone;
this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
Expand All @@ -139,23 +144,38 @@ public MySqlDebeziumJsonEventParser(
this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
this.convertTinyint1ToBool = convertTinyint1ToBool;
this.syncToMultipleDB = syncToMultipleDB;
}

@Override
public void setRawEvent(String rawEvent) {
try {
root = objectMapper.readValue(rawEvent, JsonNode.class);
payload = root.get("payload");
currentTable = payload.get("source").get("table").asText();
currentTable =
payload.get("source").get("db").asText()
+ "."
+ payload.get("source").get("table").asText();
shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public String parseDatabaseName() {
return payload.get("source").get("db").asText();
}

@Override
public String parseTableName() {
return tableNameConverter.convert(currentTable);
String tableName = payload.get("source").get("table").asText();
if (syncToMultipleDB) {
String dbName = payload.get("source").get("db").asText();
return dbName + "." + tableNameConverter.convert(tableName);
} else {
return tableNameConverter.convert(tableName);
}
}

private boolean isSchemaChange() {
Expand Down Expand Up @@ -244,11 +264,13 @@ public Optional<Schema> parseNewTable() {

JsonNode primaryKeyColumnNames = tableChange.get("table").get("primaryKeyColumnNames");
if (primaryKeyColumnNames.size() == 0) {
String id = tableChange.get("id").asText();
String tableName = id.replaceAll("\"", "");
LOG.debug(
"Didn't find primary keys from MySQL DDL for table '{}'. "
+ "This table won't be synchronized.",
currentTable);
excludedTables.add(currentTable);
excludedTables.add(tableName);
shouldSynchronizeCurrentTable = false;
return Optional.empty();
}
Expand Down
Loading