Skip to content

Commit

Permalink
[cdc] Fix kafka canal-json cdc action cannot find confluent schema re…
Browse files Browse the repository at this point in the history
…gistry client class when converting dataformat (#4024)
  • Loading branch information
zhuangchong authored Aug 26, 2024
1 parent b6e8963 commit 67032b3
Show file tree
Hide file tree
Showing 23 changed files with 640 additions and 173 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.List;
import java.util.function.Function;

/** Data format common implementation of {@link DataFormat}. */
public abstract class AbstractDataFormat implements DataFormat {

/** Factory for creating AbstractRecordParser. */
protected abstract RecordParserFactory parser();

/** Deserializer for Kafka Record. */
protected abstract Function<Configuration, KafkaDeserializationSchema<CdcSourceRecord>>
kafkaDeserializer();

/** Deserializer for Pulsar Record. */
protected abstract Function<Configuration, DeserializationSchema<CdcSourceRecord>>
pulsarDeserializer();

@Override
public AbstractRecordParser createParser(
TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
return parser().createParser(typeMapping, computedColumns);
}

@Override
public KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
Configuration cdcSourceConfig) {
return kafkaDeserializer().apply(cdcSourceConfig);
}

@Override
public DeserializationSchema<CdcSourceRecord> createPulsarDeserializer(
Configuration cdcSourceConfig) {
return pulsarDeserializer().apply(cdcSourceConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema;
import org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.function.Function;

/**
* The message queue's record json deserialization class common implementation of {@link
* DataFormat}.
*/
public abstract class AbstractJsonDataFormat extends AbstractDataFormat {

@Override
protected Function<Configuration, KafkaDeserializationSchema<CdcSourceRecord>>
kafkaDeserializer() {
return KafkaDebeziumJsonDeserializationSchema::new;
}

@Override
protected Function<Configuration, DeserializationSchema<CdcSourceRecord>> pulsarDeserializer() {
return CdcJsonDeserializationSchema::new;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,75 +21,18 @@
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.canal.CanalRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonRecordParser;
import org.apache.paimon.flink.action.cdc.format.json.JsonRecordParser;
import org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellRecordParser;
import org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser;
import org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumAvroDeserializationSchema;
import org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarDebeziumAvroDeserializationSchema;
import org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.List;
import java.util.function.Function;

/**
* Enumerates the supported data formats for message queue and provides a mechanism to create their
* associated {@link AbstractRecordParser}.
*
* <p>Each data format is associated with a specific implementation of {@link RecordParserFactory},
* which can be used to create instances of {@link AbstractRecordParser} for that format.
* Supports the message queue's data format and provides definitions for the message queue's record
* deserialization class and parsing class {@link AbstractRecordParser}.
*/
public enum DataFormat {
CANAL_JSON(
CanalRecordParser::new,
KafkaDebeziumJsonDeserializationSchema::new,
CdcJsonDeserializationSchema::new),
OGG_JSON(
OggRecordParser::new,
KafkaDebeziumJsonDeserializationSchema::new,
CdcJsonDeserializationSchema::new),
MAXWELL_JSON(
MaxwellRecordParser::new,
KafkaDebeziumJsonDeserializationSchema::new,
CdcJsonDeserializationSchema::new),
DEBEZIUM_JSON(
DebeziumJsonRecordParser::new,
KafkaDebeziumJsonDeserializationSchema::new,
CdcJsonDeserializationSchema::new),
DEBEZIUM_AVRO(
DebeziumAvroRecordParser::new,
KafkaDebeziumAvroDeserializationSchema::new,
PulsarDebeziumAvroDeserializationSchema::new),
JSON(
JsonRecordParser::new,
KafkaDebeziumJsonDeserializationSchema::new,
CdcJsonDeserializationSchema::new);

// Add more data formats here if needed

private final RecordParserFactory parser;
// Deserializer for Kafka
private final Function<Configuration, KafkaDeserializationSchema<CdcSourceRecord>>
kafkaDeserializer;
// Deserializer for Pulsar
private final Function<Configuration, DeserializationSchema<CdcSourceRecord>>
pulsarDeserializer;

DataFormat(
RecordParserFactory parser,
Function<Configuration, KafkaDeserializationSchema<CdcSourceRecord>> kafkaDeserializer,
Function<Configuration, DeserializationSchema<CdcSourceRecord>> pulsarDeserializer) {
this.parser = parser;
this.kafkaDeserializer = kafkaDeserializer;
this.pulsarDeserializer = pulsarDeserializer;
}
public interface DataFormat {

/**
* Creates a new instance of {@link AbstractRecordParser} for this data format with the
Expand All @@ -98,32 +41,11 @@ public enum DataFormat {
* @param computedColumns List of computed columns to be considered by the parser.
* @return A new instance of {@link AbstractRecordParser}.
*/
public AbstractRecordParser createParser(
TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
return parser.createParser(typeMapping, computedColumns);
}

public KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
Configuration cdcSourceConfig) {
return kafkaDeserializer.apply(cdcSourceConfig);
}

public DeserializationSchema<CdcSourceRecord> createPulsarDeserializer(
Configuration cdcSourceConfig) {
return pulsarDeserializer.apply(cdcSourceConfig);
}
AbstractRecordParser createParser(
TypeMapping typeMapping, List<ComputedColumn> computedColumns);

/** Returns the configuration string representation of this data format. */
public String asConfigString() {
return this.name().toLowerCase().replace("_", "-");
}
KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
Configuration cdcSourceConfig);

public static DataFormat fromConfigString(String format) {
try {
return DataFormat.valueOf(format.replace("-", "_").toUpperCase());
} catch (Exception e) {
throw new UnsupportedOperationException(
String.format("This format: %s is not supported.", format));
}
}
DeserializationSchema<CdcSourceRecord> createPulsarDeserializer(Configuration cdcSourceConfig);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.factories.Factory;
import org.apache.paimon.factories.FactoryException;
import org.apache.paimon.factories.FactoryUtil;

/** Factory to create {@link DataFormat}. */
public interface DataFormatFactory extends Factory {

DataFormat create();

static DataFormat createDataFormat(String format) {
String identifier = format.toLowerCase();
DataFormatFactory dataFormatFactory;
try {
dataFormatFactory =
FactoryUtil.discoverFactory(
DataFormatFactory.class.getClassLoader(),
DataFormatFactory.class,
identifier);
} catch (FactoryException e) {
throw new UnsupportedOperationException(
String.format("This format: %s is not supported.", format));
}
return dataFormatFactory.create();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format.canal;

import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;

/**
* Supports the message queue's canal json data format and provides definitions for the message
* queue's record json deserialization class and parsing class {@link CanalRecordParser}.
*/
public class CanalDataFormat extends AbstractJsonDataFormat {

@Override
protected RecordParserFactory parser() {
return CanalRecordParser::new;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format.canal;

import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;

/** Factory to create {@link CanalDataFormat}. */
public class CanalDataFormatFactory implements DataFormatFactory {

public static final String IDENTIFIER = "canal-json";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public DataFormat create() {
return new CanalDataFormat();
}
}
Loading

0 comments on commit 67032b3

Please sign in to comment.