From 89f97c7baf59b72a5bc747e6f5391973fac5d3f5 Mon Sep 17 00:00:00 2001 From: JackeyLee007 Date: Thu, 21 Nov 2024 22:22:56 +0800 Subject: [PATCH] [cdc] support aliyun-json when sinking data from kafka with paimon-flink-action (#4570) --- .../cdc/format/aliyun/AliyunDataFormat.java | 34 +++ .../aliyun/AliyunDataFormatFactory.java} | 23 +- .../cdc/format/aliyun/AliyunFieldParser.java | 117 ++++++++ .../cdc/format/aliyun/AliyunRecordParser.java | 260 ++++++++++++++++++ .../MessageQueueCdcTimestampExtractor.java | 4 + .../org.apache.paimon.factories.Factory | 1 + .../aliyun/AliyunJsonRecordParserTest.java | 167 +++++++++++ .../kafka/aliyun/table/event/event-delete.txt | 19 ++ .../kafka/aliyun/table/event/event-insert.txt | 19 ++ .../table/event/event-update-in-one.txt | 19 ++ 10 files changed, 655 insertions(+), 8 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/{ProcessRecordAttributesUtil.java => action/cdc/format/aliyun/AliyunDataFormatFactory.java} (59%) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java new file mode 100644 index 000000000000..ccbacdc2af5e --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormat.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.format.aliyun; + +import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat; +import org.apache.paimon.flink.action.cdc.format.RecordParserFactory; + +/** + * Supports the message queue's debezium json data format and provides definitions for the message + * queue's record json deserialization class and parsing class {@link AliyunRecordParser}. + */ +public class AliyunDataFormat extends AbstractJsonDataFormat { + + @Override + protected RecordParserFactory parser() { + return AliyunRecordParser::new; + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java similarity index 59% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java index efe5e12b12d7..a07e2f205c90 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunDataFormatFactory.java @@ -16,16 +16,23 @@ * limitations under the License. */ -package org.apache.paimon.flink; +package org.apache.paimon.flink.action.cdc.format.aliyun; -import org.apache.paimon.flink.sink.StoreSinkWrite; +import org.apache.paimon.flink.action.cdc.format.DataFormat; +import org.apache.paimon.flink.action.cdc.format.DataFormatFactory; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +/** Factory to create {@link AliyunDataFormat}. */ +public class AliyunDataFormatFactory implements DataFormatFactory { -/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */ -public class ProcessRecordAttributesUtil { - public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {} + public static final String IDENTIFIER = "aliyun-json"; - public static void processWithOutput(RecordAttributes recordAttributes, Output output) {} + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public DataFormat create() { + return new AliyunDataFormat(); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java new file mode 100644 index 000000000000..824ed9145943 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunFieldParser.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.format.aliyun; + +/** Converts some special types such as enum、set、geometry. */ +public class AliyunFieldParser { + + protected static byte[] convertGeoType2WkbArray(byte[] mysqlGeomBytes) { + int sridLength = 4; + boolean hasSrid = false; + for (int i = 0; i < sridLength; ++i) { + if (mysqlGeomBytes[i] != 0) { + hasSrid = true; + break; + } + } + byte[] wkb; + if (hasSrid) { + wkb = new byte[mysqlGeomBytes.length]; + // byteOrder + geometry + System.arraycopy(mysqlGeomBytes, 4, wkb, 0, 5); + // srid + System.arraycopy(mysqlGeomBytes, 0, wkb, 5, 4); + // geometry + System.arraycopy(mysqlGeomBytes, 9, wkb, 9, wkb.length - 9); + + // set srid flag + if (wkb[0] == 0) { + // big endian + wkb[1] = (byte) (wkb[1] + 32); + } else { + wkb[4] = (byte) (wkb[4] + 32); + } + } else { + wkb = new byte[mysqlGeomBytes.length - 4]; + System.arraycopy(mysqlGeomBytes, 4, wkb, 0, wkb.length); + } + return wkb; + } + + protected static String convertSet(String value, String mysqlType) { + // mysql set type value can be filled with more than one, value is a bit string conversion + // from the long + int indexes = Integer.parseInt(value); + return getSetValuesByIndex(mysqlType, indexes); + } + + protected static String convertEnum(String value, String mysqlType) { + int elementIndex = Integer.parseInt(value); + // enum('a','b','c') + return getEnumValueByIndex(mysqlType, elementIndex); + } + + protected static String getEnumValueByIndex(String mysqlType, int elementIndex) { + String[] options = extractEnumValueByIndex(mysqlType); + + return options[elementIndex - 1]; + } + + protected static String getSetValuesByIndex(String mysqlType, int indexes) { + String[] options = extractSetValuesByIndex(mysqlType); + + StringBuilder sb = new StringBuilder(); + sb.append("["); + int index = 0; + boolean first = true; + int optionLen = options.length; + + while (indexes != 0L) { + if (indexes % 2L != 0) { + if (first) { + first = false; + } else { + sb.append(','); + } + if (index < optionLen) { + sb.append(options[index]); + } else { + throw new RuntimeException( + String.format( + "extractSetValues from mysqlType[%s],index:%d failed", + mysqlType, indexes)); + } + } + ++index; + indexes = indexes >>> 1; + } + sb.append("]"); + return sb.toString(); + } + + private static String[] extractSetValuesByIndex(String mysqlType) { + // set('x','y') + return mysqlType.substring(5, mysqlType.length() - 2).split("'\\s*,\\s*'"); + } + + private static String[] extractEnumValueByIndex(String mysqlType) { + // enum('x','y') + return mysqlType.substring(6, mysqlType.length() - 2).split("'\\s*,\\s*'"); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java new file mode 100644 index 000000000000..e31b282a76cb --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.format.aliyun; + +import org.apache.paimon.flink.action.cdc.ComputedColumn; +import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser; +import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs; +import static org.apache.paimon.utils.JsonSerdeUtil.isNull; + +/** + * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON + * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data + * across databases. This parser extracts relevant information from the Canal-JSON format and + * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes + * captured in the database. + * + *

The class handles different types of database operations such as INSERT, UPDATE, and DELETE, + * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation. + * + *

Additionally, the parser supports schema extraction, which can be used to understand the + * structure of the incoming data and its corresponding field types. + */ +public class AliyunRecordParser extends AbstractJsonRecordParser { + + private static final Logger LOG = LoggerFactory.getLogger(AliyunRecordParser.class); + + private static final String FIELD_IS_DDL = "isDdl"; + private static final String FIELD_TYPE = "op"; + + private static final String OP_UPDATE_BEFORE = "UPDATE_BEFORE"; + private static final String OP_UPDATE_AFTER = "UPDATE_AFTER"; + private static final String OP_INSERT = "INSERT"; + private static final String OP_DELETE = "DELETE"; + + private static final String FIELD_PAYLOAD = "payload"; + private static final String FIELD_BEFORE = "before"; + private static final String FIELD_AFTER = "after"; + private static final String FIELD_COLUMN = "dataColumn"; + + private static final String FIELD_SCHEMA = "schema"; + private static final String FIELD_PK = "primaryKey"; + + @Override + protected boolean isDDL() { + JsonNode node = root.get(FIELD_IS_DDL); + return !isNull(node) && node.asBoolean(); + } + + public AliyunRecordParser(TypeMapping typeMapping, List computedColumns) { + super(typeMapping, computedColumns); + } + + @Override + protected String primaryField() { + return "schema.primaryKey"; + } + + @Override + protected String dataField() { + return "payload.dataColumn"; + } + + @Override + protected List extractPrimaryKeys() { + JsonNode schemaNode = root.get(FIELD_SCHEMA); + checkNotNull(schemaNode, FIELD_SCHEMA); + ArrayNode pkNode = getNodeAs(schemaNode, FIELD_PK, ArrayNode.class); + List pkFields = new ArrayList<>(); + pkNode.forEach( + pk -> { + if (isNull(pk)) { + throw new IllegalArgumentException( + String.format("Primary key cannot be null: %s", pk)); + } + + pkFields.add(pk.asText()); + }); + return pkFields; + } + + @Override + public List extractRecords() { + if (isDDL()) { + return Collections.emptyList(); + } + + List records = new ArrayList<>(); + + JsonNode payload = root.get(FIELD_PAYLOAD); + checkNotNull(payload, FIELD_PAYLOAD); + + String type = payload.get(FIELD_TYPE).asText(); + + RowKind rowKind = null; + String field = null; + switch (type) { + case OP_UPDATE_BEFORE: + rowKind = RowKind.UPDATE_BEFORE; + field = FIELD_BEFORE; + break; + case OP_UPDATE_AFTER: + rowKind = RowKind.UPDATE_AFTER; + field = FIELD_AFTER; + break; + case OP_INSERT: + rowKind = RowKind.INSERT; + field = FIELD_AFTER; + break; + case OP_DELETE: + rowKind = RowKind.DELETE; + field = FIELD_BEFORE; + break; + default: + throw new UnsupportedOperationException("Unknown record operation: " + type); + } + + JsonNode container = payload.get(field); + checkNotNull(container, String.format("%s.%s", FIELD_PAYLOAD, field)); + + JsonNode data = getNodeAs(container, FIELD_COLUMN, JsonNode.class); + checkNotNull(data, String.format("%s.%s.%s", FIELD_PAYLOAD, field, FIELD_COLUMN)); + + processRecord(data, rowKind, records); + + return records; + } + + @Override + protected Map extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) { + + Map recordMap = + JsonSerdeUtil.convertValue(record, new TypeReference>() {}); + Map rowData = new HashMap<>(); + + fillDefaultTypes(record, rowTypeBuilder); + for (Map.Entry entry : recordMap.entrySet()) { + rowData.put(entry.getKey(), Objects.toString(entry.getValue(), null)); + } + + evalComputedColumns(rowData, rowTypeBuilder); + return rowData; + } + + @Override + protected String format() { + return "aliyun-json"; + } + + @Nullable + @Override + protected String getTableName() { + JsonNode schemaNode = root.get(FIELD_SCHEMA); + if (isNull(schemaNode)) { + return null; + } + JsonNode sourceNode = schemaNode.get("source"); + if (isNull(sourceNode)) { + return null; + } + + JsonNode tableNode = sourceNode.get("tableName"); + if (isNull(tableNode)) { + return null; + } + return tableNode.asText(); + } + + @Nullable + @Override + protected String getDatabaseName() { + JsonNode schemaNode = root.get(FIELD_SCHEMA); + if (isNull(schemaNode)) { + return null; + } + JsonNode sourceNode = schemaNode.get("source"); + if (isNull(sourceNode)) { + return null; + } + JsonNode databaseNode = sourceNode.get("dbName"); + if (isNull(databaseNode)) { + return null; + } + return databaseNode.asText(); + } + + private Map matchOldRecords(ArrayNode newData, ArrayNode oldData) { + return IntStream.range(0, newData.size()) + .boxed() + .collect(Collectors.toMap(newData::get, oldData::get)); + } + + private String transformValue(@Nullable String oldValue, String shortType, String mySqlType) { + if (oldValue == null) { + return null; + } + + if (MySqlTypeUtils.isSetType(shortType)) { + return AliyunFieldParser.convertSet(oldValue, mySqlType); + } + + if (MySqlTypeUtils.isEnumType(shortType)) { + return AliyunFieldParser.convertEnum(oldValue, mySqlType); + } + + if (MySqlTypeUtils.isGeoType(shortType)) { + try { + byte[] wkb = + AliyunFieldParser.convertGeoType2WkbArray( + oldValue.getBytes(StandardCharsets.ISO_8859_1)); + return MySqlTypeUtils.convertWkbArray(wkb); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Failed to convert %s to geometry JSON.", oldValue), e); + } + } + return oldValue; + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java index 8a9a28453bad..5bf2fefc1b78 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/MessageQueueCdcTimestampExtractor.java @@ -54,6 +54,10 @@ public long extractTimestamp(CdcSourceRecord cdcSourceRecord) throws JsonProcess } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) { // Dbz json return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms"); + } else if (JsonSerdeUtil.isNodeExists(record, "payload", "timestamp")) { + // Aliyun json + return JsonSerdeUtil.extractValue( + record, Long.class, "payload", "timestamp", "systemTime"); } throw new RuntimeException( String.format( diff --git a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 17b8b29a2009..1b30c7ab6396 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -27,6 +27,7 @@ org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncDatabaseActionFactory org.apache.paimon.flink.action.cdc.postgres.PostgresSyncTableActionFactory ### message queue data format factories +org.apache.paimon.flink.action.cdc.format.aliyun.AliyunDataFormatFactory org.apache.paimon.flink.action.cdc.format.canal.CanalDataFormatFactory org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroDataFormatFactory org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonDataFormatFactory diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java new file mode 100644 index 000000000000..f06268d700e5 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.format.aliyun; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase; +import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor; +import org.apache.paimon.flink.sink.cdc.CdcRecord; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.RowKind; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** Test for AliyunJsonRecordParser. */ +public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase { + + private static final Logger log = LoggerFactory.getLogger(AliyunJsonRecordParserTest.class); + private static List insertList = new ArrayList<>(); + private static List updateList = new ArrayList<>(); + private static List deleteList = new ArrayList<>(); + + private static ObjectMapper objMapper = new ObjectMapper(); + + @Before + public void setup() { + String insertRes = "kafka/aliyun/table/event/event-insert.txt"; + String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt"; + String deleteRes = "kafka/aliyun/table/event/event-delete.txt"; + URL url; + try { + url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes); + Files.readAllLines(Paths.get(url.toURI())).stream() + .filter(this::isRecordLine) + .forEach(e -> insertList.add(e)); + + url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(updateRes); + Files.readAllLines(Paths.get(url.toURI())).stream() + .filter(this::isRecordLine) + .forEach(e -> updateList.add(e)); + + url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(deleteRes); + Files.readAllLines(Paths.get(url.toURI())).stream() + .filter(this::isRecordLine) + .forEach(e -> deleteList.add(e)); + + } catch (Exception e) { + log.error("Fail to init aliyun-json cases", e); + } + } + + @Test + public void extractInsertRecord() throws Exception { + AliyunRecordParser parser = + new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); + for (String json : insertList) { + // 将json解析为JsonNode对象 + JsonNode rootNode = objMapper.readValue(json, JsonNode.class); + CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode); + Schema schema = parser.buildSchema(cdcRecord); + Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id")); + + List records = parser.extractRecords(); + Assert.assertEquals(records.size(), 1); + + CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord(); + Assert.assertEquals(result.kind(), RowKind.INSERT); + + String dbName = parser.getDatabaseName(); + Assert.assertEquals(dbName, "bigdata_test"); + + String tableName = parser.getTableName(); + Assert.assertEquals(tableName, "sync_test_table"); + + MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor(); + Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + } + } + + @Test + public void extractUpdateRecord() throws Exception { + AliyunRecordParser parser = + new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); + for (String json : updateList) { + // 将json解析为JsonNode对象 + JsonNode jsonNode = objMapper.readValue(json, JsonNode.class); + CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode); + Schema schema = parser.buildSchema(cdcRecord); + Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id")); + + List records = parser.extractRecords(); + Assert.assertEquals(records.size(), 1); + + CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord(); + Assert.assertEquals(result.kind(), RowKind.UPDATE_AFTER); + + String dbName = parser.getDatabaseName(); + Assert.assertEquals(dbName, "bigdata_test"); + + String tableName = parser.getTableName(); + Assert.assertEquals(tableName, "sync_test_table"); + + MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor(); + Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + } + } + + @Test + public void extractDeleteRecord() throws Exception { + AliyunRecordParser parser = + new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); + for (String json : deleteList) { + // 将json解析为JsonNode对象 + JsonNode jsonNode = objMapper.readValue(json, JsonNode.class); + CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode); + Schema schema = parser.buildSchema(cdcRecord); + Assert.assertEquals(schema.primaryKeys(), Arrays.asList("id")); + + List records = parser.extractRecords(); + Assert.assertEquals(records.size(), 1); + + CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord(); + Assert.assertEquals(result.kind(), RowKind.DELETE); + + String dbName = parser.getDatabaseName(); + Assert.assertEquals(dbName, "bigdata_test"); + + String tableName = parser.getTableName(); + Assert.assertEquals(tableName, "sync_test_table"); + + MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor(); + Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt new file mode 100644 index 000000000000..ebae6608a755 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-delete.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbType":"MySQL","dbName":"bigdata_test","tableName":"sync_test_table"}},"payload":{"before":{"dataColumn":{"id":1,"val":"1.100000","name":"a","create_time":1731661114000}},"after":null,"sequenceId":"1731663842292000000","timestamp":{"eventTime":1731662085000,"systemTime":1731663848953,"checkpointTime":1731662085000},"op":"DELETE","ddl":null},"version":"0.0.1"} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt new file mode 100644 index 000000000000..d1cd34e5e6ac --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-insert.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"payload":{"after":{"dataColumn":{"create_time":1731661114000,"id":2,"name":"a","val":"1.100000"}},"before":null,"ddl":null,"op":"INSERT","sequenceId":"-1","timestamp":{"checkpointTime":-1,"eventTime":-1,"systemTime":1731661820245}},"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbName":"bigdata_test","dbType":"MySQL","tableName":"sync_test_table"}},"version":"0.0.1"} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt new file mode 100644 index 000000000000..9acf6309cc48 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/aliyun/table/event/event-update-in-one.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + {"schema":{"dataColumn":[{"name":"id","type":"LONG"},{"name":"val","type":"DOUBLE"},{"name":"name","type":"STRING"},{"name":"create_time","type":"DATE"}],"primaryKey":["id"],"source":{"dbType":"MySQL","dbName":"bigdata_test","tableName":"sync_test_table"}},"payload":{"before":{"dataColumn":{"id":2,"val":"1.100000","name":"a","create_time":1731661114000}},"after":{"dataColumn":{"id":2,"val":"2.200000","name":"a","create_time":1731661114000}},"sequenceId":"1731663842292000001","timestamp":{"eventTime":1731662097000,"systemTime":1731663848979,"checkpointTime":1731662097000},"op":"UPDATE_AFTER","ddl":null},"version":"0.0.1"} \ No newline at end of file