From 83ff8139443f6ed2878380021a17537ba8e7c3dd Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Tue, 5 Nov 2024 10:50:19 +0800 Subject: [PATCH] fix ambiguous naming in CdcRecord. --- .../paimon/flink/sink/cdc/CdcRecord.java | 25 +- .../paimon/flink/sink/cdc/CdcRecordUtils.java | 10 +- .../paimon/flink/sink/cdc/RichCdcRecord.java | 8 +- ...CdcMultiplexRecordChannelComputerTest.java | 10 +- .../cdc/CdcRecordChannelComputerTest.java | 10 +- .../CdcRecordKeyAndBucketExtractorTest.java | 56 ++-- .../CdcRecordStoreMultiWriteOperatorTest.java | 256 ++++++++---------- .../cdc/CdcRecordStoreWriteOperatorTest.java | 88 +++--- .../paimon/flink/sink/cdc/TestTable.java | 18 +- 9 files changed, 229 insertions(+), 252 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java index 9adca753dc55..b23d0d6f06de 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java @@ -35,11 +35,12 @@ public class CdcRecord implements Serializable { private RowKind kind; - private final Map fields; + // field name -> value + private final Map data; - public CdcRecord(RowKind kind, Map fields) { + public CdcRecord(RowKind kind, Map data) { this.kind = kind; - this.fields = fields; + this.data = data; } public static CdcRecord emptyRecord() { @@ -50,16 +51,16 @@ public RowKind kind() { return kind; } - public Map fields() { - return fields; + public Map data() { + return data; } public CdcRecord fieldNameLowerCase() { - Map newFields = new HashMap<>(); - for (Map.Entry entry : fields.entrySet()) { - newFields.put(entry.getKey().toLowerCase(), entry.getValue()); + Map newData = new HashMap<>(); + for (Map.Entry entry : data.entrySet()) { + newData.put(entry.getKey().toLowerCase(), entry.getValue()); } - return new CdcRecord(kind, newFields); + return new CdcRecord(kind, newData); } @Override @@ -69,16 +70,16 @@ public boolean equals(Object o) { } CdcRecord that = (CdcRecord) o; - return Objects.equals(kind, that.kind) && Objects.equals(fields, that.fields); + return Objects.equals(kind, that.kind) && Objects.equals(data, that.data); } @Override public int hashCode() { - return Objects.hash(kind, fields); + return Objects.hash(kind, data); } @Override public String toString() { - return kind.shortString() + " " + fields; + return kind.shortString() + " " + data; } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java index 0d192dd538d6..91979a2c99b8 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java @@ -54,7 +54,7 @@ public static GenericRow projectAsInsert(CdcRecord record, List dataF GenericRow genericRow = new GenericRow(dataFields.size()); for (int i = 0; i < dataFields.size(); i++) { DataField dataField = dataFields.get(i); - String fieldValue = record.fields().get(dataField.name()); + String fieldValue = record.data().get(dataField.name()); if (fieldValue != null) { genericRow.setField( i, TypeUtils.castFromCdcValueString(fieldValue, dataField.type())); @@ -83,7 +83,7 @@ public static Optional toGenericRow(CdcRecord record, List fieldNames = dataFields.stream().map(DataField::name).collect(Collectors.toList()); - for (Map.Entry field : record.fields().entrySet()) { + for (Map.Entry field : record.data().entrySet()) { String key = field.getKey(); String value = field.getValue(); @@ -117,14 +117,14 @@ public static Optional toGenericRow(CdcRecord record, List fieldNames) { - Map fields = new HashMap<>(); + Map data = new HashMap<>(); for (int i = 0; i < row.getFieldCount(); i++) { Object field = row.getField(i); if (field != null) { - fields.put(fieldNames.get(i), field.toString()); + data.put(fieldNames.get(i), field.toString()); } } - return new CdcRecord(row.getRowKind(), fields); + return new CdcRecord(row.getRowKind(), data); } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java index 7fc0c3ff7b09..04b86fea568f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java @@ -48,7 +48,7 @@ public RichCdcRecord(CdcRecord cdcRecord, List fields) { } public boolean hasPayload() { - return !cdcRecord.fields().isEmpty(); + return !cdcRecord.data().isEmpty(); } public RowKind kind() { @@ -95,7 +95,7 @@ public static class Builder { private final RowKind kind; private final AtomicInteger fieldId; private final List fields = new ArrayList<>(); - private final Map fieldValues = new HashMap<>(); + private final Map data = new HashMap<>(); public Builder(RowKind kind, AtomicInteger fieldId) { this.kind = kind; @@ -109,12 +109,12 @@ public Builder field(String name, DataType type, String value) { public Builder field( String name, DataType type, String value, @Nullable String description) { fields.add(new DataField(fieldId.incrementAndGet(), name, type, description)); - fieldValues.put(name, value); + data.put(name, value); return this; } public RichCdcRecord build() { - return new RichCdcRecord(new CdcRecord(kind, fieldValues), fields); + return new RichCdcRecord(new CdcRecord(kind, data), fields); } } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java index ce0d484f4ce4..867cbdbae002 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java @@ -163,9 +163,9 @@ private void testImpl(Identifier tableId, List> input) { // assert that insert and delete records are routed into same channel - for (Map fields : input) { - CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields); - CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields); + for (Map data : input) { + CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data); + CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data); assertThat( channelComputer.channel( @@ -184,8 +184,8 @@ private void testImpl(Identifier tableId, List> input) { // assert that channel >= 0 int numTests = random.nextInt(10) + 1; for (int test = 0; test < numTests; test++) { - Map fields = input.get(random.nextInt(input.size())); - CdcRecord record = new CdcRecord(RowKind.INSERT, fields); + Map data = input.get(random.nextInt(input.size())); + CdcRecord record = new CdcRecord(RowKind.INSERT, data); int numBuckets = random.nextInt(numChannels * 4) + 1; for (int i = 0; i < numBuckets; i++) { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java index 9a19013e2983..8271ad18751c 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java @@ -128,9 +128,9 @@ private void testImpl(TableSchema schema, List> input) { // assert that channel(record) and channel(partition, bucket) gives the same result - for (Map fields : input) { - CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields); - CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields); + for (Map data : input) { + CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data); + CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data); extractor.setRecord(random.nextBoolean() ? insertRecord : deleteRecord); BinaryRow partition = extractor.partition(); @@ -151,8 +151,8 @@ private void testImpl(TableSchema schema, List> input) { bucketsPerChannel.put(i, 0); } - Map fields = input.get(random.nextInt(input.size())); - extractor.setRecord(new CdcRecord(RowKind.INSERT, fields)); + Map data = input.get(random.nextInt(input.size())); + extractor.setRecord(new CdcRecord(RowKind.INSERT, data)); BinaryRow partition = extractor.partition(); int numBuckets = random.nextInt(numChannels * 4) + 1; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java index 8384b7155a0e..802a3ea9d4cf 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java @@ -87,19 +87,19 @@ public void testExtract() throws Exception { StringData.fromString(v2)); expected.setRecord(rowData); - Map fields = new HashMap<>(); - fields.put("pt1", pt1); - fields.put("pt2", String.valueOf(pt2)); - fields.put("k1", String.valueOf(k1)); - fields.put("v1", String.valueOf(v1)); - fields.put("k2", k2); - fields.put("v2", v2); - - actual.setRecord(new CdcRecord(RowKind.INSERT, fields)); + Map data = new HashMap<>(); + data.put("pt1", pt1); + data.put("pt2", String.valueOf(pt2)); + data.put("k1", String.valueOf(k1)); + data.put("v1", String.valueOf(v1)); + data.put("k2", k2); + data.put("v2", v2); + + actual.setRecord(new CdcRecord(RowKind.INSERT, data)); assertThat(actual.partition()).isEqualTo(expected.partition()); assertThat(actual.bucket()).isEqualTo(expected.bucket()); - actual.setRecord(new CdcRecord(RowKind.DELETE, fields)); + actual.setRecord(new CdcRecord(RowKind.DELETE, data)); assertThat(actual.partition()).isEqualTo(expected.partition()); assertThat(actual.bucket()).isEqualTo(expected.bucket()); } @@ -122,19 +122,19 @@ public void testNullPartition() throws Exception { null, null, k1, v1, StringData.fromString(k2), StringData.fromString(v2)); expected.setRecord(rowData); - Map fields = new HashMap<>(); - fields.put("pt1", null); - fields.put("pt2", null); - fields.put("k1", String.valueOf(k1)); - fields.put("v1", String.valueOf(v1)); - fields.put("k2", k2); - fields.put("v2", v2); + Map data = new HashMap<>(); + data.put("pt1", null); + data.put("pt2", null); + data.put("k1", String.valueOf(k1)); + data.put("v1", String.valueOf(v1)); + data.put("k2", k2); + data.put("v2", v2); - actual.setRecord(new CdcRecord(RowKind.INSERT, fields)); + actual.setRecord(new CdcRecord(RowKind.INSERT, data)); assertThat(actual.partition()).isEqualTo(expected.partition()); assertThat(actual.bucket()).isEqualTo(expected.bucket()); - actual.setRecord(new CdcRecord(RowKind.DELETE, fields)); + actual.setRecord(new CdcRecord(RowKind.DELETE, data)); assertThat(actual.partition()).isEqualTo(expected.partition()); assertThat(actual.bucket()).isEqualTo(expected.bucket()); } @@ -161,19 +161,19 @@ public void testEmptyPartition() throws Exception { StringData.fromString(v2)); expected.setRecord(rowData); - Map fields = new HashMap<>(); - fields.put("pt1", ""); - fields.put("pt2", null); - fields.put("k1", String.valueOf(k1)); - fields.put("v1", String.valueOf(v1)); - fields.put("k2", k2); - fields.put("v2", v2); + Map data = new HashMap<>(); + data.put("pt1", ""); + data.put("pt2", null); + data.put("k1", String.valueOf(k1)); + data.put("v1", String.valueOf(v1)); + data.put("k2", k2); + data.put("v2", v2); - actual.setRecord(new CdcRecord(RowKind.INSERT, fields)); + actual.setRecord(new CdcRecord(RowKind.INSERT, data)); assertThat(actual.partition()).isEqualTo(expected.partition()); assertThat(actual.bucket()).isEqualTo(expected.bucket()); - actual.setRecord(new CdcRecord(RowKind.DELETE, fields)); + actual.setRecord(new CdcRecord(RowKind.DELETE, data)); assertThat(actual.partition()).isEqualTo(expected.partition()); assertThat(actual.bucket()).isEqualTo(expected.bucket()); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java index 2a1bb4004306..8c78ab853a60 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java @@ -172,16 +172,14 @@ public void testAsyncTableCreate() throws Exception { t.start(); // check that records should be processed after table is created - Map fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "1"); - fields.put("v", "10"); + Map data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "1"); + data.put("v", "10"); CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); CdcMultiplexRecord actual = runner.poll(1); @@ -192,15 +190,13 @@ public void testAsyncTableCreate() throws Exception { assertThat(actual).isEqualTo(expected); // after table is created, record should be processed immediately - fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "3"); - fields.put("v", "30"); + data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "3"); + data.put("v", "30"); expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.take(); assertThat(actual).isEqualTo(expected); @@ -227,16 +223,14 @@ public void testInitializeState() throws Exception { t.start(); // check that records should be processed after table is created - Map fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "1"); - fields.put("v", "10"); + Map data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "1"); + data.put("v", "10"); CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); CdcMultiplexRecord actual = runner.poll(1); @@ -254,15 +248,13 @@ public void testInitializeState() throws Exception { assertThat(operator.writes().size()).isEqualTo(1); // after table is created, record should be processed immediately - fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "3"); - fields.put("v", "30"); + data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "3"); + data.put("v", "30"); expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.take(); assertThat(actual).isEqualTo(expected); @@ -302,44 +294,38 @@ public void testSingleTableAddColumn() throws Exception { // check that records with compatible schema can be processed immediately - Map fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "1"); - fields.put("v", "10"); + Map data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "1"); + data.put("v", "10"); CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); CdcMultiplexRecord actual = runner.take(); assertThat(actual).isEqualTo(expected); - fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "2"); + data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "2"); expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.take(); assertThat(actual).isEqualTo(expected); - // check that records with new fields should be processed after schema is updated + // check that records with new data should be processed after schema is updated - fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "3"); - fields.put("v", "30"); - fields.put("v2", "300"); + data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "3"); + data.put("v", "30"); + data.put("v2", "300"); expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -383,34 +369,30 @@ public void testSingleTableUpdateColumnType() throws Exception { // check that records with compatible schema can be processed immediately - Map fields = new HashMap<>(); - fields.put("k", "1"); - fields.put("v1", "10"); - fields.put("v2", "0.625"); - fields.put("v3", "one"); - fields.put("v4", "b_one"); + Map data = new HashMap<>(); + data.put("k", "1"); + data.put("v1", "10"); + data.put("v2", "0.625"); + data.put("v3", "one"); + data.put("v4", "b_one"); CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); CdcMultiplexRecord actual = runner.take(); assertThat(actual).isEqualTo(expected); - // check that records with new fields should be processed after schema is updated + // check that records with new data should be processed after schema is updated // int -> bigint - fields = new HashMap<>(); - fields.put("k", "2"); - fields.put("v1", "12345678987654321"); - fields.put("v2", "0.25"); + data = new HashMap<>(); + data.put("k", "2"); + data.put("v1", "12345678987654321"); + data.put("v2", "0.25"); expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -422,15 +404,13 @@ public void testSingleTableUpdateColumnType() throws Exception { // float -> double - fields = new HashMap<>(); - fields.put("k", "3"); - fields.put("v1", "100"); - fields.put("v2", "1.0000000000009095"); + data = new HashMap<>(); + data.put("k", "3"); + data.put("v1", "100"); + data.put("v2", "1.0000000000009095"); expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -441,15 +421,13 @@ public void testSingleTableUpdateColumnType() throws Exception { // varchar(5) -> varchar(10) - fields = new HashMap<>(); - fields.put("k", "4"); - fields.put("v1", "40"); - fields.put("v3", "long four"); + data = new HashMap<>(); + data.put("k", "4"); + data.put("v1", "40"); + data.put("v3", "long four"); expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -460,15 +438,13 @@ public void testSingleTableUpdateColumnType() throws Exception { // varbinary(5) -> varbinary(10) - fields = new HashMap<>(); - fields.put("k", "5"); - fields.put("v1", "50"); - fields.put("v4", "long five~"); + data = new HashMap<>(); + data.put("k", "5"); + data.put("v1", "50"); + data.put("v4", "long five~"); expected = CdcMultiplexRecord.fromCdcRecord( - databaseName, - tableId.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -499,53 +475,53 @@ public void testMultiTableUpdateColumnType() throws Exception { // check that records with compatible schema from different tables // can be processed immediately - Map fields; + Map data; // first table record - fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "1"); - fields.put("v", "10"); + data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "1"); + data.put("v", "10"); CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord( databaseName, firstTable.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); CdcMultiplexRecord actual = runner.take(); assertThat(actual).isEqualTo(expected); // second table record - fields = new HashMap<>(); - fields.put("k", "1"); - fields.put("v1", "10"); - fields.put("v2", "0.625"); - fields.put("v3", "one"); - fields.put("v4", "b_one"); + data = new HashMap<>(); + data.put("k", "1"); + data.put("v1", "10"); + data.put("v2", "0.625"); + data.put("v3", "one"); + data.put("v4", "b_one"); expected = CdcMultiplexRecord.fromCdcRecord( databaseName, secondTable.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.take(); assertThat(actual).isEqualTo(expected); - // check that records with new fields should be processed after schema is updated + // check that records with new data should be processed after schema is updated // int -> bigint SchemaManager schemaManager; // first table - fields = new HashMap<>(); - fields.put("pt", "1"); - fields.put("k", "123456789876543211"); - fields.put("v", "varchar"); + data = new HashMap<>(); + data.put("pt", "1"); + data.put("k", "123456789876543211"); + data.put("v", "varchar"); expected = CdcMultiplexRecord.fromCdcRecord( databaseName, firstTable.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -556,15 +532,15 @@ public void testMultiTableUpdateColumnType() throws Exception { assertThat(actual).isEqualTo(expected); // second table - fields = new HashMap<>(); - fields.put("k", "2"); - fields.put("v1", "12345678987654321"); - fields.put("v2", "0.25"); + data = new HashMap<>(); + data.put("k", "2"); + data.put("v1", "12345678987654321"); + data.put("v2", "0.25"); expected = CdcMultiplexRecord.fromCdcRecord( databaseName, secondTable.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -577,15 +553,15 @@ public void testMultiTableUpdateColumnType() throws Exception { // below are schema changes only from the second table // float -> double - fields = new HashMap<>(); - fields.put("k", "3"); - fields.put("v1", "100"); - fields.put("v2", "1.0000000000009095"); + data = new HashMap<>(); + data.put("k", "3"); + data.put("v1", "100"); + data.put("v2", "1.0000000000009095"); expected = CdcMultiplexRecord.fromCdcRecord( databaseName, secondTable.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -597,15 +573,15 @@ public void testMultiTableUpdateColumnType() throws Exception { // varchar(5) -> varchar(10) - fields = new HashMap<>(); - fields.put("k", "4"); - fields.put("v1", "40"); - fields.put("v3", "long four"); + data = new HashMap<>(); + data.put("k", "4"); + data.put("v1", "40"); + data.put("v3", "long four"); expected = CdcMultiplexRecord.fromCdcRecord( databaseName, secondTable.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -617,15 +593,15 @@ public void testMultiTableUpdateColumnType() throws Exception { // varbinary(5) -> varbinary(10) - fields = new HashMap<>(); - fields.put("k", "5"); - fields.put("v1", "50"); - fields.put("v4", "long five~"); + data = new HashMap<>(); + data.put("k", "5"); + data.put("v1", "50"); + data.put("v4", "long five~"); expected = CdcMultiplexRecord.fromCdcRecord( databaseName, secondTable.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -651,33 +627,33 @@ public void testUsingTheSameCompactExecutor() throws Exception { t.start(); // write records to two tables thus two FileStoreWrite will be created - Map fields; + Map data; // first table record - fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "1"); - fields.put("v", "10"); + data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "1"); + data.put("v", "10"); CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord( databaseName, firstTable.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); // second table record - fields = new HashMap<>(); - fields.put("k", "1"); - fields.put("v1", "10"); - fields.put("v2", "0.625"); - fields.put("v3", "one"); - fields.put("v4", "b_one"); + data = new HashMap<>(); + data.put("k", "1"); + data.put("v1", "10"); + data.put("v2", "0.625"); + data.put("v3", "one"); + data.put("v4", "b_one"); expected = CdcMultiplexRecord.fromCdcRecord( databaseName, secondTable.getObjectName(), - new CdcRecord(RowKind.INSERT, fields)); + new CdcRecord(RowKind.INSERT, data)); runner.offer(expected); // get and check compactExecutor from two FileStoreWrite diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java index 9af7eabdaaad..f3693fe405de 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java @@ -106,31 +106,31 @@ public void testAddColumn() throws Exception { // check that records with compatible schema can be processed immediately - Map fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "1"); - fields.put("v", "10"); - CdcRecord expected = new CdcRecord(RowKind.INSERT, fields); + Map data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "1"); + data.put("v", "10"); + CdcRecord expected = new CdcRecord(RowKind.INSERT, data); runner.offer(expected); CdcRecord actual = runner.take(); assertThat(actual).isEqualTo(expected); - fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "2"); - expected = new CdcRecord(RowKind.INSERT, fields); + data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "2"); + expected = new CdcRecord(RowKind.INSERT, data); runner.offer(expected); actual = runner.take(); assertThat(actual).isEqualTo(expected); - // check that records with new fields should be processed after schema is updated + // check that records with new data should be processed after schema is updated - fields = new HashMap<>(); - fields.put("pt", "0"); - fields.put("k", "3"); - fields.put("v", "30"); - fields.put("v2", "300"); - expected = new CdcRecord(RowKind.INSERT, fields); + data = new HashMap<>(); + data.put("pt", "0"); + data.put("k", "3"); + data.put("v", "30"); + data.put("v2", "300"); + expected = new CdcRecord(RowKind.INSERT, data); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -172,26 +172,26 @@ public void testUpdateColumnType() throws Exception { // check that records with compatible schema can be processed immediately - Map fields = new HashMap<>(); - fields.put("k", "1"); - fields.put("v1", "10"); - fields.put("v2", "0.625"); - fields.put("v3", "one"); - fields.put("v4", "b_one"); - CdcRecord expected = new CdcRecord(RowKind.INSERT, fields); + Map data = new HashMap<>(); + data.put("k", "1"); + data.put("v1", "10"); + data.put("v2", "0.625"); + data.put("v3", "one"); + data.put("v4", "b_one"); + CdcRecord expected = new CdcRecord(RowKind.INSERT, data); runner.offer(expected); CdcRecord actual = runner.take(); assertThat(actual).isEqualTo(expected); - // check that records with new fields should be processed after schema is updated + // check that records with new data should be processed after schema is updated // int -> bigint - fields = new HashMap<>(); - fields.put("k", "2"); - fields.put("v1", "12345678987654321"); - fields.put("v2", "0.25"); - expected = new CdcRecord(RowKind.INSERT, fields); + data = new HashMap<>(); + data.put("k", "2"); + data.put("v1", "12345678987654321"); + data.put("v2", "0.25"); + expected = new CdcRecord(RowKind.INSERT, data); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -203,11 +203,11 @@ public void testUpdateColumnType() throws Exception { // float -> double - fields = new HashMap<>(); - fields.put("k", "3"); - fields.put("v1", "100"); - fields.put("v2", "1.0000000000009095"); - expected = new CdcRecord(RowKind.INSERT, fields); + data = new HashMap<>(); + data.put("k", "3"); + data.put("v1", "100"); + data.put("v2", "1.0000000000009095"); + expected = new CdcRecord(RowKind.INSERT, data); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -218,11 +218,11 @@ public void testUpdateColumnType() throws Exception { // varchar(5) -> varchar(10) - fields = new HashMap<>(); - fields.put("k", "4"); - fields.put("v1", "40"); - fields.put("v3", "long four"); - expected = new CdcRecord(RowKind.INSERT, fields); + data = new HashMap<>(); + data.put("k", "4"); + data.put("v1", "40"); + data.put("v3", "long four"); + expected = new CdcRecord(RowKind.INSERT, data); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); @@ -233,11 +233,11 @@ public void testUpdateColumnType() throws Exception { // varbinary(5) -> varbinary(10) - fields = new HashMap<>(); - fields.put("k", "5"); - fields.put("v1", "50"); - fields.put("v4", "long five~"); - expected = new CdcRecord(RowKind.INSERT, fields); + data = new HashMap<>(); + data.put("k", "5"); + data.put("v1", "50"); + data.put("v4", "long five~"); + expected = new CdcRecord(RowKind.INSERT, data); runner.offer(expected); actual = runner.poll(1); assertThat(actual).isNull(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java index 525a05096942..6a38c1c2659d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java @@ -114,18 +114,18 @@ public TestTable( } events.add(new TestCdcEvent(tableName, currentDataFieldList(fieldNames, isBigInt))); } else { - Map fields = new HashMap<>(); + Map data = new HashMap<>(); int key = random.nextInt(numKeys); - fields.put("k", String.valueOf(key)); + data.put("k", String.valueOf(key)); int pt = key % numPartitions; - fields.put("pt", String.valueOf(pt)); + data.put("pt", String.valueOf(pt)); for (int j = 0; j < fieldNames.size(); j++) { String fieldName = fieldNames.get(j); if (isBigInt.get(j)) { - fields.put(fieldName, String.valueOf(random.nextLong())); + data.put(fieldName, String.valueOf(random.nextLong())); } else { - fields.put(fieldName, String.valueOf(random.nextInt())); + data.put(fieldName, String.valueOf(random.nextInt())); } } @@ -140,8 +140,8 @@ public TestTable( shouldInsert = random.nextInt(5) > 0; } if (shouldInsert) { - records.add(new CdcRecord(RowKind.INSERT, fields)); - expected.put(key, fields); + records.add(new CdcRecord(RowKind.INSERT, data)); + expected.put(key, data); } } // Generate test data for append table @@ -149,8 +149,8 @@ public TestTable( if (expected.containsKey(key)) { records.add(new CdcRecord(RowKind.DELETE, expected.get(key))); } else { - records.add(new CdcRecord(RowKind.INSERT, fields)); - expected.put(key, fields); + records.add(new CdcRecord(RowKind.INSERT, data)); + expected.put(key, data); } } events.add(new TestCdcEvent(tableName, records, Objects.hash(tableName, key)));