Skip to content

Commit

Permalink
[cdc] Use CdcSourceRecord as source type instead of raw String
Browse files Browse the repository at this point in the history
  • Loading branch information
harveyyue committed Jan 25, 2024
1 parent c408d50 commit 72b36be
Show file tree
Hide file tree
Showing 27 changed files with 399 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,24 +209,22 @@ public static <T extends JsonNode> T asSpecificNodeType(String json, Class<T> cl
}

/** Extracts a value of the specified type from JSON content by following the given path keys. */
public static <T> T extractValue(String json, Class<T> valueType, String... path)
public static <T> T extractValue(JsonNode jsonNode, Class<T> valueType, String... path)
throws JsonProcessingException {
JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
for (String key : path) {
currentNode = currentNode.get(key);
if (currentNode == null) {
jsonNode = jsonNode.get(key);
if (jsonNode == null) {
throw new IllegalArgumentException("Invalid path or key not found: " + key);
}
}
return OBJECT_MAPPER_INSTANCE.treeToValue(currentNode, valueType);
return OBJECT_MAPPER_INSTANCE.treeToValue(jsonNode, valueType);
}

/** Checks whether a node exists at the given path within JSON content. */
public static boolean isNodeExists(String json, String... path) throws JsonProcessingException {
JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
public static boolean isNodeExists(JsonNode jsonNode, String... path) {
for (String key : path) {
currentNode = currentNode.get(key);
if (currentNode == null) {
jsonNode = jsonNode.get(key);
if (jsonNode == null) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

/**
 * A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the
 * received {@link SourceRecord} to {@link CdcSourceRecord}.
 *
 * <p>The value of each {@link SourceRecord} is rendered to JSON by a Kafka Connect {@link
 * JsonConverter}; the resulting JSON text becomes the value of the emitted {@link
 * CdcSourceRecord}. The record key is not propagated (it is always emitted as {@code null}).
 */
public class CdcDebeziumDeserializationSchema
        implements DebeziumDeserializationSchema<CdcSourceRecord> {

    private static final long serialVersionUID = 1L;

    /** Lazily created on first use; transient, so it is rebuilt after this schema is shipped. */
    private transient JsonConverter jsonConverter;

    /**
     * Configuration whether to enable {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG} to include
     * schema in messages.
     */
    private final Boolean includeSchema;

    /** The custom configurations for {@link JsonConverter}. May be {@code null}. */
    private final Map<String, Object> customConverterConfigs;

    /** Creates a schema that does not include the Connect schema in the produced JSON. */
    public CdcDebeziumDeserializationSchema() {
        this(false);
    }

    /**
     * Creates a schema without custom converter configurations.
     *
     * @param includeSchema whether the Connect schema is included in the produced JSON
     */
    public CdcDebeziumDeserializationSchema(Boolean includeSchema) {
        this(includeSchema, null);
    }

    /**
     * Creates a schema with full control over the {@link JsonConverter} configuration.
     *
     * @param includeSchema whether the Connect schema is included in the produced JSON
     * @param customConverterConfigs extra converter settings applied after the defaults, so they
     *     override them on key collision; may be {@code null}
     */
    public CdcDebeziumDeserializationSchema(
            Boolean includeSchema, Map<String, Object> customConverterConfigs) {
        this.includeSchema = includeSchema;
        this.customConverterConfigs = customConverterConfigs;
    }

    /**
     * Converts the value of the given record to JSON text and emits it as a {@link
     * CdcSourceRecord} carrying the record's topic and a {@code null} key.
     *
     * @param record the record received from Debezium
     * @param out the collector the converted record is emitted to
     */
    @Override
    public void deserialize(SourceRecord record, Collector<CdcSourceRecord> out) throws Exception {
        if (jsonConverter == null) {
            initializeJsonConverter();
        }
        byte[] bytes =
                jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
        // Decode explicitly as UTF-8: relying on the platform default charset would corrupt
        // non-ASCII data on JVMs whose default is not UTF-8.
        String json = new String(bytes, StandardCharsets.UTF_8);
        out.collect(new CdcSourceRecord(record.topic(), null, json));
    }

    /** Initialize {@link JsonConverter} with given configs. */
    private void initializeJsonConverter() {
        jsonConverter = new JsonConverter();
        final Map<String, Object> configs = new HashMap<>(2);
        configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
        configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
        // Custom entries are applied last so that they take precedence over the defaults above.
        if (customConverterConfigs != null) {
            configs.putAll(customConverterConfigs);
        }
        jsonConverter.configure(configs);
    }

    @Override
    public TypeInformation<CdcSourceRecord> getProducedType() {
        return getForClass(CdcSourceRecord.class);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

/**
 * A simple {@link DeserializationSchema} that reads each raw message as a JSON tree and wraps it
 * in a {@link CdcSourceRecord}.
 */
public class CdcDeserializationSchema implements DeserializationSchema<CdcSourceRecord> {

    private static final long serialVersionUID = 1L;

    /** Mapper used to parse incoming message bytes. */
    private final ObjectMapper objectMapper = createObjectMapper();

    /** Creates the schema; the mapper is configured by {@link #createObjectMapper()}. */
    public CdcDeserializationSchema() {}

    /**
     * Builds the mapper with {@code USE_BIG_DECIMAL_FOR_FLOATS} enabled and {@code
     * FAIL_ON_UNKNOWN_PROPERTIES} disabled.
     */
    private static ObjectMapper createObjectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return mapper;
    }

    /**
     * Parses the message bytes into a JSON tree and wraps it as the record value.
     *
     * @param message the raw message bytes, possibly {@code null}
     * @return the wrapped record, or {@code null} when {@code message} is {@code null}
     * @throws IOException if the bytes cannot be read as JSON
     */
    @Override
    public CdcSourceRecord deserialize(byte[] message) throws IOException {
        if (message != null) {
            JsonNode payload = objectMapper.readValue(message, JsonNode.class);
            return new CdcSourceRecord(payload);
        }
        return null;
    }

    /** Always {@code false}: no element marks the end of the stream. */
    @Override
    public boolean isEndOfStream(CdcSourceRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<CdcSourceRecord> getProducedType() {
        return getForClass(CdcSourceRecord.class);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.annotation.Experimental;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Objects;

/**
 * A data change record from the CDC source, made of an optional topic, an optional key and a
 * value.
 */
@Experimental
public class CdcSourceRecord implements Serializable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String topic;

    @Nullable private final Object key;

    private final Object value;

    /** Creates a record that carries only a value; topic and key are {@code null}. */
    public CdcSourceRecord(Object value) {
        this(null, null, value);
    }

    /**
     * Creates a record.
     *
     * @param topic the topic the record belongs to, may be {@code null}
     * @param key the record key, may be {@code null}
     * @param value the record value
     */
    public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    /** Returns the topic, or {@code null} if none was set. */
    @Nullable
    public String getTopic() {
        return topic;
    }

    /** Returns the key, or {@code null} if none was set. */
    @Nullable
    public Object getKey() {
        return key;
    }

    /** Returns the value. */
    public Object getValue() {
        return value;
    }

    /** Two records are equal when their topic, key and value are all equal. */
    @Override
    public boolean equals(Object o) {
        if (o instanceof CdcSourceRecord) {
            CdcSourceRecord other = (CdcSourceRecord) o;
            boolean sameTopic = Objects.equals(topic, other.topic);
            boolean sameKey = Objects.equals(key, other.key);
            return sameTopic && sameKey && Objects.equals(value, other.value);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, key, value);
    }

    /** Renders the record as {@code "<topic>: <key> <value>"}. */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(topic)
                .append(": ")
                .append(key)
                .append(" ")
                .append(value)
                .toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private static void sleepSafely(int duration) {
/** Wrap the consumer for different message queues. */
public interface ConsumerWrapper extends AutoCloseable {

List<String> getRecords(int pollTimeOutMills);
List<CdcSourceRecord> getRecords(int pollTimeOutMills);

String topic();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ protected void validateCaseSensitivity() {
}

@Override
protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
return syncJobHandler.provideRecordParser(
caseSensitive, Collections.emptyList(), typeMapping, metadataConverters);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void checkRequiredOption() {
}
}

public Source<String, ?, ?> provideSource() {
public Source<CdcSourceRecord, ?, ?> provideSource() {
switch (sourceType) {
case KAFKA:
return KafkaActionUtils.buildKafkaSource(cdcSourceConfig);
Expand All @@ -193,7 +193,7 @@ public void checkRequiredOption() {
}
}

public FlatMapFunction<String, RichCdcMultiplexRecord> provideRecordParser(
public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected void beforeBuildingSourceSink() throws Exception {
}

@Override
protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
return syncJobHandler.provideRecordParser(
caseSensitive, computedColumns, typeMapping, metadataConverters);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected Object buildSource() {
return syncJobHandler.provideSource();
}

private DataStreamSource<String> buildDataStreamSource(Object source) {
private DataStreamSource<CdcSourceRecord> buildDataStreamSource(Object source) {
if (source instanceof Source) {
boolean isAutomaticWatermarkCreationEnabled =
tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key())
Expand All @@ -137,7 +137,7 @@ private DataStreamSource<String> buildDataStreamSource(Object source) {
Options options = Options.fromMap(tableConfig);
Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
WatermarkStrategy<String> watermarkStrategy =
WatermarkStrategy<CdcSourceRecord> watermarkStrategy =
isAutomaticWatermarkCreationEnabled
? watermarkAlignGroup != null
? new CdcWatermarkStrategy(createExtractor(source))
Expand All @@ -152,18 +152,18 @@ private DataStreamSource<String> buildDataStreamSource(Object source) {
watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
}
return env.fromSource(
(Source<String, ?, ?>) source,
(Source<CdcSourceRecord, ?, ?>) source,
watermarkStrategy,
syncJobHandler.provideSourceName());
}
if (source instanceof SourceFunction) {
return env.addSource(
(SourceFunction<String>) source, syncJobHandler.provideSourceName());
(SourceFunction<CdcSourceRecord>) source, syncJobHandler.provideSourceName());
}
throw new UnsupportedOperationException("Unrecognized source type");
}

protected abstract FlatMapFunction<String, RichCdcMultiplexRecord> recordParse();
protected abstract FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse();

protected abstract EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory();

Expand Down
Loading

0 comments on commit 72b36be

Please sign in to comment.