From 83444c2270a1b08c7ef6249d51b1e42d48fe52d3 Mon Sep 17 00:00:00 2001 From: HunterXHunter <1356469429@qq.com> Date: Wed, 26 Jun 2024 16:01:15 +0800 Subject: [PATCH] [cdc] Fix keyAndPartitions extractor result conflict if partitions or keys value is empty. (#3574) --- .../paimon/flink/sink/cdc/CdcRecordUtils.java | 3 +- .../CdcRecordKeyAndBucketExtractorTest.java | 39 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java index 769c41a1952e..0d192dd538d6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java @@ -22,7 +22,6 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; -import org.apache.paimon.utils.StringUtils; import org.apache.paimon.utils.TypeUtils; import org.slf4j.Logger; @@ -56,7 +55,7 @@ public static GenericRow projectAsInsert(CdcRecord record, List dataF for (int i = 0; i < dataFields.size(); i++) { DataField dataField = dataFields.get(i); String fieldValue = record.fields().get(dataField.name()); - if (!StringUtils.isEmpty(fieldValue)) { + if (fieldValue != null) { genericRow.setField( i, TypeUtils.castFromCdcValueString(fieldValue, dataField.type())); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java index a9dc3907dac3..8384b7155a0e 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java @@ -139,6 +139,45 @@ public void testNullPartition() throws Exception { assertThat(actual.bucket()).isEqualTo(expected.bucket()); } + @Test + public void testEmptyPartition() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + TableSchema schema = createTableSchema(); + RowDataKeyAndBucketExtractor expected = new RowDataKeyAndBucketExtractor(schema); + CdcRecordKeyAndBucketExtractor actual = new CdcRecordKeyAndBucketExtractor(schema); + + long k1 = random.nextLong(); + int v1 = random.nextInt(); + String k2 = UUID.randomUUID().toString(); + String v2 = UUID.randomUUID().toString(); + + GenericRowData rowData = + GenericRowData.of( + StringData.fromString(""), + null, + k1, + v1, + StringData.fromString(k2), + StringData.fromString(v2)); + expected.setRecord(rowData); + + Map fields = new HashMap<>(); + fields.put("pt1", ""); + fields.put("pt2", null); + fields.put("k1", String.valueOf(k1)); + fields.put("v1", String.valueOf(v1)); + fields.put("k2", k2); + fields.put("v2", v2); + + actual.setRecord(new CdcRecord(RowKind.INSERT, fields)); + assertThat(actual.partition()).isEqualTo(expected.partition()); + assertThat(actual.bucket()).isEqualTo(expected.bucket()); + + actual.setRecord(new CdcRecord(RowKind.DELETE, fields)); + assertThat(actual.partition()).isEqualTo(expected.partition()); + assertThat(actual.bucket()).isEqualTo(expected.bucket()); + } + private TableSchema createTableSchema() throws Exception { return SchemaUtils.forceCommit( new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())),