Skip to content

Commit

Permalink
[cdc] Fix bug when building schema from delete record (apache#2635)
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 authored Jan 8, 2024
1 parent 0aad580 commit 75b1cf5
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -95,14 +96,21 @@ public Schema buildSchema(String record) {
return null;
}

Optional<RichCdcMultiplexRecord> recordOpt = extractRecords().stream().findFirst();
if (!recordOpt.isPresent()) {
throw new RuntimeException("invalid json");
}

Schema.Builder builder = Schema.newBuilder();
extractPaimonFieldTypes().forEach(builder::column);
recordOpt.get().fieldTypes().forEach(builder::column);
builder.primaryKey(extractPrimaryKeys());
return builder.build();
} catch (Exception e) {
logInvalidJsonString(record);
throw e;
// ignore
}

return null;
}

protected abstract List<RichCdcMultiplexRecord> extractRecords();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,30 @@

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.getDataFormat;
import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.getKafkaEarliestConsumer;
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link KafkaSyncTableAction}. */
public class KafkaDebeziumWithSchemaSyncTableActionITCase extends KafkaActionITCaseBase {
Expand Down Expand Up @@ -185,4 +194,36 @@ public void testComputedColumn() throws Exception {
rowType,
Collections.singletonList("id"));
}

@Test
@Timeout(60)
public void testKafkaBuildSchemaWithDelete() throws Exception {
final String topic = "test_kafka_schema";
createTestTopic(topic, 1, 1);
// ---------- Write the Debezium json into Kafka -------------------
List<String> lines =
readLines("kafka/debezium/table/schema/schemaevolution/debezium-data-4.txt");
try {
writeRecordsToKafka(topic, lines);
} catch (Exception e) {
throw new Exception("Failed to write debezium data to Kafka.", e);
}
Configuration kafkaConfig = Configuration.fromMap(getBasicKafkaConfig());
kafkaConfig.setString(VALUE_FORMAT.key(), "debezium-json");
kafkaConfig.setString(TOPIC.key(), topic);

Schema kafkaSchema =
MessageQueueSchemaUtils.getSchema(
getKafkaEarliestConsumer(kafkaConfig),
getDataFormat(kafkaConfig),
TypeMapping.defaultMapping());
List<DataField> fields = new ArrayList<>();
// {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight":
// 3.14}
fields.add(new DataField(0, "id", DataTypes.STRING()));
fields.add(new DataField(1, "name", DataTypes.STRING()));
fields.add(new DataField(2, "description", DataTypes.STRING()));
fields.add(new DataField(3, "weight", DataTypes.STRING()));
assertThat(kafkaSchema.fields()).isEqualTo(fields);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"schema":null, "payload":{"before": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "after": null, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "d", "ts_ms": 1596684883000, "transaction": null}}

0 comments on commit 75b1cf5

Please sign in to comment.