Skip to content

Commit

Permalink
sync to multiple database
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjun0x01 committed Jun 8, 2023
1 parent c3102b4 commit 42549b3
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 45 deletions.
3 changes: 2 additions & 1 deletion docs/content/how-to/cdc-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Example
### Synchronizing Databases
By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize the whole MySQL database into one Paimon database.
By using [MySqlSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize one or multiple MySQL database into one or multiple Paimon database, if `sync-to-multiple-db` is false, all the MySQL database will be synchronize to one paimon database, and the table with same name in different database will be merged. if `sync-to-multiple-db` is true, all the MySQL database will be synchronize to multiple paimon database with the same schema.
To use this feature through `flink run`, run the following shell command.
Expand All @@ -107,6 +107,7 @@ To use this feature through `flink run`, run the following shell command.
mysql-sync-database
--warehouse <warehouse-path> \
--database <database-name> \
[--sync-to-multiple-db <true/false>] \
[--ignore-incompatible <true/false>] \
[--table-prefix <paimon-table-prefix>] \
[--table-suffix <paimon-table-suffix>] \
Expand Down
6 changes: 5 additions & 1 deletion docs/layouts/shortcodes/generated/mysql_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<td><h5>--database</h5></td>
<td>The database name in Paimon catalog.</td>
</tr>
<tr>
<td><h5>--sync-to-multiple-db</h5></td>
<td>It is default false, in this case, all the MySQL database will be synchronized to one paimon database, and the table with same name in different database will be merged. if it is true, all the MySQL database will be synchronize to multiple paimon database with the same schema.</td>
</tr>
<tr>
<td><h5>--ignore-incompatible</h5></td>
<td>It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible,an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.</td>
Expand All @@ -47,7 +51,7 @@
</tr>
<tr>
<td><h5>--including-tables</h5></td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'db1.a|db2.b|db2.c'.Regular expression is supported, for example, specifying "--including-tables db1.test|db2.paimon.*" means to synchronize table 'db1.test' and all tables start with 'db2.paimon'.</td>
</tr>
<tr>
<td><h5>--excluding-tables</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig) {
.username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
.password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
.databaseList(databaseName)
.tableList(databaseName + "." + tableName);
.tableList(tableName);

mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
mySqlConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final ZoneId serverTimeZone;
private final boolean caseSensitive;
private final boolean syncToMultipleDB;
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;

Expand All @@ -72,23 +73,38 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {

public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone, boolean caseSensitive, List<ComputedColumn> computedColumns) {
this(serverTimeZone, caseSensitive, computedColumns, new TableNameConverter(caseSensitive));
this(
serverTimeZone,
caseSensitive,
computedColumns,
new TableNameConverter(caseSensitive),
false);
}

public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone, boolean caseSensitive, TableNameConverter tableNameConverter) {
this(serverTimeZone, caseSensitive, Collections.emptyList(), tableNameConverter);
ZoneId serverTimeZone,
boolean caseSensitive,
TableNameConverter tableNameConverter,
boolean syncToMultipleDB) {
this(
serverTimeZone,
caseSensitive,
Collections.emptyList(),
tableNameConverter,
syncToMultipleDB);
}

public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone,
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TableNameConverter tableNameConverter) {
TableNameConverter tableNameConverter,
Boolean syncToMultipleDB) {
this.serverTimeZone = serverTimeZone;
this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
this.tableNameConverter = tableNameConverter;
this.syncToMultipleDB = syncToMultipleDB;
}

@Override
Expand All @@ -114,7 +130,12 @@ public void setRawEvent(String rawEvent) {
@Override
public String parseTableName() {
String tableName = payload.get("source").get("table").asText();
return tableNameConverter.convert(tableName);
if (syncToMultipleDB) {
String dbName = payload.get("source").get("db").asText();
return dbName + "." + tableNameConverter.convert(tableName);
} else {
return tableNameConverter.convert(tableName);
}
}

private void updateFieldTypes(JsonNode schema) {
Expand Down
Loading

0 comments on commit 42549b3

Please sign in to comment.