diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java index 62ca27a4fa6a..f0404fa46117 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java @@ -65,6 +65,7 @@ public class CanalRecordParser extends RecordParser { private static final String OP_UPDATE = "UPDATE"; private static final String OP_INSERT = "INSERT"; private static final String OP_DELETE = "DELETE"; + private static final String OP_ROW = "ROW"; @Override protected boolean isDDL() { @@ -110,6 +111,7 @@ public List extractRecords() { processRecord(data, RowKind.INSERT, records); break; case OP_INSERT: + case OP_ROW: processRecord(data, RowKind.INSERT, records); break; case OP_DELETE: diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java index d21dad37e071..8d3440345bc2 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java @@ -957,4 +957,75 @@ public void testCatalogAndTableConfig() { assertThat(action.tableConfig()) .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } + + @Test + @Timeout(60) + public void testCDCOperations() throws Exception { + final String topic = "event-insert"; + createTestTopic(topic, 1, 1); + + // ---------- Write the Canal json into Kafka ------------------- + writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-row.txt")); + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "canal-json"); + kafkaConfig.put("topic", topic); + + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build(); + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(tableName); + List primaryKeys = Collections.singletonList("_id"); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), + DataTypes.INT().notNull(), + DataTypes.VARCHAR(20), + DataTypes.BIGINT(), + DataTypes.DECIMAL(8, 3), + DataTypes.VARBINARY(10), + DataTypes.FLOAT() + }, + new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"}); + + // For the ROW operation + List expectedRow = + Collections.singletonList( + "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]"); + waitForResult(expectedRow, table, rowType, primaryKeys); + + writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-insert.txt")); + + // For the INSERT operation + List expectedInsert = + Arrays.asList( + "+I[1, 1, one, NULL, NULL, NULL, NULL]", + "+I[1, 2, two, NULL, NULL, NULL, NULL]", + "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]", + "+I[2, 4, four, NULL, NULL, NULL, NULL]"); + waitForResult(expectedInsert, table, rowType, primaryKeys); + + writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-update.txt")); + + // For the UPDATE operation + List expectedUpdate = + Arrays.asList( + "+I[1, 1, one, NULL, NULL, NULL, NULL]", + "+I[1, 2, second, NULL, NULL, NULL, NULL]", + "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]", + "+I[2, 4, four, NULL, NULL, NULL, NULL]"); + waitForResult(expectedUpdate, table, rowType, primaryKeys); + + writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-delete.txt")); + + // For the DELETE operation + List expectedDelete = + Arrays.asList( + "+I[1, 2, second, NULL, NULL, NULL, NULL]", + "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]", + "+I[2, 4, four, NULL, NULL, NULL, NULL]"); + waitForResult(expectedDelete, table, rowType, primaryKeys); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-delete.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-delete.txt new file mode 100644 index 000000000000..231b3d7a9773 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-delete.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"pt":"1","_id":"1","v1":"one"}],"database":"paimon_sync_table","es":1683168079000,"id":42,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_1","ts":1683168079391,"type":"DELETE"} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-insert.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-insert.txt new file mode 100644 index 000000000000..191744562d55 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-insert.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"pt":"1","_id":"1","v1":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"event","ts":1683006706728,"type":"INSERT"} +{"data":[{"pt":"1","_id":"2","v1":"two"},{"pt":"2","_id":"4","v1":"four"}],"database":"paimon_sync_table","es":1683006724000,"id":94,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"event","ts":1683006724404,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-row.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-row.txt new file mode 100644 index 000000000000..2084a8d85d6d --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-row.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"pt":"1","_id":"9","v1":"nine","v2":"90000000000","v3":"99999.999","v4":"nine.bin","v5":"9.9"}],"database":"paimon_sync_table","es":1683168155000,"id":55,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(10)","v5":"FLOAT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":2004,"v5":7},"table":"schema_evolution_1","ts":1683168155270,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-update.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-update.txt new file mode 100644 index 000000000000..3ef8a8a2bd21 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/event/event-update.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"pt":"1","_id":"2","v1":"second","v2":null}],"database":"paimon_sync_table","es":1683008290000,"id":18,"isDdl":false,"mysqlType":{"pt":"int","_id":"int","v1":"varchar(10)","v2":"INT"},"old":[{"v1":"two"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":4},"table":"schema_evolution_2","ts":1683008290626,"type":"UPDATE"}