From 544f565c35595a7fdd2b21aabb72a73401a30eea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B9=8F=E7=A8=8B?= Date: Thu, 21 Nov 2024 20:29:47 +0800 Subject: [PATCH 1/4] support aliyun-json when sinking data from kafka with paimon-flink-action --- .../cdc/format/aliyun/AliyunDataFormat.java | 34 +++ .../aliyun/AliyunDataFormatFactory.java} | 23 +- .../cdc/format/aliyun/AliyunFieldParser.java | 117 ++++++++ .../cdc/format/aliyun/AliyunRecordParser.java | 260 ++++++++++++++++++ .../MessageQueueCdcTimestampExtractor.java | 4 + .../org.apache.paimon.factories.Factory | 1 + .../aliyun/AliyunJsonRecordParserTest.java | 167 +++++++++++ .../kafka/aliyun/table/event/event-delete.txt | 19 ++ .../kafka/aliyun/table/event/event-insert.txt | 19 ++ .../table/event/event-update-in-one.txt | 19 ++ 10 files changed, 655 insertions(+), 8 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/{ProcessRecordAttributesUtil.java => action/cdc/format/aliyun/AliyunDataFormatFactory.java} (59%) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java new file mode 100644 index 000000000000..ccbacdc2af5e --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
+
+/**
+ * Supports the message queue's aliyun json data format and provides definitions for the message
+ * queue's record json deserialization class and parsing class {@link AliyunRecordParser}.
+ */
+public class AliyunDataFormat extends AbstractJsonDataFormat {
+
+    @Override
+    protected RecordParserFactory parser() {
+        return AliyunRecordParser::new;
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
similarity index 59%
rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
index efe5e12b12d7..a07e2f205c90 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java
@@ -16,16 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.action.cdc.format.aliyun;
 
-import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
 
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+/** Factory to create {@link AliyunDataFormat}. */
+public class AliyunDataFormatFactory implements DataFormatFactory {
 
-/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
-public class ProcessRecordAttributesUtil {
-    public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
+    public static final String IDENTIFIER = "aliyun-json";
 
-    public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public DataFormat create() {
+        return new AliyunDataFormat();
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java
new file mode 100644
index 000000000000..824ed9145943
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+/** Converts some special types such as enum, set and geometry. */
+public class AliyunFieldParser {
+
+    protected static byte[] convertGeoType2WkbArray(byte[] mysqlGeomBytes) {
+        int sridLength = 4;
+        boolean hasSrid = false;
+        for (int i = 0; i < sridLength; ++i) {
+            if (mysqlGeomBytes[i] != 0) {
+                hasSrid = true;
+                break;
+            }
+        }
+        byte[] wkb;
+        if (hasSrid) {
+            wkb = new byte[mysqlGeomBytes.length];
+            // byteOrder + geometry
+            System.arraycopy(mysqlGeomBytes, 4, wkb, 0, 5);
+            // srid
+            System.arraycopy(mysqlGeomBytes, 0, wkb, 5, 4);
+            // geometry
+            System.arraycopy(mysqlGeomBytes, 9, wkb, 9, wkb.length - 9);
+
+            // set srid flag
+            if (wkb[0] == 0) {
+                // big endian
+                wkb[1] = (byte) (wkb[1] + 32);
+            } else {
+                wkb[4] = (byte) (wkb[4] + 32);
+            }
+        } else {
+            wkb = new byte[mysqlGeomBytes.length - 4];
+            System.arraycopy(mysqlGeomBytes, 4, wkb, 0, wkb.length);
+        }
+        return wkb;
+    }
+
+    protected static String convertSet(String value, String mysqlType) {
+        // A MySQL SET value can contain more than one option; the raw value is a bitmask
+        // encoded as a long.
+        int indexes = Integer.parseInt(value);
+        return getSetValuesByIndex(mysqlType, indexes);
+    }
+
+    protected static String convertEnum(String value, String mysqlType) {
+        int elementIndex = Integer.parseInt(value);
+        // enum('a','b','c')
+        return getEnumValueByIndex(mysqlType, elementIndex);
+    }
+
+    protected static String getEnumValueByIndex(String mysqlType, int elementIndex) {
+        String[] options = extractEnumValueByIndex(mysqlType);
+
+        return options[elementIndex - 1];
+    }
+
+    protected static String getSetValuesByIndex(String mysqlType, int indexes) {
+        String[] options = extractSetValuesByIndex(mysqlType);
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        int index = 0;
+        boolean first = true;
+        int optionLen = options.length;
+
+        while (indexes != 0L) {
+            if (indexes % 2L != 0) {
+                if (first) {
+                    first = false;
+                } else {
+                    sb.append(',');
+                }
+                if (index < optionLen) {
+                    sb.append(options[index]);
+                } else {
+                    throw new RuntimeException(
+                            String.format(
+                                    "extractSetValues from mysqlType[%s],index:%d failed",
+                                    mysqlType, indexes));
+                }
+            }
+            ++index;
+            indexes = indexes >>> 1;
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    private static String[] extractSetValuesByIndex(String mysqlType) {
+        // set('x','y')
+        return mysqlType.substring(5, mysqlType.length() - 2).split("'\\s*,\\s*'");
+    }
+
+    private static String[] extractEnumValueByIndex(String mysqlType) {
+        // enum('x','y')
+        return mysqlType.substring(6, mysqlType.length() - 2).split("'\\s*,\\s*'");
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
new file mode 100644
index 000000000000..e31b282a76cb
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.aliyun;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
+import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
+
+/**
+ * The {@code AliyunRecordParser} class is responsible for parsing records from the aliyun-json
+ * format. This parser extracts relevant information from aliyun-json records and transforms them
+ * into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes captured in
+ * the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>
Additionally, the parser supports schema extraction, which can be used to understand the + * structure of the incoming data and its corresponding field types. + */ +public class AliyunRecordParser extends AbstractJsonRecordParser { + + private static final Logger LOG = LoggerFactory.getLogger(AliyunRecordParser.class); + + private static final String FIELD_IS_DDL = "isDdl"; + private static final String FIELD_TYPE = "op"; + + private static final String OP_UPDATE_BEFORE = "UPDATE_BEFORE"; + private static final String OP_UPDATE_AFTER = "UPDATE_AFTER"; + private static final String OP_INSERT = "INSERT"; + private static final String OP_DELETE = "DELETE"; + + private static final String FIELD_PAYLOAD = "payload"; + private static final String FIELD_BEFORE = "before"; + private static final String FIELD_AFTER = "after"; + private static final String FIELD_COLUMN = "dataColumn"; + + private static final String FIELD_SCHEMA = "schema"; + private static final String FIELD_PK = "primaryKey"; + + @Override + protected boolean isDDL() { + JsonNode node = root.get(FIELD_IS_DDL); + return !isNull(node) && node.asBoolean(); + } + + public AliyunRecordParser(TypeMapping typeMapping, List computedColumns) { + super(typeMapping, computedColumns); + } + + @Override + protected String primaryField() { + return "schema.primaryKey"; + } + + @Override + protected String dataField() { + return "payload.dataColumn"; + } + + @Override + protected List extractPrimaryKeys() { + JsonNode schemaNode = root.get(FIELD_SCHEMA); + checkNotNull(schemaNode, FIELD_SCHEMA); + ArrayNode pkNode = getNodeAs(schemaNode, FIELD_PK, ArrayNode.class); + List pkFields = new ArrayList<>(); + pkNode.forEach( + pk -> { + if (isNull(pk)) { + throw new IllegalArgumentException( + String.format("Primary key cannot be null: %s", pk)); + } + + pkFields.add(pk.asText()); + }); + return pkFields; + } + + @Override + public List extractRecords() { + if (isDDL()) { + return Collections.emptyList(); + } + + List records = new ArrayList<>(); + + JsonNode payload = root.get(FIELD_PAYLOAD); + checkNotNull(payload, FIELD_PAYLOAD); + + String type = payload.get(FIELD_TYPE).asText(); + + RowKind rowKind = null; + String field = null; + switch (type) { + case OP_UPDATE_BEFORE: + rowKind = RowKind.UPDATE_BEFORE; + field = FIELD_BEFORE; + break; + case OP_UPDATE_AFTER: + rowKind = RowKind.UPDATE_AFTER; + field = FIELD_AFTER; + break; + case OP_INSERT: + rowKind = RowKind.INSERT; + field = FIELD_AFTER; + break; + case OP_DELETE: + rowKind = RowKind.DELETE; + field = FIELD_BEFORE; + break; + default: + throw new UnsupportedOperationException("Unknown record operation: " + type); + } + + JsonNode container = payload.get(field); + checkNotNull(container, String.format("%s.%s", FIELD_PAYLOAD, field)); + + JsonNode data = getNodeAs(container, FIELD_COLUMN, JsonNode.class); + checkNotNull(data, String.format("%s.%s.%s", FIELD_PAYLOAD, field, FIELD_COLUMN)); + + processRecord(data, rowKind, records); + + return records; + } + + @Override + protected Map extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) { + + Map recordMap = + JsonSerdeUtil.convertValue(record, new TypeReference>() {}); + Map rowData = new HashMap<>(); + + fillDefaultTypes(record, rowTypeBuilder); + for (Map.Entry entry : recordMap.entrySet()) { + rowData.put(entry.getKey(), Objects.toString(entry.getValue(), null)); + } + + evalComputedColumns(rowData, rowTypeBuilder); + return rowData; + } + + @Override + protected String format() { + return "aliyun-json"; + } + + @Nullable + 
@Override + protected String getTableName() { + JsonNode schemaNode = root.get(FIELD_SCHEMA); + if (isNull(schemaNode)) { + return null; + } + JsonNode sourceNode = schemaNode.get("source"); + if (isNull(sourceNode)) { + return null; + } + + JsonNode tableNode = sourceNode.get("tableName"); + if (isNull(tableNode)) { + return null; + } + return tableNode.asText(); + } + + @Nullable + @Override + protected String getDatabaseName() { + JsonNode schemaNode = root.get(FIELD_SCHEMA); + if (isNull(schemaNode)) { + return null; + } + JsonNode sourceNode = schemaNode.get("source"); + if (isNull(sourceNode)) { + return null; + } + JsonNode databaseNode = sourceNode.get("dbName"); + if (isNull(databaseNode)) { + return null; + } + return databaseNode.asText(); + } + + private Map matchOldRecords(ArrayNode newData, ArrayNode oldData) { + return IntStream.range(0, newData.size()) + .boxed() + .collect(Collectors.toMap(newData::get, oldData::get)); + } + + private String transformValue(@Nullable String oldValue, String shortType, String mySqlType) { + if (oldValue == null) { + return null; + } + + if (MySqlTypeUtils.isSetType(shortType)) { + return AliyunFieldParser.convertSet(oldValue, mySqlType); + } + + if (MySqlTypeUtils.isEnumType(shortType)) { + return AliyunFieldParser.convertEnum(oldValue, mySqlType); + } + + if (MySqlTypeUtils.isGeoType(shortType)) { + try { + byte[] wkb = + AliyunFieldParser.convertGeoType2WkbArray( + oldValue.getBytes(StandardCharsets.ISO_8859_1)); + return MySqlTypeUtils.convertWkbArray(wkb); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Failed to convert %s to geometry JSON.", oldValue), e); + } + } + return oldValue; + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java index 8a9a28453bad..5bf2fefc1b78 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java @@ -54,6 +54,10 @@ public long extractTimestamp(CdcSourceRecord cdcSourceRecord) throws JsonProcess } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) { // Dbz json return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms"); + } else if (JsonSerdeUtil.isNodeExists(record, "payload", "timestamp")) { + // Aliyun json + return JsonSerdeUtil.extractValue( + record, Long.class, "payload", "timestamp", "systemTime"); } throw new RuntimeException( String.format( diff --git a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 17b8b29a2009..1b30c7ab6396 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -27,6 +27,7 @@ org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncDatabaseActionFactory org.apache.paimon.flink.action.cdc.postgres.PostgresSyncTableActionFactory ### message queue data format factories +org.apache.paimon.flink.action.cdc.format.aliyun.AliyunDataFormatFactory 
org.apache.paimon.flink.action.cdc.format.canal.CanalDataFormatFactory org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroDataFormatFactory org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonDataFormatFactory diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java new file mode 100644 index 000000000000..f06268d700e5 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.format.aliyun; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase; +import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor; +import org.apache.paimon.flink.sink.cdc.CdcRecord; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.RowKind; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** Test for AliyunJsonRecordParser. 
 */
+public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
+
+    private static final Logger log = LoggerFactory.getLogger(AliyunJsonRecordParserTest.class);
+    private static List<String> insertList = new ArrayList<>();
+    private static List<String> updateList = new ArrayList<>();
+    private static List<String> deleteList = new ArrayList<>();
+
+    private static ObjectMapper objMapper = new ObjectMapper();
+
+    @Before
+    public void setup() {
+        String insertRes = "kafka/aliyun/table/event/event-insert.txt";
+        String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt";
+        String deleteRes = "kafka/aliyun/table/event/event-delete.txt";
+        URL url;
+        try {
+            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes);
+            Files.readAllLines(Paths.get(url.toURI())).stream()
+                    .filter(this::isRecordLine)
+                    .forEach(e -> insertList.add(e));
+
+            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(updateRes);
+            Files.readAllLines(Paths.get(url.toURI())).stream()
+                    .filter(this::isRecordLine)
+                    .forEach(e -> updateList.add(e));
+
+            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(deleteRes);
+            Files.readAllLines(Paths.get(url.toURI())).stream()
+                    .filter(this::isRecordLine)
+                    .forEach(e -> deleteList.add(e));
+
+        } catch (Exception e) {
+            log.error("Fail to init aliyun-json cases", e);
+        }
+    }
+
+    @Test
+    public void extractInsertRecord() throws Exception {
+        AliyunRecordParser parser =
+                new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        for (String json : insertList) {
+            // Parse the JSON string into a JsonNode.
+            JsonNode rootNode = objMapper.readValue(json, JsonNode.class);
+            CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode);
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assert.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assert.assertEquals(result.kind(), RowKind.INSERT);
+
+            String dbName = parser.getDatabaseName();
+            Assert.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assert.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void extractUpdateRecord() throws Exception {
+        AliyunRecordParser parser =
+                new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        for (String json : updateList) {
+            // Parse the JSON string into a JsonNode.
+            JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
+            CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assert.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assert.assertEquals(result.kind(), RowKind.UPDATE_AFTER);
+
+            String dbName = parser.getDatabaseName();
+            Assert.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assert.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void extractDeleteRecord() throws Exception {
+        AliyunRecordParser parser =
+                new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        for (String json : deleteList) {
+            // Parse the JSON string into a JsonNode.
+            JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
+            CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assert.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assert.assertEquals(result.kind(), RowKind.DELETE);
+
+            String dbName = parser.getDatabaseName();
+            Assert.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assert.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt
new file mode 100644
index 000000000000..ebae6608a755
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbType":"MySQL","dbName":"bigdata_test","tableName":"sync_test_table"}},"payload":{"before":{"dataColumn":{"id":1,"val":"1.100000","name":"a","create_time":1731661114000}},"after":null,"sequenceId":"1731663842292000000","timestamp":{"eventTime":1731662085000,"systemTime":1731663848953,"checkpointTime":1731662085000},"op":"DELETE","ddl":null},"version":"0.0.1"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt
new file mode 100644
index 000000000000..d1cd34e5e6ac
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"payload":{"after":{"dataColumn":{"create_time":1731661114000,"id":2,"name":"a","val":"1.100000"}},"before":null,"ddl":null,"op":"INSERT","sequenceId":"-1","timestamp":{"checkpointTime":-1,"eventTime":-1,"systemTime":1731661820245}},"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbName":"bigdata_test","dbType":"MySQL","tableName":"sync_test_table"}},"version":"0.0.1"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt
new file mode 100644
index 000000000000..9acf6309cc48
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ {"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbType":"MySQL","dbName":"bigdata_test","tableName":"sync_test_table"}},"payload":{"before":{"dataColumn":{"id":2,"val":"1.100000","name":"a","create_time":1731661114000}},"after":{"dataColumn":{"id":2,"val":"2.200000","name":"a","create_time":1731661114000}},"sequenceId":"1731663842292000001","timestamp":{"eventTime":1731662097000,"systemTime":1731663848979,"checkpointTime":1731662097000},"op":"UPDATE_AFTER","ddl":null},"version":"0.0.1"}
\ No newline at end of file

From e53e1d267a6de96b1cee138eb056181ea31fb0c5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9D=8E=E9=B9=8F=E7=A8=8B?=
Date: Sat, 7 Dec 2024 20:20:08 +0800
Subject: [PATCH 2/4] kafka_sync_database supports table name mapping when table prefix and suffix cannot meet the need.
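
Editor's note, not part of the patch: a minimal Java sketch of the intended
conversion semantics, mirroring TableNameConverterTest below; the table names
"orders" and "customer" are made-up examples:

    Map<String, String> mapping = new HashMap<>();
    mapping.put("orders", "ods_orders"); // mapping keys are matched case-insensitively

    TableNameConverter converter =
            new TableNameConverter(true, true, "pre_", "_pos", mapping);

    converter.convert("ORDERS");   // -> "ods_orders": an explicit mapping wins over prefix/suffix
    converter.convert("customer"); // -> "pre_customer_pos": unmapped names get prefix + suffix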
--- .../action/cdc/CdcActionCommonUtils.java | 1 + .../action/cdc/SyncDatabaseActionBase.java | 11 ++++- .../cdc/SyncDatabaseActionFactoryBase.java | 2 + .../flink/action/cdc/TableNameConverter.java | 31 ++++++++++++- .../cdc/mysql/MySqlSyncDatabaseAction.java | 3 +- .../action/cdc/TableNameConverterTest.java | 46 +++++++++++++++++++ 6 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 8f96022bde35..83891c90b8e1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -56,6 +56,7 @@ public class CdcActionCommonUtils { public static final String PULSAR_CONF = "pulsar_conf"; public static final String TABLE_PREFIX = "table_prefix"; public static final String TABLE_SUFFIX = "table_suffix"; + public static final String TABLE_MAPPING = "table_mapping"; public static final String INCLUDING_TABLES = "including_tables"; public static final String EXCLUDING_TABLES = "excluding_tables"; public static final String TYPE_MAPPING = "type_mapping"; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index 4ab56bdcf118..ac3483ac23bf 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -52,6 +52,7 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase { protected MultiTablesSinkMode mode = COMBINED; protected String tablePrefix = ""; protected String tableSuffix = ""; + protected Map tableMapping = new HashMap<>(); protected String includingTables = ".*"; protected List partitionKeys = new ArrayList<>(); protected List primaryKeys = new ArrayList<>(); @@ -97,6 +98,13 @@ public SyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) { return this; } + public SyncDatabaseActionBase withTableMapping(Map tableMapping) { + if (tableMapping != null) { + this.tableMapping = tableMapping; + } + return this; + } + public SyncDatabaseActionBase includingTables(@Nullable String includingTables) { if (includingTables != null) { this.includingTables = includingTables; @@ -155,7 +163,8 @@ protected EventParser.Factory buildEventParserFactory() Pattern excludingPattern = excludingTables == null ? 
null : Pattern.compile(excludingTables); TableNameConverter tableNameConverter = - new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix); + new TableNameConverter( + allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping); Set createdTables; try { createdTables = new HashSet<>(catalog.listTables(database)); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java index e7a386979d4e..2135f2a28112 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java @@ -29,6 +29,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING; @@ -51,6 +52,7 @@ public Optional create(MultipleParameterToolAdapter params) { protected void withParams(MultipleParameterToolAdapter params, T action) { action.withTablePrefix(params.get(TABLE_PREFIX)) .withTableSuffix(params.get(TABLE_SUFFIX)) + .withTableMapping(optionalConfigMap(params, TABLE_MAPPING)) .includingTables(params.get(INCLUDING_TABLES)) .excludingTables(params.get(EXCLUDING_TABLES)) .withPartitionKeyMultiple( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java index 67c70aa58cdb..033339070f9a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java @@ -21,6 +21,8 @@ import org.apache.paimon.catalog.Identifier; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; /** Used to convert a MySQL source table name to corresponding Paimon table name. 
*/ public class TableNameConverter implements Serializable { @@ -31,20 +33,31 @@ public class TableNameConverter implements Serializable { private final boolean mergeShards; private final String prefix; private final String suffix; + private final Map tableMapping; public TableNameConverter(boolean caseSensitive) { - this(caseSensitive, true, "", ""); + this(caseSensitive, true, "", "", null); } public TableNameConverter( - boolean caseSensitive, boolean mergeShards, String prefix, String suffix) { + boolean caseSensitive, + boolean mergeShards, + String prefix, + String suffix, + Map tableMapping) { this.caseSensitive = caseSensitive; this.mergeShards = mergeShards; this.prefix = prefix; this.suffix = suffix; + this.tableMapping = lowerMapKey(tableMapping); } public String convert(String originName) { + if (tableMapping.containsKey(originName.toLowerCase())) { + String mappedName = tableMapping.get(originName.toLowerCase()); + return caseSensitive ? mappedName : mappedName.toLowerCase(); + } + String tableName = caseSensitive ? originName : originName.toLowerCase(); return prefix + tableName + suffix; } @@ -58,4 +71,18 @@ public String convert(Identifier originIdentifier) { + originIdentifier.getObjectName(); return convert(rawName); } + + private Map lowerMapKey(Map map) { + int size = map == null ? 0 : map.size(); + Map lowerKeyMap = new HashMap(size); + if (size == 0) { + return lowerKeyMap; + } + + for (String key : map.keySet()) { + lowerKeyMap.put(key.toLowerCase(), map.get(key)); + } + + return lowerKeyMap; + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index f8ea8cdc4438..235b3f9a3235 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -138,7 +138,8 @@ protected void beforeBuildingSourceSink() throws Exception { + ", or MySQL database does not exist."); TableNameConverter tableNameConverter = - new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix); + new TableNameConverter( + allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping); for (JdbcTableInfo tableInfo : jdbcTableInfos) { Identifier identifier = Identifier.create( diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java new file mode 100644 index 000000000000..3140c5f6960f --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** Tests for {@link TableNameConverter}. */ +public class TableNameConverterTest { + private Map tableMapping; + + @Test + public void testConvertTableName() { + Map tableMapping = new HashMap<>(); + tableMapping.put("mapped_src", "mapped_TGT"); + TableNameConverter caseConverter = + new TableNameConverter(true, true, "pre_", "_pos", tableMapping); + Assert.assertEquals(caseConverter.convert("mapped_SRC"), "mapped_TGT"); + + Assert.assertEquals(caseConverter.convert("unmapped_src"), "pre_unmapped_src_pos"); + + TableNameConverter incaseConverter = + new TableNameConverter(false, true, "pre_", "_pos", tableMapping); + Assert.assertEquals(incaseConverter.convert("mapped_src"), "mapped_tgt"); + Assert.assertEquals(incaseConverter.convert("unmapped_src"), "pre_unmapped_src_pos"); + } +} From 80d294f989052953cc3ff6ff57930714911366cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B9=8F=E7=A8=8B?= Date: Mon, 9 Dec 2024 14:06:05 +0800 Subject: [PATCH 3/4] Adapt code according to code review result --- .../paimon/flink/action/cdc/TableNameConverter.java | 2 +- .../paimon/flink/action/cdc/TableNameConverterTest.java | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java index 033339070f9a..4eca8b903ed1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java @@ -74,7 +74,7 @@ public String convert(Identifier originIdentifier) { private Map lowerMapKey(Map map) { int size = map == null ? 0 : map.size(); - Map lowerKeyMap = new HashMap(size); + Map lowerKeyMap = new HashMap<>(size); if (size == 0) { return lowerKeyMap; } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java index 3140c5f6960f..dfbe32e3d398 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java @@ -26,11 +26,10 @@ /** Tests for {@link TableNameConverter}. 
 */
 public class TableNameConverterTest {
-    private Map<String, String> tableMapping;
 
     @Test
     public void testConvertTableName() {
-        Map<String, String> tableMapping = new HashMap<>();
+        Map<String, String> tableMapping = new HashMap<>(1);
         tableMapping.put("mapped_src", "mapped_TGT");
         TableNameConverter caseConverter =
                 new TableNameConverter(true, true, "pre_", "_pos", tableMapping);
@@ -38,9 +37,9 @@ public void testConvertTableName() {
 
         Assert.assertEquals(caseConverter.convert("unmapped_src"), "pre_unmapped_src_pos");
 
-        TableNameConverter incaseConverter =
+        TableNameConverter noCaseConverter =
                 new TableNameConverter(false, true, "pre_", "_pos", tableMapping);
-        Assert.assertEquals(incaseConverter.convert("mapped_src"), "mapped_tgt");
-        Assert.assertEquals(incaseConverter.convert("unmapped_src"), "pre_unmapped_src_pos");
+        Assert.assertEquals(noCaseConverter.convert("mapped_src"), "mapped_tgt");
+        Assert.assertEquals(noCaseConverter.convert("unmapped_src"), "pre_unmapped_src_pos");
     }
 }

From edcb3e7ad97b6eff07f656b15e8ddde86127d270 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9D=8E=E9=B9=8F=E7=A8=8B?=
Date: Mon, 9 Dec 2024 20:27:25 +0800
Subject: [PATCH 4/4] Add documentation for the kafka_sync_database option --table_mapping

---
 docs/content/cdc-ingestion/kafka-cdc.md                    | 1 +
 docs/layouts/shortcodes/generated/kafka_sync_database.html | 4 ++++
 2 files changed, 5 insertions(+)

diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md
index f57260275ea8..b037937c554f 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -198,6 +198,7 @@ To use this feature through `flink run`, run the following shell command.
 kafka_sync_database
     --warehouse <warehouse-path> \
     --database <database-name> \
+    [--table_mapping <table-name>=<paimon-table-name>] \
     [--table_prefix <paimon-table-prefix>] \
    [--table_suffix <paimon-table-suffix>] \
     [--including_tables <table-name|name-regular-expr>] \
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 888901991d69..6c90f1d7f7d8 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -37,6 +37,10 @@
         <tr>
             <td><h5>--ignore_incompatible</h5></td>
             <td>It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible, an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.</td>
         </tr>
+        <tr>
+            <td><h5>--table_mapping</h5></td>
+            <td>The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings can be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix".</td>
+        </tr>
         <tr>
             <td><h5>--table_prefix</h5></td>
             <td>The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
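
Editor's note for reviewers of this series, not part of the patches: a minimal
Java sketch of how the aliyun-json parser from PATCH 1/4 is driven, mirroring
AliyunJsonRecordParserTest. The class name and the relative fixture path are
assumptions; the sketch lives in the parser's own package, as the test does,
so that buildSchema is accessible.

    package org.apache.paimon.flink.action.cdc.format.aliyun;

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Collections;
    import java.util.List;

    import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
    import org.apache.paimon.flink.action.cdc.TypeMapping;
    import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    public class AliyunJsonSketch {
        public static void main(String[] args) throws Exception {
            // Read the one-line JSON fixture added by this patch series
            // (path assumed relative to the paimon-flink-cdc module).
            String json =
                    Files.readAllLines(
                                    Paths.get(
                                            "src/test/resources/kafka/aliyun/table/event/event-insert.txt"))
                            .stream()
                            .filter(line -> line.startsWith("{"))
                            .findFirst()
                            .get();

            JsonNode root = new ObjectMapper().readValue(json, JsonNode.class);
            CdcSourceRecord record = new CdcSourceRecord(root);

            AliyunRecordParser parser =
                    new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
            Schema schema = parser.buildSchema(record); // primary keys come from schema.primaryKey
            List<RichCdcMultiplexRecord> records = parser.extractRecords(); // one INSERT record
            System.out.println(schema.primaryKeys() + " / " + records.size());
        }
    }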