Commit 83444c2

[cdc] Fix keyAndPartitions extractor result conflict if partitions or keys value is empty. (apache#3574)
LinMingQiang authored Jun 26, 2024
1 parent cc077b7 commit 83444c2
Showing 2 changed files with 40 additions and 2 deletions.
@@ -22,7 +22,6 @@
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.StringUtils;
 import org.apache.paimon.utils.TypeUtils;
 
 import org.slf4j.Logger;
@@ -56,7 +55,7 @@ public static GenericRow projectAsInsert(CdcRecord record, List<DataField> dataFields) {
         for (int i = 0; i < dataFields.size(); i++) {
             DataField dataField = dataFields.get(i);
             String fieldValue = record.fields().get(dataField.name());
-            if (!StringUtils.isEmpty(fieldValue)) {
+            if (fieldValue != null) {
                 genericRow.setField(
                         i, TypeUtils.castFromCdcValueString(fieldValue, dataField.type()));
             }
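
Why the one-line change matters, as a minimal standalone sketch (it assumes StringUtils.isEmpty(s) behaves like s == null || s.isEmpty(), the usual commons-lang-style contract; the class and method names below are illustrative and not part of Paimon):

public class EmptyValueCheckSketch {

    // Old gate in projectAsInsert: a field was skipped unless it was "non-empty",
    // so an empty-string value never reached the projected GenericRow.
    static boolean oldCheckKeepsField(String fieldValue) {
        return !(fieldValue == null || fieldValue.isEmpty());
    }

    // New gate: only a truly missing (null) value is skipped; "" is kept and cast.
    static boolean newCheckKeepsField(String fieldValue) {
        return fieldValue != null;
    }

    public static void main(String[] args) {
        System.out.println(oldCheckKeepsField(""));   // false -> "" dropped, row field stays null
        System.out.println(newCheckKeepsField(""));   // true  -> "" written into the row
        System.out.println(oldCheckKeepsField(null)); // false -> missing value still skipped
        System.out.println(newCheckKeepsField(null)); // false -> missing value still skipped
    }
}

With the old gate, an empty-string partition or key value was projected as null, so the CDC-side extractor and the row-based extractor could disagree on partition and bucket; that is the conflict the commit message describes and the new test below exercises.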
@@ -139,6 +139,45 @@ public void testNullPartition() throws Exception {
         assertThat(actual.bucket()).isEqualTo(expected.bucket());
     }
 
+    @Test
+    public void testEmptyPartition() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        TableSchema schema = createTableSchema();
+        RowDataKeyAndBucketExtractor expected = new RowDataKeyAndBucketExtractor(schema);
+        CdcRecordKeyAndBucketExtractor actual = new CdcRecordKeyAndBucketExtractor(schema);
+
+        long k1 = random.nextLong();
+        int v1 = random.nextInt();
+        String k2 = UUID.randomUUID().toString();
+        String v2 = UUID.randomUUID().toString();
+
+        GenericRowData rowData =
+                GenericRowData.of(
+                        StringData.fromString(""),
+                        null,
+                        k1,
+                        v1,
+                        StringData.fromString(k2),
+                        StringData.fromString(v2));
+        expected.setRecord(rowData);
+
+        Map<String, String> fields = new HashMap<>();
+        fields.put("pt1", "");
+        fields.put("pt2", null);
+        fields.put("k1", String.valueOf(k1));
+        fields.put("v1", String.valueOf(v1));
+        fields.put("k2", k2);
+        fields.put("v2", v2);
+
+        actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+        assertThat(actual.partition()).isEqualTo(expected.partition());
+        assertThat(actual.bucket()).isEqualTo(expected.bucket());
+
+        actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+        assertThat(actual.partition()).isEqualTo(expected.partition());
+        assertThat(actual.bucket()).isEqualTo(expected.bucket());
+    }
+
     private TableSchema createTableSchema() throws Exception {
         return SchemaUtils.forceCommit(
                 new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())),
