Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc] Fix ambiguous naming in CdcRecord #4450

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ public class CdcRecord implements Serializable {

private RowKind kind;

private final Map<String, String> fields;
// field name -> value
private final Map<String, String> data;

public CdcRecord(RowKind kind, Map<String, String> fields) {
public CdcRecord(RowKind kind, Map<String, String> data) {
this.kind = kind;
this.fields = fields;
this.data = data;
}

public static CdcRecord emptyRecord() {
Expand All @@ -50,16 +51,16 @@ public RowKind kind() {
return kind;
}

public Map<String, String> fields() {
return fields;
public Map<String, String> data() {
return data;
}

public CdcRecord fieldNameLowerCase() {
Map<String, String> newFields = new HashMap<>();
for (Map.Entry<String, String> entry : fields.entrySet()) {
newFields.put(entry.getKey().toLowerCase(), entry.getValue());
Map<String, String> newData = new HashMap<>();
for (Map.Entry<String, String> entry : data.entrySet()) {
newData.put(entry.getKey().toLowerCase(), entry.getValue());
}
return new CdcRecord(kind, newFields);
return new CdcRecord(kind, newData);
}

@Override
Expand All @@ -69,16 +70,16 @@ public boolean equals(Object o) {
}

CdcRecord that = (CdcRecord) o;
return Objects.equals(kind, that.kind) && Objects.equals(fields, that.fields);
return Objects.equals(kind, that.kind) && Objects.equals(data, that.data);
}

@Override
public int hashCode() {
return Objects.hash(kind, fields);
return Objects.hash(kind, data);
}

@Override
public String toString() {
return kind.shortString() + " " + fields;
return kind.shortString() + " " + data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static GenericRow projectAsInsert(CdcRecord record, List<DataField> dataF
GenericRow genericRow = new GenericRow(dataFields.size());
for (int i = 0; i < dataFields.size(); i++) {
DataField dataField = dataFields.get(i);
String fieldValue = record.fields().get(dataField.name());
String fieldValue = record.data().get(dataField.name());
if (fieldValue != null) {
genericRow.setField(
i, TypeUtils.castFromCdcValueString(fieldValue, dataField.type()));
Expand Down Expand Up @@ -83,7 +83,7 @@ public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField
List<String> fieldNames =
dataFields.stream().map(DataField::name).collect(Collectors.toList());

for (Map.Entry<String, String> field : record.fields().entrySet()) {
for (Map.Entry<String, String> field : record.data().entrySet()) {
String key = field.getKey();
String value = field.getValue();

Expand Down Expand Up @@ -117,14 +117,14 @@ public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField
}

public static CdcRecord fromGenericRow(GenericRow row, List<String> fieldNames) {
Map<String, String> fields = new HashMap<>();
Map<String, String> data = new HashMap<>();
for (int i = 0; i < row.getFieldCount(); i++) {
Object field = row.getField(i);
if (field != null) {
fields.put(fieldNames.get(i), field.toString());
data.put(fieldNames.get(i), field.toString());
}
}

return new CdcRecord(row.getRowKind(), fields);
return new CdcRecord(row.getRowKind(), data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public RichCdcRecord(CdcRecord cdcRecord, List<DataField> fields) {
}

public boolean hasPayload() {
return !cdcRecord.fields().isEmpty();
return !cdcRecord.data().isEmpty();
}

public RowKind kind() {
Expand Down Expand Up @@ -95,7 +95,7 @@ public static class Builder {
private final RowKind kind;
private final AtomicInteger fieldId;
private final List<DataField> fields = new ArrayList<>();
private final Map<String, String> fieldValues = new HashMap<>();
private final Map<String, String> data = new HashMap<>();

public Builder(RowKind kind, AtomicInteger fieldId) {
this.kind = kind;
Expand All @@ -109,12 +109,12 @@ public Builder field(String name, DataType type, String value) {
public Builder field(
String name, DataType type, String value, @Nullable String description) {
fields.add(new DataField(fieldId.incrementAndGet(), name, type, description));
fieldValues.put(name, value);
data.put(name, value);
return this;
}

public RichCdcRecord build() {
return new RichCdcRecord(new CdcRecord(kind, fieldValues), fields);
return new RichCdcRecord(new CdcRecord(kind, data), fields);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ private void testImpl(Identifier tableId, List<Map<String, String>> input) {

// assert that insert and delete records are routed into same channel

for (Map<String, String> fields : input) {
CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
for (Map<String, String> data : input) {
CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data);
CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data);

assertThat(
channelComputer.channel(
Expand All @@ -184,8 +184,8 @@ private void testImpl(Identifier tableId, List<Map<String, String>> input) {
// assert that channel >= 0
int numTests = random.nextInt(10) + 1;
for (int test = 0; test < numTests; test++) {
Map<String, String> fields = input.get(random.nextInt(input.size()));
CdcRecord record = new CdcRecord(RowKind.INSERT, fields);
Map<String, String> data = input.get(random.nextInt(input.size()));
CdcRecord record = new CdcRecord(RowKind.INSERT, data);

int numBuckets = random.nextInt(numChannels * 4) + 1;
for (int i = 0; i < numBuckets; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ private void testImpl(TableSchema schema, List<Map<String, String>> input) {

// assert that channel(record) and channel(partition, bucket) gives the same result

for (Map<String, String> fields : input) {
CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
for (Map<String, String> data : input) {
CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data);
CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data);

extractor.setRecord(random.nextBoolean() ? insertRecord : deleteRecord);
BinaryRow partition = extractor.partition();
Expand All @@ -151,8 +151,8 @@ private void testImpl(TableSchema schema, List<Map<String, String>> input) {
bucketsPerChannel.put(i, 0);
}

Map<String, String> fields = input.get(random.nextInt(input.size()));
extractor.setRecord(new CdcRecord(RowKind.INSERT, fields));
Map<String, String> data = input.get(random.nextInt(input.size()));
extractor.setRecord(new CdcRecord(RowKind.INSERT, data));
BinaryRow partition = extractor.partition();

int numBuckets = random.nextInt(numChannels * 4) + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,19 @@ public void testExtract() throws Exception {
StringData.fromString(v2));
expected.setRecord(rowData);

Map<String, String> fields = new HashMap<>();
fields.put("pt1", pt1);
fields.put("pt2", String.valueOf(pt2));
fields.put("k1", String.valueOf(k1));
fields.put("v1", String.valueOf(v1));
fields.put("k2", k2);
fields.put("v2", v2);

actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
Map<String, String> data = new HashMap<>();
data.put("pt1", pt1);
data.put("pt2", String.valueOf(pt2));
data.put("k1", String.valueOf(k1));
data.put("v1", String.valueOf(v1));
data.put("k2", k2);
data.put("v2", v2);

actual.setRecord(new CdcRecord(RowKind.INSERT, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());

actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
actual.setRecord(new CdcRecord(RowKind.DELETE, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
}
Expand All @@ -122,19 +122,19 @@ public void testNullPartition() throws Exception {
null, null, k1, v1, StringData.fromString(k2), StringData.fromString(v2));
expected.setRecord(rowData);

Map<String, String> fields = new HashMap<>();
fields.put("pt1", null);
fields.put("pt2", null);
fields.put("k1", String.valueOf(k1));
fields.put("v1", String.valueOf(v1));
fields.put("k2", k2);
fields.put("v2", v2);
Map<String, String> data = new HashMap<>();
data.put("pt1", null);
data.put("pt2", null);
data.put("k1", String.valueOf(k1));
data.put("v1", String.valueOf(v1));
data.put("k2", k2);
data.put("v2", v2);

actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
actual.setRecord(new CdcRecord(RowKind.INSERT, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());

actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
actual.setRecord(new CdcRecord(RowKind.DELETE, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
}
Expand All @@ -161,19 +161,19 @@ public void testEmptyPartition() throws Exception {
StringData.fromString(v2));
expected.setRecord(rowData);

Map<String, String> fields = new HashMap<>();
fields.put("pt1", "");
fields.put("pt2", null);
fields.put("k1", String.valueOf(k1));
fields.put("v1", String.valueOf(v1));
fields.put("k2", k2);
fields.put("v2", v2);
Map<String, String> data = new HashMap<>();
data.put("pt1", "");
data.put("pt2", null);
data.put("k1", String.valueOf(k1));
data.put("v1", String.valueOf(v1));
data.put("k2", k2);
data.put("v2", v2);

actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
actual.setRecord(new CdcRecord(RowKind.INSERT, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());

actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
actual.setRecord(new CdcRecord(RowKind.DELETE, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
}
Expand Down
Loading
Loading