Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink-cdc] kafka_sync_database supports table name mapping when prefix and postfix could not fit the need. #4660

Merged
merged 10 commits into from
Dec 10, 2024
1 change: 1 addition & 0 deletions docs/content/cdc-ingestion/kafka-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ To use this feature through `flink run`, run the following shell command.
kafka_sync_database
--warehouse <warehouse-path> \
--database <database-name> \
[--table_mapping <table-name>=<paimon-table-name>] \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix <paimon-table-suffix>] \
[--including_tables <table-name|name-regular-expr>] \
Expand Down
4 changes: 4 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
<td><h5>--ignore_incompatible</h5></td>
<td>It is false by default; in this case, if a MySQL table name exists in Paimon and their schemas are incompatible, an exception will be thrown. You can explicitly set it to true to ignore the incompatible tables and the exception.</td>
</tr>
<tr>
<td><h5>--table_mapping</h5></td>
<td>The table name mapping between the source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings can be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix".</td>
</tr>
<tr>
<td><h5>--table_prefix</h5></td>
<td>The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class CdcActionCommonUtils {
public static final String PULSAR_CONF = "pulsar_conf";
public static final String TABLE_PREFIX = "table_prefix";
public static final String TABLE_SUFFIX = "table_suffix";
public static final String TABLE_MAPPING = "table_mapping";
public static final String INCLUDING_TABLES = "including_tables";
public static final String EXCLUDING_TABLES = "excluding_tables";
public static final String TYPE_MAPPING = "type_mapping";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
protected MultiTablesSinkMode mode = COMBINED;
protected String tablePrefix = "";
protected String tableSuffix = "";
protected Map<String, String> tableMapping = new HashMap<>();
protected String includingTables = ".*";
protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
Expand Down Expand Up @@ -97,6 +98,13 @@ public SyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) {
return this;
}

/**
 * Registers explicit source-to-Paimon table name mappings. A {@code null} argument is
 * ignored and leaves the currently configured mapping untouched.
 *
 * @param tableMapping map from source table name to target Paimon table name
 * @return this action, for fluent chaining
 */
public SyncDatabaseActionBase withTableMapping(Map<String, String> tableMapping) {
    if (tableMapping == null) {
        return this;
    }
    this.tableMapping = tableMapping;
    return this;
}

public SyncDatabaseActionBase includingTables(@Nullable String includingTables) {
if (includingTables != null) {
this.includingTables = includingTables;
Expand Down Expand Up @@ -155,7 +163,8 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
Pattern excludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix);
new TableNameConverter(
allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping);
Set<String> createdTables;
try {
createdTables = new HashSet<>(catalog.listTables(database));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
Expand All @@ -51,6 +52,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
protected void withParams(MultipleParameterToolAdapter params, T action) {
action.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
.withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
.withPartitionKeyMultiple(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.paimon.catalog.Identifier;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Used to convert a MySQL source table name to corresponding Paimon table name. */
public class TableNameConverter implements Serializable {
Expand All @@ -31,20 +33,31 @@ public class TableNameConverter implements Serializable {
private final boolean mergeShards;
private final String prefix;
private final String suffix;
private final Map<String, String> tableMapping;

/**
 * Creates a converter that only normalizes casing: shards are merged, no prefix/suffix is
 * applied, and no explicit table mapping is used.
 *
 * @param caseSensitive whether converted names keep their original casing
 */
public TableNameConverter(boolean caseSensitive) {
    this(caseSensitive, true, "", "", null);
}

/**
 * Creates a fully configured converter.
 *
 * @param caseSensitive whether converted names keep their original casing; when false,
 *     results are lower-cased
 * @param mergeShards presumably controls merging of shard tables — not read in this class's
 *     visible code; confirm against callers
 * @param prefix prefix prepended to names that have no explicit mapping
 * @param suffix suffix appended to names that have no explicit mapping
 * @param tableMapping explicit source-to-Paimon name mapping, may be null; keys are
 *     normalized to lower case so lookups are case-insensitive
 */
public TableNameConverter(
        boolean caseSensitive,
        boolean mergeShards,
        String prefix,
        String suffix,
        Map<String, String> tableMapping) {
    this.caseSensitive = caseSensitive;
    this.mergeShards = mergeShards;
    this.prefix = prefix;
    this.suffix = suffix;
    // Lower-case the keys once here so convert() can match source names case-insensitively.
    this.tableMapping = lowerMapKey(tableMapping);
}

/**
 * Converts a source table name to the corresponding Paimon table name.
 *
 * <p>An explicit entry in {@code tableMapping} takes priority and is matched
 * case-insensitively; otherwise the configured prefix and suffix are applied.
 *
 * @param originName the source table name
 * @return the Paimon table name; lower-cased when the converter is not case sensitive
 */
public String convert(String originName) {
    // Single lookup instead of containsKey + get; mapping keys are stored lower-cased.
    // NOTE(review): toLowerCase() uses the default locale, matching lowerMapKey — consider
    // Locale.ROOT in both places for locale-independent behavior.
    String mappedName = tableMapping.get(originName.toLowerCase());
    if (mappedName != null) {
        return caseSensitive ? mappedName : mappedName.toLowerCase();
    }

    String tableName = caseSensitive ? originName : originName.toLowerCase();
    return prefix + tableName + suffix;
}
Expand All @@ -58,4 +71,18 @@ public String convert(Identifier originIdentifier) {
+ originIdentifier.getObjectName();
return convert(rawName);
}

private Map<String, String> lowerMapKey(Map<String, String> map) {
int size = map == null ? 0 : map.size();
JackeyLee007 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, String> lowerKeyMap = new HashMap<>(size);
if (size == 0) {
return lowerKeyMap;
}

for (String key : map.keySet()) {
lowerKeyMap.put(key.toLowerCase(), map.get(key));
}

return lowerKeyMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ protected void beforeBuildingSourceSink() throws Exception {
+ ", or MySQL database does not exist.");

TableNameConverter tableNameConverter =
new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix);
new TableNameConverter(
allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping);
for (JdbcTableInfo tableInfo : jdbcTableInfos) {
Identifier identifier =
Identifier.create(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

/** Tests for {@link TableNameConverter}. */
public class TableNameConverterTest {

    @Test
    public void testConvertTableName() {
        Map<String, String> tableMapping = new HashMap<>(1);
        tableMapping.put("mapped_src", "mapped_TGT");

        // Case-sensitive: an explicit mapping overrides prefix/suffix and keeps target casing.
        // JUnit's assertEquals takes (expected, actual) — expected value goes first.
        TableNameConverter caseConverter =
                new TableNameConverter(true, true, "pre_", "_pos", tableMapping);
        Assert.assertEquals("mapped_TGT", caseConverter.convert("mapped_SRC"));

        Assert.assertEquals("pre_unmapped_src_pos", caseConverter.convert("unmapped_src"));

        // Case-insensitive: mapped target names are lower-cased as well.
        TableNameConverter noCaseConverter =
                new TableNameConverter(false, true, "pre_", "_pos", tableMapping);
        Assert.assertEquals("mapped_tgt", noCaseConverter.convert("mapped_src"));
        Assert.assertEquals("pre_unmapped_src_pos", noCaseConverter.convert("unmapped_src"));
    }
}
Loading