Skip to content

Commit

Permalink
[cdc] Fix ambiguous naming in CdcRecord. (#4450)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuyaogai authored Nov 5, 2024
1 parent 2f4c8d0 commit 7c4fd96
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ public class CdcRecord implements Serializable {

private RowKind kind;

private final Map<String, String> fields;
// field name -> value
private final Map<String, String> data;

public CdcRecord(RowKind kind, Map<String, String> fields) {
public CdcRecord(RowKind kind, Map<String, String> data) {
this.kind = kind;
this.fields = fields;
this.data = data;
}

public static CdcRecord emptyRecord() {
Expand All @@ -50,16 +51,16 @@ public RowKind kind() {
return kind;
}

public Map<String, String> fields() {
return fields;
public Map<String, String> data() {
return data;
}

public CdcRecord fieldNameLowerCase() {
Map<String, String> newFields = new HashMap<>();
for (Map.Entry<String, String> entry : fields.entrySet()) {
newFields.put(entry.getKey().toLowerCase(), entry.getValue());
Map<String, String> newData = new HashMap<>();
for (Map.Entry<String, String> entry : data.entrySet()) {
newData.put(entry.getKey().toLowerCase(), entry.getValue());
}
return new CdcRecord(kind, newFields);
return new CdcRecord(kind, newData);
}

@Override
Expand All @@ -69,16 +70,16 @@ public boolean equals(Object o) {
}

CdcRecord that = (CdcRecord) o;
return Objects.equals(kind, that.kind) && Objects.equals(fields, that.fields);
return Objects.equals(kind, that.kind) && Objects.equals(data, that.data);
}

@Override
public int hashCode() {
return Objects.hash(kind, fields);
return Objects.hash(kind, data);
}

@Override
public String toString() {
return kind.shortString() + " " + fields;
return kind.shortString() + " " + data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static GenericRow projectAsInsert(CdcRecord record, List<DataField> dataF
GenericRow genericRow = new GenericRow(dataFields.size());
for (int i = 0; i < dataFields.size(); i++) {
DataField dataField = dataFields.get(i);
String fieldValue = record.fields().get(dataField.name());
String fieldValue = record.data().get(dataField.name());
if (fieldValue != null) {
genericRow.setField(
i, TypeUtils.castFromCdcValueString(fieldValue, dataField.type()));
Expand Down Expand Up @@ -83,7 +83,7 @@ public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField
List<String> fieldNames =
dataFields.stream().map(DataField::name).collect(Collectors.toList());

for (Map.Entry<String, String> field : record.fields().entrySet()) {
for (Map.Entry<String, String> field : record.data().entrySet()) {
String key = field.getKey();
String value = field.getValue();

Expand Down Expand Up @@ -117,14 +117,14 @@ public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField
}

public static CdcRecord fromGenericRow(GenericRow row, List<String> fieldNames) {
Map<String, String> fields = new HashMap<>();
Map<String, String> data = new HashMap<>();
for (int i = 0; i < row.getFieldCount(); i++) {
Object field = row.getField(i);
if (field != null) {
fields.put(fieldNames.get(i), field.toString());
data.put(fieldNames.get(i), field.toString());
}
}

return new CdcRecord(row.getRowKind(), fields);
return new CdcRecord(row.getRowKind(), data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public RichCdcRecord(CdcRecord cdcRecord, List<DataField> fields) {
}

public boolean hasPayload() {
return !cdcRecord.fields().isEmpty();
return !cdcRecord.data().isEmpty();
}

public RowKind kind() {
Expand Down Expand Up @@ -95,7 +95,7 @@ public static class Builder {
private final RowKind kind;
private final AtomicInteger fieldId;
private final List<DataField> fields = new ArrayList<>();
private final Map<String, String> fieldValues = new HashMap<>();
private final Map<String, String> data = new HashMap<>();

public Builder(RowKind kind, AtomicInteger fieldId) {
this.kind = kind;
Expand All @@ -109,12 +109,12 @@ public Builder field(String name, DataType type, String value) {
public Builder field(
String name, DataType type, String value, @Nullable String description) {
fields.add(new DataField(fieldId.incrementAndGet(), name, type, description));
fieldValues.put(name, value);
data.put(name, value);
return this;
}

public RichCdcRecord build() {
return new RichCdcRecord(new CdcRecord(kind, fieldValues), fields);
return new RichCdcRecord(new CdcRecord(kind, data), fields);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ private void testImpl(Identifier tableId, List<Map<String, String>> input) {

// assert that insert and delete records are routed into same channel

for (Map<String, String> fields : input) {
CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
for (Map<String, String> data : input) {
CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data);
CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data);

assertThat(
channelComputer.channel(
Expand All @@ -184,8 +184,8 @@ private void testImpl(Identifier tableId, List<Map<String, String>> input) {
// assert that channel >= 0
int numTests = random.nextInt(10) + 1;
for (int test = 0; test < numTests; test++) {
Map<String, String> fields = input.get(random.nextInt(input.size()));
CdcRecord record = new CdcRecord(RowKind.INSERT, fields);
Map<String, String> data = input.get(random.nextInt(input.size()));
CdcRecord record = new CdcRecord(RowKind.INSERT, data);

int numBuckets = random.nextInt(numChannels * 4) + 1;
for (int i = 0; i < numBuckets; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ private void testImpl(TableSchema schema, List<Map<String, String>> input) {

// assert that channel(record) and channel(partition, bucket) gives the same result

for (Map<String, String> fields : input) {
CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
for (Map<String, String> data : input) {
CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data);
CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data);

extractor.setRecord(random.nextBoolean() ? insertRecord : deleteRecord);
BinaryRow partition = extractor.partition();
Expand All @@ -151,8 +151,8 @@ private void testImpl(TableSchema schema, List<Map<String, String>> input) {
bucketsPerChannel.put(i, 0);
}

Map<String, String> fields = input.get(random.nextInt(input.size()));
extractor.setRecord(new CdcRecord(RowKind.INSERT, fields));
Map<String, String> data = input.get(random.nextInt(input.size()));
extractor.setRecord(new CdcRecord(RowKind.INSERT, data));
BinaryRow partition = extractor.partition();

int numBuckets = random.nextInt(numChannels * 4) + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,19 @@ public void testExtract() throws Exception {
StringData.fromString(v2));
expected.setRecord(rowData);

Map<String, String> fields = new HashMap<>();
fields.put("pt1", pt1);
fields.put("pt2", String.valueOf(pt2));
fields.put("k1", String.valueOf(k1));
fields.put("v1", String.valueOf(v1));
fields.put("k2", k2);
fields.put("v2", v2);

actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
Map<String, String> data = new HashMap<>();
data.put("pt1", pt1);
data.put("pt2", String.valueOf(pt2));
data.put("k1", String.valueOf(k1));
data.put("v1", String.valueOf(v1));
data.put("k2", k2);
data.put("v2", v2);

actual.setRecord(new CdcRecord(RowKind.INSERT, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());

actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
actual.setRecord(new CdcRecord(RowKind.DELETE, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
}
Expand All @@ -122,19 +122,19 @@ public void testNullPartition() throws Exception {
null, null, k1, v1, StringData.fromString(k2), StringData.fromString(v2));
expected.setRecord(rowData);

Map<String, String> fields = new HashMap<>();
fields.put("pt1", null);
fields.put("pt2", null);
fields.put("k1", String.valueOf(k1));
fields.put("v1", String.valueOf(v1));
fields.put("k2", k2);
fields.put("v2", v2);
Map<String, String> data = new HashMap<>();
data.put("pt1", null);
data.put("pt2", null);
data.put("k1", String.valueOf(k1));
data.put("v1", String.valueOf(v1));
data.put("k2", k2);
data.put("v2", v2);

actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
actual.setRecord(new CdcRecord(RowKind.INSERT, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());

actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
actual.setRecord(new CdcRecord(RowKind.DELETE, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
}
Expand All @@ -161,19 +161,19 @@ public void testEmptyPartition() throws Exception {
StringData.fromString(v2));
expected.setRecord(rowData);

Map<String, String> fields = new HashMap<>();
fields.put("pt1", "");
fields.put("pt2", null);
fields.put("k1", String.valueOf(k1));
fields.put("v1", String.valueOf(v1));
fields.put("k2", k2);
fields.put("v2", v2);
Map<String, String> data = new HashMap<>();
data.put("pt1", "");
data.put("pt2", null);
data.put("k1", String.valueOf(k1));
data.put("v1", String.valueOf(v1));
data.put("k2", k2);
data.put("v2", v2);

actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
actual.setRecord(new CdcRecord(RowKind.INSERT, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());

actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
actual.setRecord(new CdcRecord(RowKind.DELETE, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
}
Expand Down
Loading

0 comments on commit 7c4fd96

Please sign in to comment.