From 9d58d712ad75ad7b7b6b495c73f83329964e29db Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Thu, 25 Jan 2024 14:43:42 +0800 Subject: [PATCH] fix --- docs/content/cdc-ingestion/kafka-cdc.md | 8 +- .../format/debezium/DebeziumRecordParser.java | 36 +-- .../format/debezium/DebeziumSchemaUtils.java | 21 +- .../flink/action/cdc/CdcActionITCaseBase.java | 1 + .../KafkaDebeziumSyncTableActionITCase.java | 8 +- .../cdc/kafka/KafkaSyncTableActionITCase.java | 249 +++++++++++++++++- .../table/schema/alltype/debezium-data-1.txt | 19 ++ 7 files changed, 313 insertions(+), 29 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/alltype/debezium-data-1.txt diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md index 74e16bceb144..dab2a676ac51 100644 --- a/docs/content/cdc-ingestion/kafka-cdc.md +++ b/docs/content/cdc-ingestion/kafka-cdc.md @@ -66,9 +66,11 @@ If a message in a Kafka topic is a change event captured from another database u The JSON sources possibly missing some information. For example, Ogg and Maxwell format standards don't contain field types; When you write JSON sources into Flink Kafka sink, it will only reserve data and row type and drop other information. The synchronization job will try best to handle the problem as follows: -1. If missing field types, Paimon will use 'STRING' type as default. -2. If missing database name or table name, you cannot do database synchronization, but you can still do table synchronization. -3. If missing primary keys, the job might create non primary key table. You can set primary keys when submit job in table +1. Usually, debezium-json contains 'schema' field, from which Paimon will retrieve data types. Make sure your debezium +json has this field, or Paimon will use 'STRING' type. +2. If missing field types, Paimon will use 'STRING' type as default. +3. If missing database name or table name, you cannot do database synchronization, but you can still do table synchronization. +4. If missing primary keys, the job might create non primary key table. You can set primary keys when submit job in table synchronization. {{< /hint >}} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java index e0a4347f07a9..d4d5b47a25d6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java @@ -39,10 +39,10 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs; import static org.apache.paimon.utils.JsonSerdeUtil.isNull; -import static org.apache.paimon.utils.JsonSerdeUtil.parseJsonMap; /** * The {@code DebeziumRecordParser} class extends the abstract {@link RecordParser} and is designed @@ -120,13 +120,16 @@ private JsonNode getBefore(String op) { @Override protected void setRoot(String record) { JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class); + + hasSchema = false; if (node.has(FIELD_SCHEMA)) { - hasSchema = true; - JsonNode schema = node.get(FIELD_SCHEMA); - parseSchema(schema); root = node.get(FIELD_PAYLOAD); + JsonNode schema = node.get(FIELD_SCHEMA); + if (!isNull(schema)) { + parseSchema(schema); + hasSchema = true; + } } else { - hasSchema = false; root = node; } } @@ -142,10 +145,10 @@ private void parseSchema(JsonNode schema) { ArrayNode fields = null; for (int i = 0; i < schemaFields.size(); i++) { JsonNode node = schemaFields.get(i); - if (node.get("field").equals("after")) { + if (getString(node, "field").equals("after")) { fields = getNodeAs(node, "fields", ArrayNode.class); break; - } else if (node.get("field").equals("before")) { + } else if (getString(node, "field").equals("before")) { if (fields == null) { fields = getNodeAs(node, "fields", ArrayNode.class); } @@ -159,11 +162,14 @@ private void parseSchema(JsonNode schema) { debeziumTypes.put(field, getString(node, "type")); classNames.put(field, getString(node, "name")); - String parametersString = getString(node, "parameters"); + JsonNode parametersNode = node.get("parameters"); Map parametersMap = - parametersString == null + isNull(parametersNode) ? Collections.emptyMap() - : parseJsonMap(parametersString, String.class); + : JsonSerdeUtil.convertValue( + parametersNode, + new TypeReference>() {}); + parameters.put(field, parametersMap); } } @@ -181,18 +187,18 @@ protected Map extractRowData( return super.extractRowData(record, paimonFieldTypes); } - Map recordMap = - JsonSerdeUtil.convertValue(record, new TypeReference>() {}); + Map recordMap = + JsonSerdeUtil.convertValue(record, new TypeReference>() {}); LinkedHashMap resultMap = new LinkedHashMap<>(); - for (Map.Entry entry : recordMap.entrySet()) { + for (Map.Entry entry : recordMap.entrySet()) { String fieldName = entry.getKey(); - String rawValue = entry.getValue(); + String rawValue = Objects.toString(entry.getValue(), null); String debeziumType = debeziumTypes.get(fieldName); String className = classNames.get(fieldName); String transformed = DebeziumSchemaUtils.transformRawValue( - rawValue, debeziumType, className, typeMapping); + rawValue, debeziumType, className, typeMapping, record.get(fieldName)); resultMap.put(fieldName, transformed); paimonFieldTypes.put( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index 214579accb7e..13874872bac5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -22,8 +22,8 @@ import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.DecimalType; import org.apache.paimon.utils.DateTimeUtils; -import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -44,7 +44,6 @@ import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDateTime; -import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Base64; import java.util.Map; @@ -62,12 +61,14 @@ public static String transformRawValue( @Nullable String rawValue, String debeziumType, @Nullable String className, - TypeMapping typeMapping) { + TypeMapping typeMapping, + JsonNode origin) { if (rawValue == null) { return null; } String transformed = rawValue; + if (Bits.LOGICAL_NAME.equals(className)) { // transform little-endian form to normal order // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types @@ -139,8 +140,8 @@ else if (Date.SCHEMA_NAME.equals(className)) { // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector // for implementation // TODO currently we cannot get zone id - ZoneId zoneId = ZoneId.systemDefault(); - LocalDateTime localDateTime = Instant.parse(rawValue).atZone(zoneId).toLocalDateTime(); + LocalDateTime localDateTime = + Instant.parse(rawValue).atZone(ZoneOffset.UTC).toLocalDateTime(); transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6); } else if (MicroTime.SCHEMA_NAME.equals(className)) { long microseconds = Long.parseLong(rawValue); @@ -157,8 +158,7 @@ else if (Date.SCHEMA_NAME.equals(className)) { } else if (Point.LOGICAL_NAME.equals(className) || Geometry.LOGICAL_NAME.equals(className)) { try { - JsonNode geoNode = JsonSerdeUtil.asSpecificNodeType(rawValue, JsonNode.class); - byte[] wkb = geoNode.get(Geometry.WKB_FIELD).binaryValue(); + byte[] wkb = origin.get(Geometry.WKB_FIELD).binaryValue(); transformed = MySqlTypeUtils.convertWkbArray(wkb); } catch (Exception e) { throw new IllegalArgumentException( @@ -180,9 +180,14 @@ public static DataType toDataType( String precision = parameters.get("connect.decimal.precision"); if (precision == null) { return DataTypes.DECIMAL(20, 0); + } + + int p = Integer.parseInt(precision); + if (p > DecimalType.MAX_PRECISION) { + return DataTypes.STRING(); } else { int scale = Integer.parseInt(parameters.get("scale")); - return DataTypes.DECIMAL(Integer.parseInt(precision), scale); + return DataTypes.DECIMAL(p, scale); } case Date.SCHEMA_NAME: return DataTypes.DATE(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index 7a621030217a..1d455e793105 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -147,6 +147,7 @@ protected void waitForResult( rowType); List sortedActual = new ArrayList<>(result); Collections.sort(sortedActual); + System.out.println(sortedActual); if (sortedExpected.equals(sortedActual)) { break; } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java index e5e030a60adb..6552f56f6245 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java @@ -25,7 +25,6 @@ public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCase { private static final String DEBEZIUM = "debezium"; - private static final String DEBEZIUM_JSON_SCHEMA_INCLUDE = "debezium_json_schema_include"; @Test @Timeout(60) @@ -96,6 +95,11 @@ public void testKafkaBuildSchemaWithDelete() throws Exception { @Test @Timeout(60) public void testSchemaIncludeRecord1() throws Exception { - testSchemaIncludeRecord(DEBEZIUM_JSON_SCHEMA_INCLUDE); + testSchemaIncludeRecord(DEBEZIUM); + } + + @Test + public void testAllTypesWithSchema() throws Exception { + testAllTypesWithSchemaImpl(DEBEZIUM); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java index da38b2a0ce7f..47ab426c2700 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java @@ -675,7 +675,7 @@ public void testSchemaIncludeRecord(String format) throws Exception { } Map kafkaConfig = getBasicKafkaConfig(); - kafkaConfig.put(VALUE_FORMAT.key(), format); + kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); kafkaConfig.put(TOPIC.key(), topic); KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) @@ -701,4 +701,251 @@ public void testSchemaIncludeRecord(String format) throws Exception { "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]"); waitForResult(expected, table, rowType, primaryKeys); } + + // TODO some types are different from mysql cdc; maybe need to fix + public void testAllTypesWithSchemaImpl(String format) throws Exception { + String topic = "schema_include_all_type"; + createTestTopic(topic, 1, 1); + + List lines = readLines("kafka/debezium/table/schema/alltype/debezium-data-1.txt"); + writeRecordsToKafka(topic, lines); + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); + kafkaConfig.put(TOPIC.key(), topic); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withPartitionKeys("pt") + .withPrimaryKeys("pt", "_id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + waitingTables(tableName); + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), // _id + DataTypes.DECIMAL(2, 1).notNull(), // pt + DataTypes.BOOLEAN(), // _bit1 + DataTypes.BINARY(8), // _bit + DataTypes.SMALLINT(), // _tinyint1 different from mysql cdc + DataTypes.SMALLINT(), // _boolean different from mysql cdc + DataTypes.SMALLINT(), // _bool different from mysql cdc + DataTypes.SMALLINT(), // _tinyint different from mysql cdc + DataTypes.SMALLINT(), // _tinyint_unsigned + DataTypes.SMALLINT(), // _tinyint_unsigned_zerofill + DataTypes.SMALLINT(), // _smallint + DataTypes.INT(), // _smallint_unsigned + DataTypes.INT(), // _smallint_unsigned_zerofill + DataTypes.INT(), // _mediumint + DataTypes.INT(), // _mediumint_unsigned different from mysql cdc + DataTypes + .INT(), // _mediumint_unsigned_zerofill different from mysql cdc + DataTypes.INT(), // _int + DataTypes.BIGINT(), // _int_unsigned + DataTypes.BIGINT(), // _int_unsigned_zerofill + DataTypes.BIGINT(), // _bigint + DataTypes.DECIMAL(20, 0), // _bigint_unsigned + DataTypes.DECIMAL(20, 0), // _bigint_unsigned_zerofill + DataTypes.DECIMAL(20, 0), // _serial different from mysql cdc + DataTypes.DOUBLE(), // _float different from mysql cdc + DataTypes.DOUBLE(), // _float_unsigned different from mysql cdc + DataTypes.DOUBLE(), // _float_unsigned_zerofill different from mysql cdc + DataTypes.DOUBLE(), // _real + DataTypes.DOUBLE(), // _real_unsigned + DataTypes.DOUBLE(), // _real_unsigned_zerofill + DataTypes.DOUBLE(), // _double + DataTypes.DOUBLE(), // _double_unsigned + DataTypes.DOUBLE(), // _double_unsigned_zerofill + DataTypes.DOUBLE(), // _double_precision + DataTypes.DOUBLE(), // _double_precision_unsigned + DataTypes.DOUBLE(), // _double_precision_unsigned_zerofill + DataTypes.DECIMAL(8, 3), // _numeric + DataTypes.DECIMAL(8, 3), // _numeric_unsigned + DataTypes.DECIMAL(8, 3), // _numeric_unsigned_zerofill + DataTypes.STRING(), // _fixed + DataTypes.STRING(), // _fixed_unsigned + DataTypes.STRING(), // _fixed_unsigned_zerofill + DataTypes.DECIMAL(8, 0), // _decimal + DataTypes.DECIMAL(8, 0), // _decimal_unsigned + DataTypes.DECIMAL(8, 0), // _decimal_unsigned_zerofill + DataTypes.DECIMAL(38, 10), // _big_decimal + DataTypes.DATE(), // _date + DataTypes.TIMESTAMP(3), // _datetime different from mysql cdc + DataTypes.TIMESTAMP(3), // _datetime3 + DataTypes.TIMESTAMP(6), // _datetime6 + DataTypes.TIMESTAMP(3), // _datetime_p different from mysql cdc + DataTypes.TIMESTAMP(3), // _datetime_p2 different from mysql cdc + DataTypes.TIMESTAMP(6), // _timestamp + DataTypes.TIMESTAMP(6), // _timestamp0 different from mysql cdc + DataTypes.STRING(), // _char different from mysql cdc + DataTypes.STRING(), // _varchar different from mysql cdc + DataTypes.STRING(), // _tinytext + DataTypes.STRING(), // _text + DataTypes.STRING(), // _mediumtext + DataTypes.STRING(), // _longtext + DataTypes.BYTES(), // _bin different from mysql cdc + DataTypes.BYTES(), // _varbin different from mysql cdc + DataTypes.BYTES(), // _tinyblob + DataTypes.BYTES(), // _blob + DataTypes.BYTES(), // _mediumblob + DataTypes.BYTES(), // _longblob + DataTypes.STRING(), // _json + DataTypes.STRING(), // _enum + DataTypes.INT(), // _year + DataTypes.TIME(), // _time + DataTypes.STRING(), // _point + DataTypes.STRING(), // _geometry + DataTypes.STRING(), // _linestring + DataTypes.STRING(), // _polygon + DataTypes.STRING(), // _multipoint + DataTypes.STRING(), // _multiline + DataTypes.STRING(), // _multipolygon + DataTypes.STRING(), // _geometrycollection + DataTypes.STRING() // _set different from mysql cdc + }, + new String[] { + "_id", + "pt", + "_bit1", + "_bit", + "_tinyint1", + "_boolean", + "_bool", + "_tinyint", + "_tinyint_unsigned", + "_tinyint_unsigned_zerofill", + "_smallint", + "_smallint_unsigned", + "_smallint_unsigned_zerofill", + "_mediumint", + "_mediumint_unsigned", + "_mediumint_unsigned_zerofill", + "_int", + "_int_unsigned", + "_int_unsigned_zerofill", + "_bigint", + "_bigint_unsigned", + "_bigint_unsigned_zerofill", + "_serial", + "_float", + "_float_unsigned", + "_float_unsigned_zerofill", + "_real", + "_real_unsigned", + "_real_unsigned_zerofill", + "_double", + "_double_unsigned", + "_double_unsigned_zerofill", + "_double_precision", + "_double_precision_unsigned", + "_double_precision_unsigned_zerofill", + "_numeric", + "_numeric_unsigned", + "_numeric_unsigned_zerofill", + "_fixed", + "_fixed_unsigned", + "_fixed_unsigned_zerofill", + "_decimal", + "_decimal_unsigned", + "_decimal_unsigned_zerofill", + "_big_decimal", + "_date", + "_datetime", + "_datetime3", + "_datetime6", + "_datetime_p", + "_datetime_p2", + "_timestamp", + "_timestamp0", + "_char", + "_varchar", + "_tinytext", + "_text", + "_mediumtext", + "_longtext", + "_bin", + "_varbin", + "_tinyblob", + "_blob", + "_mediumblob", + "_longblob", + "_json", + "_enum", + "_year", + "_time", + "_point", + "_geometry", + "_linestring", + "_polygon", + "_multipoint", + "_multiline", + "_multipolygon", + "_geometrycollection", + "_set", + }); + + // BIT(64) data: 0B11111000111 -> 0B00000111_11000111 + String bits = + Arrays.toString( + new byte[] {0, 0, 0, 0, 0, 0, (byte) 0B00000111, (byte) 0B11000111}); + List expected = + Collections.singletonList( + "+I[" + + "1, 1.1, " + + String.format("true, %s, ", bits) + + "1, 1, 0, 1, 2, 3, " + + "1000, 2000, 3000, " + + "100000, 200000, 300000, " + + "1000000, 2000000, 3000000, " + + "10000000000, 20000000000, 30000000000, 40000000000, " + + "1.5, 2.5, 3.5, " + + "1.000001, 2.000002, 3.000003, " + + "1.000011, 2.000022, 3.000033, " + + "1.000111, 2.000222, 3.000333, " + + "12345.110, 12345.220, 12345.330, " + // TODO fix FIXED + + "1.2345678987654322E32, 1.2345678987654322E32, 1.2345678987654322E32, " + // TODO fix BIG DECIMAL + + "11111, 22222, 33333, 2222222222222222400000000000.0000000000, " + + "19439, " + // display value of datetime is not affected by timezone + + "2023-03-23T14:30:05, 2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, " + + "2023-03-24T14:30, 2023-03-24T14:30:05.120, " + // display value of timestamp is affected by timezone + // we store 2023-03-23T15:00:10.123456 in UTC-8 system timezone + // and query this timestamp in UTC-5 MySQL server timezone + // so the display value should increase by 3 hour + // TODO haven't handle zone + + "2023-03-23T22:00:10.123456, 2023-03-23T07:10, " + + "Paimon, Apache Paimon, Apache Paimon MySQL TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, " + + "[98, 121, 116, 101, 115, 0, 0, 0, 0, 0], " + + "[109, 111, 114, 101, 32, 98, 121, 116, 101, 115], " + + "[84, 73, 78, 89, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], " + + "[66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], " + + "[77, 69, 68, 73, 85, 77, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], " + + "[76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], " + + "{\"a\": \"b\"}, " + + "value1, " + + "2023, " + + "36803000, " + + "{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, " + + "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, " + + "{\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, " + + "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, " + + "{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, " + + "{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, " + + "{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, " + + "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}, " + // TODO fix set + + "a,b" + + "]"); + + List primaryKeys = Arrays.asList("pt", "_id"); + + waitForResult(expected, table, rowType, primaryKeys); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/alltype/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/alltype/debezium-data-1.txt new file mode 100644 index 000000000000..b398425cebcc --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/alltype/debezium-data-1.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"_id"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"1","connect.decimal.precision":"2"},"field":"pt"},{"type":"boolean","optional":true,"field":"_bit1"},{"type":"bytes","optional":true,"name":"io.debezium.data.Bits","version":1,"parameters":{"length":"64"},"field":"_bit"},{"type":"int16","optional":true,"field":"_tinyint1"},{"type":"int16","optional":true,"field":"_boolean"},{"type":"int16","optional":true,"field":"_bool"},{"type":"int16","optional":true,"field":"_tinyint"},{"type":"int16","optional":true,"field":"_tinyint_unsigned"},{"type":"int16","optional":true,"field":"_tinyint_unsigned_zerofill"},{"type":"int16","optional":true,"field":"_smallint"},{"type":"int32","optional":true,"field":"_smallint_unsigned"},{"type":"int32","optional":true,"field":"_smallint_unsigned_zerofill"},{"type":"int32","optional":true,"field":"_mediumint"},{"type":"int32","optional":true,"field":"_mediumint_unsigned"},{"type":"int32","optional":true,"field":"_mediumint_unsigned_zerofill"},{"type":"int32","optional":true,"field":"_int"},{"type":"int64","optional":true,"field":"_int_unsigned"},{"type":"int64","optional":true,"field":"_int_unsigned_zerofill"},{"type":"int64","optional":true,"field":"_bigint"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0"},"field":"_bigint_unsigned"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0"},"field":"_bigint_unsigned_zerofill"},{"type":"bytes","optional":false,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0"},"field":"_serial"},{"type":"double","optional":true,"field":"_float"},{"type":"double","optional":true,"field":"_float_unsigned"},{"type":"double","optional":true,"field":"_float_unsigned_zerofill"},{"type":"double","optional":true,"field":"_real"},{"type":"double","optional":true,"field":"_real_unsigned"},{"type":"double","optional":true,"field":"_real_unsigned_zerofill"},{"type":"double","optional":true,"field":"_double"},{"type":"double","optional":true,"field":"_double_unsigned"},{"type":"double","optional":true,"field":"_double_unsigned_zerofill"},{"type":"double","optional":true,"field":"_double_precision"},{"type":"double","optional":true,"field":"_double_precision_unsigned"},{"type":"double","optional":true,"field":"_double_precision_unsigned_zerofill"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"8"},"field":"_numeric"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"8"},"field":"_numeric_unsigned"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"8"},"field":"_numeric_unsigned_zerofill"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"40"},"field":"_fixed"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"40"},"field":"_fixed_unsigned"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"40"},"field":"_fixed_unsigned_zerofill"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"8"},"field":"_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"8"},"field":"_decimal_unsigned"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"8"},"field":"_decimal_unsigned_zerofill"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"10","connect.decimal.precision":"38"},"field":"_big_decimal"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"_datetime"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"_datetime3"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"_datetime6"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"_datetime_p"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"_datetime_p2"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"_timestamp"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"_timestamp0"},{"type":"string","optional":true,"field":"_char"},{"type":"string","optional":true,"field":"_varchar"},{"type":"string","optional":true,"field":"_tinytext"},{"type":"string","optional":true,"field":"_text"},{"type":"string","optional":true,"field":"_mediumtext"},{"type":"string","optional":true,"field":"_longtext"},{"type":"bytes","optional":true,"field":"_bin"},{"type":"bytes","optional":true,"field":"_varbin"},{"type":"bytes","optional":true,"field":"_tinyblob"},{"type":"bytes","optional":true,"field":"_blob"},{"type":"bytes","optional":true,"field":"_mediumblob"},{"type":"bytes","optional":true,"field":"_longblob"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"_json"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"value1,value2,value3"},"field":"_enum"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"_time"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_geometry"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_multipoint"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_multiline"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_multipolygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_geometrycollection"},{"type":"string","optional":true,"name":"io.debezium.data.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"_set"}],"optional":true,"name":"mysql_binlog_source.paimon_sync_table.all_types_table.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"_id"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"1","connect.decimal.precision":"2"},"field":"pt"},{"type":"boolean","optional":true,"field":"_bit1"},{"type":"bytes","optional":true,"name":"io.debezium.data.Bits","version":1,"parameters":{"length":"64"},"field":"_bit"},{"type":"int16","optional":true,"field":"_tinyint1"},{"type":"int16","optional":true,"field":"_boolean"},{"type":"int16","optional":true,"field":"_bool"},{"type":"int16","optional":true,"field":"_tinyint"},{"type":"int16","optional":true,"field":"_tinyint_unsigned"},{"type":"int16","optional":true,"field":"_tinyint_unsigned_zerofill"},{"type":"int16","optional":true,"field":"_smallint"},{"type":"int32","optional":true,"field":"_smallint_unsigned"},{"type":"int32","optional":true,"field":"_smallint_unsigned_zerofill"},{"type":"int32","optional":true,"field":"_mediumint"},{"type":"int32","optional":true,"field":"_mediumint_unsigned"},{"type":"int32","optional":true,"field":"_mediumint_unsigned_zerofill"},{"type":"int32","optional":true,"field":"_int"},{"type":"int64","optional":true,"field":"_int_unsigned"},{"type":"int64","optional":true,"field":"_int_unsigned_zerofill"},{"type":"int64","optional":true,"field":"_bigint"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0"},"field":"_bigint_unsigned"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0"},"field":"_bigint_unsigned_zerofill"},{"type":"bytes","optional":false,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0"},"field":"_serial"},{"type":"double","optional":true,"field":"_float"},{"type":"double","optional":true,"field":"_float_unsigned"},{"type":"double","optional":true,"field":"_float_unsigned_zerofill"},{"type":"double","optional":true,"field":"_real"},{"type":"double","optional":true,"field":"_real_unsigned"},{"type":"double","optional":true,"field":"_real_unsigned_zerofill"},{"type":"double","optional":true,"field":"_double"},{"type":"double","optional":true,"field":"_double_unsigned"},{"type":"double","optional":true,"field":"_double_unsigned_zerofill"},{"type":"double","optional":true,"field":"_double_precision"},{"type":"double","optional":true,"field":"_double_precision_unsigned"},{"type":"double","optional":true,"field":"_double_precision_unsigned_zerofill"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"8"},"field":"_numeric"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"8"},"field":"_numeric_unsigned"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"8"},"field":"_numeric_unsigned_zerofill"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"40"},"field":"_fixed"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"40"},"field":"_fixed_unsigned"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"3","connect.decimal.precision":"40"},"field":"_fixed_unsigned_zerofill"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"8"},"field":"_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"8"},"field":"_decimal_unsigned"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"8"},"field":"_decimal_unsigned_zerofill"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"10","connect.decimal.precision":"38"},"field":"_big_decimal"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"_datetime"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"_datetime3"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"_datetime6"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"_datetime_p"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"_datetime_p2"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"_timestamp"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"_timestamp0"},{"type":"string","optional":true,"field":"_char"},{"type":"string","optional":true,"field":"_varchar"},{"type":"string","optional":true,"field":"_tinytext"},{"type":"string","optional":true,"field":"_text"},{"type":"string","optional":true,"field":"_mediumtext"},{"type":"string","optional":true,"field":"_longtext"},{"type":"bytes","optional":true,"field":"_bin"},{"type":"bytes","optional":true,"field":"_varbin"},{"type":"bytes","optional":true,"field":"_tinyblob"},{"type":"bytes","optional":true,"field":"_blob"},{"type":"bytes","optional":true,"field":"_mediumblob"},{"type":"bytes","optional":true,"field":"_longblob"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"_json"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"value1,value2,value3"},"field":"_enum"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"_time"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_geometry"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_multipoint"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_multiline"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_multipolygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"_geometrycollection"},{"type":"string","optional":true,"name":"io.debezium.data.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"_set"}],"optional":true,"name":"mysql_binlog_source.paimon_sync_table.all_types_table.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_binlog_source.paimon_sync_table.all_types_table.Envelope"},"payload":{"before":null,"after":{"_id":1,"pt":1.1,"_bit1":true,"_bit":"xwcAAAAAAAA=","_tinyint1":1,"_boolean":1,"_bool":0,"_tinyint":1,"_tinyint_unsigned":2,"_tinyint_unsigned_zerofill":3,"_smallint":1000,"_smallint_unsigned":2000,"_smallint_unsigned_zerofill":3000,"_mediumint":100000,"_mediumint_unsigned":200000,"_mediumint_unsigned_zerofill":300000,"_int":1000000,"_int_unsigned":2000000,"_int_unsigned_zerofill":3000000,"_bigint":10000000000,"_bigint_unsigned":20000000000,"_bigint_unsigned_zerofill":30000000000,"_serial":40000000000,"_float":1.5,"_float_unsigned":2.5,"_float_unsigned_zerofill":3.5,"_real":1.000001,"_real_unsigned":2.000002,"_real_unsigned_zerofill":3.000003,"_double":1.000011,"_double_unsigned":2.000022,"_double_unsigned_zerofill":3.000033,"_double_precision":1.000111,"_double_precision_unsigned":2.000222,"_double_precision_unsigned_zerofill":3.000333,"_numeric":12345.110,"_numeric_unsigned":12345.220,"_numeric_unsigned_zerofill":12345.330,"_fixed":123456789876543212345678987654321.110,"_fixed_unsigned":123456789876543212345678987654321.220,"_fixed_unsigned_zerofill":123456789876543212345678987654321.330,"_decimal":11111,"_decimal_unsigned":22222,"_decimal_unsigned_zerofill":33333,"_big_decimal":2222222222222222300000001111.1234567890,"_date":19439,"_datetime":1679581805000,"_datetime3":1679581805123,"_datetime6":1679581805123456,"_datetime_p":1679668200000,"_datetime_p2":1679668205120,"_timestamp":"2023-03-23T22:00:10.123456Z","_timestamp0":"2023-03-23T07:10:00Z","_char":"Paimon","_varchar":"Apache Paimon","_tinytext":"Apache Paimon MySQL TINYTEXT Test Data","_text":"Apache Paimon MySQL Test Data","_mediumtext":"Apache Paimon MySQL MEDIUMTEXT Test Data","_longtext":"Apache Paimon MySQL Long Test Data","_bin":"Ynl0ZXMAAAAAAA==","_varbin":"bW9yZSBieXRlcw==","_tinyblob":"VElOWUJMT0IgdHlwZSB0ZXN0IGRhdGE=","_blob":"QkxPQiB0eXBlIHRlc3QgZGF0YQ==","_mediumblob":"TUVESVVNQkxPQiB0eXBlIHRlc3QgZGF0YQ==","_longblob":"TE9OR0JMT0IgIGJ5dGVzIHRlc3QgZGF0YQ==","_json":"{\"a\": \"b\"}","_enum":"value1","_year":2023,"_time":36803000000,"_point":{"x":1.0,"y":1.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAPA/","srid":null},"_geometry":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},"_linestring":{"wkb":"AQIAAAADAAAAAAAAAAAACEAAAAAAAAAAAAAAAAAAAAhAAAAAAAAACEAAAAAAAAAIQAAAAAAAABRA","srid":null},"_polygon":{"wkb":"AQMAAAABAAAABQAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAPA/","srid":null},"_multipoint":{"wkb":"AQQAAAACAAAAAQEAAAAAAAAAAADwPwAAAAAAAPA/AQEAAAAAAAAAAAAAQAAAAAAAAABA","srid":null},"_multiline":{"wkb":"AQUAAAACAAAAAQIAAAADAAAAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAAAEAAAAAAAAAIQAAAAAAAAAhAAQIAAAACAAAAAAAAAAAAEEAAAAAAAAAQQAAAAAAAABRAAAAAAAAAFEA=","srid":null},"_multipolygon":{"wkb":"AQYAAAACAAAAAQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAkQAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAQMAAAABAAAABQAAAAAAAAAAABRAAAAAAAAAFEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAcQAAAAAAAABRAAAAAAAAAHEAAAAAAAAAUQAAAAAAAABRA","srid":null},"_geometrycollection":{"wkb":"AQcAAAADAAAAAQEAAAAAAAAAAAAkQAAAAAAAACRAAQEAAAAAAAAAAAA+QAAAAAAAAD5AAQIAAAACAAAAAAAAAAAALkAAAAAAAAAuQAAAAAAAADRAAAAAAAAANEA=","srid":null},"_set":"a,b"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"paimon_sync_table","sequence":null,"table":"all_types_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1706167086275,"transaction":null}}