Skip to content

Commit

Permalink
support sync multiple mysql database
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjun0x01 committed Jul 21, 2023
1 parent 63a6468 commit 1791d3c
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 78 deletions.
7 changes: 6 additions & 1 deletion docs/content/how-to/cdc-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ Example
### Synchronizing Databases
By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize the whole MySQL database into one Paimon database.
By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize one or multiple MySQL database into one or multiple Paimon database, if `sync-to-multiple-db` is false, all the MySQL database will be synchronize to one paimon database, and the table with same name in different database will be merged. if `sync-to-multiple-db` is true, all the MySQL database will be synchronize to multiple paimon database with the same schema.
To use this feature through `flink run`, run the following shell command.
Expand All @@ -108,6 +108,7 @@ To use this feature through `flink run`, run the following shell command.
mysql-sync-database
--warehouse <warehouse-path> \
--database <database-name> \
[--sync-to-multiple-db <true/false>] \
[--ignore-incompatible <true/false>] \
[--table-prefix <paimon-table-prefix>] \
[--table-suffix <paimon-table-suffix>] \
Expand All @@ -127,6 +128,10 @@ For each MySQL table to be synchronized, if the corresponding Paimon table does
Example 1: synchronize entire database
You can synchronize one or multiple mysql database to paimon, if you want to synchronize one mysql database,
you can set the `database-name` parameter to a specific database name, if you want to synchronize multiple mysql database,
you can set the `database-name` parameter to a regular expression, the table with same name in different database will be merged to the same paimon table(you need to set `sync-to-multiple-db` to `false` at the same time).
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
Expand Down
6 changes: 5 additions & 1 deletion docs/layouts/shortcodes/generated/mysql_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<td><h5>--database</h5></td>
<td>The database name in Paimon catalog.</td>
</tr>
<tr>
<td><h5>--sync-to-multiple-db</h5></td>
supp <td>It is default false, in this case, all the MySQL database will be synchronized to one paimon database, and the table with same name in different database will be merged, it is suitable for database sharding scenarios. if it is true, the parameter "--database" will be ignored, all the MySQL database will be synchronize to multiple paimon database with the same schema as MySQL before, it is suitable for scenarios with a large number of databases under a database instance, which can save resources.</td>
</tr>
<tr>
<td><h5>--ignore-incompatible</h5></td>
<td>It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible,an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.</td>
Expand All @@ -47,7 +51,7 @@
</tr>
<tr>
<td><h5>--including-tables</h5></td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'db1.a|db2.b|db2.c'.Regular expression is supported, for example, specifying "--including-tables db1.test|db2.paimon.*" means to synchronize table 'db1.test' and all tables start with 'db2.paimon'.</td>
</tr>
<tr>
<td><h5>--excluding-tables</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig) {
.username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
.password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
.databaseList(databaseName)
.tableList(databaseName + "." + tableName);
.tableList(tableName);

mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
mySqlConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final ZoneId serverTimeZone;
private final boolean caseSensitive;
private final boolean syncToMultipleDB;
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
Expand Down Expand Up @@ -100,7 +101,8 @@ public MySqlDebeziumJsonEventParser(
ddl -> Optional.empty(),
null,
null,
convertTinyint1ToBool);
convertTinyint1ToBool,
false);
}

public MySqlDebeziumJsonEventParser(
Expand All @@ -110,7 +112,8 @@ public MySqlDebeziumJsonEventParser(
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
boolean convertTinyint1ToBool,
boolean syncToMultipleDB) {
this(
serverTimeZone,
caseSensitive,
Expand All @@ -119,7 +122,8 @@ public MySqlDebeziumJsonEventParser(
schemaBuilder,
includingPattern,
excludingPattern,
convertTinyint1ToBool);
convertTinyint1ToBool,
syncToMultipleDB);
}

public MySqlDebeziumJsonEventParser(
Expand All @@ -130,7 +134,8 @@ public MySqlDebeziumJsonEventParser(
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
boolean convertTinyint1ToBool,
Boolean syncToMultipleDB) {
this.serverTimeZone = serverTimeZone;
this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
Expand All @@ -139,23 +144,38 @@ public MySqlDebeziumJsonEventParser(
this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
this.convertTinyint1ToBool = convertTinyint1ToBool;
this.syncToMultipleDB = syncToMultipleDB;
}

@Override
public void setRawEvent(String rawEvent) {
try {
root = objectMapper.readValue(rawEvent, JsonNode.class);
payload = root.get("payload");
currentTable = payload.get("source").get("table").asText();
currentTable =
payload.get("source").get("db").asText()
+ "."
+ payload.get("source").get("table").asText();
shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public String parseDatabaseName() {
return payload.get("source").get("db").asText();
}

@Override
public String parseTableName() {
return tableNameConverter.convert(currentTable);
String tableName = payload.get("source").get("table").asText();
if (syncToMultipleDB) {
String dbName = payload.get("source").get("db").asText();
return dbName + "." + tableNameConverter.convert(tableName);
} else {
return tableNameConverter.convert(tableName);
}
}

private boolean isSchemaChange() {
Expand Down Expand Up @@ -245,8 +265,7 @@ public Optional<Schema> parseNewTable() {
JsonNode primaryKeyColumnNames =
tableChanges.get(0).get("table").get("primaryKeyColumnNames");
if (primaryKeyColumnNames.size() == 0) {
String id = tableChanges.get(0).get("id").asText();
String tableName = id.replaceAll("\"", "").split("\\.")[1];
String tableName = tableChanges.get(0).get("id").asText().replaceAll("\"", "");
LOG.debug(
"Didn't find primary keys from MySQL DDL for table '{}'. "
+ "This table won't be synchronized.",
Expand Down
Loading

0 comments on commit 1791d3c

Please sign in to comment.