Skip to content

Commit

Permalink
[flink] Support canal json row type (#2056)
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored Oct 8, 2023
1 parent 04179e3 commit 2449cb2
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class CanalRecordParser extends RecordParser {
private static final String OP_UPDATE = "UPDATE";
private static final String OP_INSERT = "INSERT";
private static final String OP_DELETE = "DELETE";
private static final String OP_ROW = "ROW";

@Override
protected boolean isDDL() {
Expand Down Expand Up @@ -110,6 +111,7 @@ public List<RichCdcMultiplexRecord> extractRecords() {
processRecord(data, RowKind.INSERT, records);
break;
case OP_INSERT:
case OP_ROW:
processRecord(data, RowKind.INSERT, records);
break;
case OP_DELETE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,4 +957,75 @@ public void testCatalogAndTableConfig() {
assertThat(action.tableConfig())
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}

@Test
@Timeout(60)
public void testCDCOperations() throws Exception {
final String topic = "event-insert";
createTestTopic(topic, 1, 1);

// ---------- Write the Canal json into Kafka -------------------
writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-row.txt"));

Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put("value.format", "canal-json");
kafkaConfig.put("topic", topic);

KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable(tableName);
List<String> primaryKeys = Collections.singletonList("_id");
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(),
DataTypes.INT().notNull(),
DataTypes.VARCHAR(20),
DataTypes.BIGINT(),
DataTypes.DECIMAL(8, 3),
DataTypes.VARBINARY(10),
DataTypes.FLOAT()
},
new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"});

// For the ROW operation
List<String> expectedRow =
Collections.singletonList(
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
waitForResult(expectedRow, table, rowType, primaryKeys);

writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-insert.txt"));

// For the INSERT operation
List<String> expectedInsert =
Arrays.asList(
"+I[1, 1, one, NULL, NULL, NULL, NULL]",
"+I[1, 2, two, NULL, NULL, NULL, NULL]",
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]",
"+I[2, 4, four, NULL, NULL, NULL, NULL]");
waitForResult(expectedInsert, table, rowType, primaryKeys);

writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-update.txt"));

// For the UPDATE operation
List<String> expectedUpdate =
Arrays.asList(
"+I[1, 1, one, NULL, NULL, NULL, NULL]",
"+I[1, 2, second, NULL, NULL, NULL, NULL]",
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]",
"+I[2, 4, four, NULL, NULL, NULL, NULL]");
waitForResult(expectedUpdate, table, rowType, primaryKeys);

writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-delete.txt"));

// For the DELETE operation
List<String> expectedDelete =
Arrays.asList(
"+I[1, 2, second, NULL, NULL, NULL, NULL]",
"+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]",
"+I[2, 4, four, NULL, NULL, NULL, NULL]");
waitForResult(expectedDelete, table, rowType, primaryKeys);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"data":[{"pt":"1","_id":"1","v1":"one"}],"database":"paimon_sync_table","es":1683168079000,"id":42,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_1","ts":1683168079391,"type":"DELETE"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"data":[{"pt":"1","_id":"1","v1":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"event","ts":1683006706728,"type":"INSERT"}
{"data":[{"pt":"1","_id":"2","v1":"two"},{"pt":"2","_id":"4","v1":"four"}],"database":"paimon_sync_table","es":1683006724000,"id":94,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"event","ts":1683006724404,"type":"INSERT"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"data":[{"pt":"1","_id":"9","v1":"nine","v2":"90000000000","v3":"99999.999","v4":"nine.bin","v5":"9.9"}],"database":"paimon_sync_table","es":1683168155000,"id":55,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(10)","v5":"FLOAT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":2004,"v5":7},"table":"schema_evolution_1","ts":1683168155270,"type":"INSERT"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"data":[{"pt":"1","_id":"2","v1":"second","v2":null}],"database":"paimon_sync_table","es":1683008290000,"id":18,"isDdl":false,"mysqlType":{"pt":"int","_id":"int","v1":"varchar(10)","v2":"INT"},"old":[{"v1":"two"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":4},"table":"schema_evolution_2","ts":1683008290626,"type":"UPDATE"}

0 comments on commit 2449cb2

Please sign in to comment.