From b83ec2725932bacda83075e687b85666dc711cb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B9=8F?= Date: Thu, 22 Aug 2024 17:59:56 +0800 Subject: [PATCH] Extended functionality, corresponding to Issue 3633 --- .../generated/kafka_sync_database.html | 8 ++++++ .../apache/paimon/utils/ParameterUtils.java | 16 +++++++++++ .../action/cdc/CdcActionCommonUtils.java | 1 + .../action/cdc/SyncDatabaseActionBase.java | 11 ++++++++ .../cdc/SyncDatabaseActionFactoryBase.java | 3 +++ .../flink/sink/cdc/NewTableSchemaBuilder.java | 17 +++++++++++- .../flink/action/cdc/CdcActionITCaseBase.java | 26 ++++++++++++++++++ .../KafkaCanalSyncDatabaseActionITCase.java | 27 +++++++++++++++++++ .../canal-data-1.txt | 21 +++++++++++++++ .../paimon/flink/action/ActionFactory.java | 14 ++++++++++ 10 files changed, 143 insertions(+), 1 deletion(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/multipletablepartitionkeys/canal-data-1.txt diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html index c598e950b211..888901991d69 100644 --- a/docs/layouts/shortcodes/generated/kafka_sync_database.html +++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html @@ -74,6 +74,14 @@ The partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example "dt,hh,mm". If the keys are not in source table, the sink table won't set partition keys. + +
--multiple_table_partition_keys
+ The partition keys for individual Paimon tables, given as tableName=key1,key2 and repeated once for each table. If a table has multiple partition keys, connect them with commas, for example: +
  • --multiple_table_partition_keys tableName1=col1,col2,col3
  • +
  • --multiple_table_partition_keys tableName2=col4,col5,col6
  • +
  • --multiple_table_partition_keys tableName3=col7,col8,col9
  • + If the keys are not in the source table, the sink table won't set partition keys. If --partition_keys is also specified, it takes precedence over this option for every table. +
    --primary_keys
    The primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example "buyer_id,seller_id". diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java index 5d12cf27f705..6efbeb0b3fce 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java @@ -54,4 +54,20 @@ public static void parseKeyValueString(Map map, String kvString) } map.put(kv[0].trim(), kv[1].trim()); } + + public static void parseKeyValueList(Map> mapList, String kvString) { + String[] kv = kvString.split("=", 2); + if (kv.length != 2) { + throw new IllegalArgumentException( + String.format( + "Invalid key-value string '%s'. Please use format 'key=value'", + kvString)); + } + String[] valueArr = kv[1].trim().split(","); + List valueList = new ArrayList<>(); + for (String value : valueArr) { + valueList.add(value); + } + mapList.put(kv[0].trim(), valueList); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 3e85c7c88bf8..8f96022bde35 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -63,6 +63,7 @@ public class CdcActionCommonUtils { public static final String PRIMARY_KEYS = "primary_keys"; public static final String COMPUTED_COLUMN = "computed_column"; public static final String METADATA_COLUMN = "metadata_column"; + public static final String MULTIPLE_TABLE_PARTITION_KEYS = "multiple_table_partition_keys"; public static void assertSchemaCompatible( TableSchema paimonSchema, List sourceTableFields) { 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index 6c62bcc0c21f..c513aff10e50 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -36,6 +36,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -56,6 +57,7 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase { protected List primaryKeys = new ArrayList<>(); @Nullable protected String excludingTables; protected List tables = new ArrayList<>(); + protected Map> partitionKeyMultiple = new HashMap<>(); public SyncDatabaseActionBase( String warehouse, @@ -130,6 +132,14 @@ protected FlatMapFunction recordParse() Collections.emptyList(), typeMapping, metadataConverters); } + public SyncDatabaseActionBase withPartitionKeyMultiple( + Map> partitionKeyMultiple) { + if (partitionKeyMultiple != null) { + this.partitionKeyMultiple = partitionKeyMultiple; + } + return this; + } + @Override protected EventParser.Factory buildEventParserFactory() { NewTableSchemaBuilder schemaBuilder = @@ -138,6 +148,7 @@ protected EventParser.Factory buildEventParserFactory() allowUpperCase, partitionKeys, primaryKeys, + partitionKeyMultiple, metadataConverters); Pattern includingPattern = Pattern.compile(includingTables); Pattern excludingPattern = diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java index 996ecf4c73cb..e7a386979d4e 
100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java @@ -26,6 +26,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; @@ -52,6 +53,8 @@ protected void withParams(MultipleParameterToolAdapter params, T action) { .withTableSuffix(params.get(TABLE_SUFFIX)) .includingTables(params.get(INCLUDING_TABLES)) .excludingTables(params.get(EXCLUDING_TABLES)) + .withPartitionKeyMultiple( + optionalConfigMapList(params, MULTIPLE_TABLE_PARTITION_KEYS)) .withPartitionKeys(); if (params.has(PARTITION_KEYS)) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java index 33dfd576fde6..a4c440e26887 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java @@ -22,7 +22,9 @@ import org.apache.paimon.schema.Schema; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -37,18 +39,21 @@ public class NewTableSchemaBuilder implements Serializable { private final 
List partitionKeys; private final List primaryKeys; private final CdcMetadataConverter[] metadataConverters; + protected Map> partitionKeyMultiple = new HashMap<>(); public NewTableSchemaBuilder( Map tableConfig, boolean caseSensitive, List partitionKeys, List primaryKeys, + Map> partitionKeyMultiple, CdcMetadataConverter[] metadataConverters) { this.tableConfig = tableConfig; this.caseSensitive = caseSensitive; this.metadataConverters = metadataConverters; this.partitionKeys = partitionKeys; this.primaryKeys = primaryKeys; + this.partitionKeyMultiple = partitionKeyMultiple; } public Optional build(RichCdcMultiplexRecord record) { @@ -59,10 +64,20 @@ public Optional build(RichCdcMultiplexRecord record) { record.primaryKeys(), Collections.emptyMap(), null); + List specifiedPartitionKeys = new ArrayList<>(); + if (partitionKeys != null && !partitionKeys.isEmpty()) { + specifiedPartitionKeys = partitionKeys; + } else { + List partitionKeyMultipleList = partitionKeyMultiple.get(record.tableName()); + if (partitionKeyMultipleList != null && !partitionKeyMultipleList.isEmpty()) { + specifiedPartitionKeys = partitionKeyMultipleList; + } + } + return Optional.of( buildPaimonSchema( record.tableName(), - partitionKeys, + specifiedPartitionKeys, primaryKeys, Collections.emptyList(), tableConfig, diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index e8ac05e03975..08289569086a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action.cdc; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.ActionBase; import 
org.apache.paimon.flink.action.ActionITCaseBase; import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseActionFactory; @@ -30,6 +31,7 @@ import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncDatabaseActionFactory; import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncTableActionFactory; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.DataField; @@ -51,7 +53,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -110,6 +114,18 @@ protected void assertTableNotExists(String... tableNames) throws Exception { assertThat(catalog.listTables(database)).doesNotContain(tableNames); } + protected void assertTablePartitionKeys(Map partitionKeyMultiple) + throws Exception { + // get All tableNames; + Set tableNames = partitionKeyMultiple.keySet(); + for (String tableName : tableNames) { + Table table = catalog.getTable(new Identifier(database, tableName)); + String actual = table.partitionKeys().stream().collect(Collectors.joining(",")); + String expected = partitionKeyMultiple.get(tableName); + assertThat(actual).isEqualTo(expected); + } + } + protected void waitForResult( List expected, FileStoreTable table, RowType rowType, List primaryKeys) throws Exception { @@ -366,6 +382,7 @@ protected abstract class SyncDatabaseActionBuilder partitionKeys = new ArrayList<>(); private final List primaryKeys = new ArrayList<>(); private final List metadataColumn = new ArrayList<>(); + protected Map partitionKeyMultiple = new HashMap<>(); public SyncDatabaseActionBuilder(Class clazz, Map sourceConfig) { this.clazz = clazz; @@ -437,6 +454,14 @@ public SyncDatabaseActionBuilder withMetadataColumn(List metadataColu return 
this; } + public SyncDatabaseActionBuilder withPartitionKeyMultiple( + Map partitionKeyMultiple) { + if (partitionKeyMultiple != null) { + this.partitionKeyMultiple = partitionKeyMultiple; + } + return this; + } + public T build() { List args = new ArrayList<>( @@ -461,6 +486,7 @@ public T build() { args.addAll(listToArgs("--type-mapping", typeMappingModes)); args.addAll(listToArgs("--partition-keys", partitionKeys)); + args.addAll(mapToArgs("--multiple-table-partition-keys", partitionKeyMultiple)); args.addAll(listToArgs("--primary-keys", primaryKeys)); args.addAll(listToArgs("--metadata-column", metadataColumn)); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java index 025a6cce05b8..0e1f4e72ea8c 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java @@ -29,8 +29,10 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -640,4 +642,29 @@ public void testSpecifyKeys() throws Exception { rowType2, Collections.singletonList("k")); } + + @Test + @Timeout(60) + public void testMultipleTablePartitionKeys() throws Exception { + final String topic = "multiple-table-partition-keys"; + createTestTopic(topic, 1, 1); + writeRecordsToKafka( + topic, "kafka/canal/database/multipletablepartitionkeys/canal-data-1.txt"); + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put(VALUE_FORMAT.key(), "canal-json"); + kafkaConfig.put(TOPIC.key(), topic); + Map partitionKeyMultiple = new HashMap<>(); + 
partitionKeyMultiple.put("tt_1", "k1,k2"); + partitionKeyMultiple.put("tt_2", "k1,k3"); + KafkaSyncDatabaseAction action = + syncDatabaseActionBuilder(kafkaConfig) + .withTableConfig(getBasicTableConfig()) + .withPartitionKeyMultiple(partitionKeyMultiple) + .build(); + runActionWithDefaultEnv(action); + // check paimon tables + List tableNames = new ArrayList<>(partitionKeyMultiple.keySet()); + waitingTables(tableNames); + assertTablePartitionKeys(partitionKeyMultiple); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/multipletablepartitionkeys/canal-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/multipletablepartitionkeys/canal-data-1.txt new file mode 100644 index 000000000000..4df63875a930 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/multipletablepartitionkeys/canal-data-1.txt @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"data":[{"k1":"1","k2":"2","k3":"3","v0":"one"}],"database":"paimon_sync_database_affix","es":1684770066000,"id":76,"isDdl":false,"mysqlType":{"k1":"INT","k2":"INT","k3":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k1","k2","k3"],"sql":"","sqlType":{"k1":4,"k2":4,"k3":4,"v0":12},"table":"tt_1","ts":1684770066165,"type":"INSERT"} +{"data":[{"k1":"1","k2":"2","k3":"3","v0":"three"}],"database":"paimon_sync_database_affix","es":1684770066000,"id":78,"isDdl":false,"mysqlType":{"k1":"INT","k2":"INT","k3":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k1","k2","k3"],"sql":"","sqlType":{"k1":4,"k2":4,"k3":4,"v0":12},"table":"tt_2","ts":1684770066821,"type":"INSERT"} + diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java index aeacc8ce68ee..43719f715d9d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java @@ -38,6 +38,7 @@ import java.util.Optional; import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues; +import static org.apache.paimon.utils.ParameterUtils.parseKeyValueList; import static org.apache.paimon.utils.ParameterUtils.parseKeyValueString; /** Factory to create {@link Action}. */ @@ -164,4 +165,17 @@ default String getRequiredValue(MultipleParameterToolAdapter params, String key) checkRequiredArgument(params, key); return params.get(key); } + + default Map> optionalConfigMapList( + MultipleParameterToolAdapter params, String key) { + if (!params.has(key)) { + return Collections.emptyMap(); + } + + Map> config = new HashMap<>(); + for (String kvString : params.getMultiParameter(key)) { + parseKeyValueList(config, kvString); + } + return config; + } }