Skip to content

Commit

Permalink
[cdc] Update Paimon-cdc dependency version to CDC-3.0 and flink-1.18
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Mar 25, 2024
1 parent 48aa793 commit fa584dc
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 19 deletions.
16 changes: 7 additions & 9 deletions paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ under the License.
<name>Paimon : Flink : CDC</name>

<properties>
<flink.version>1.17.2</flink.version>
<flink.cdc.version>2.4.2</flink.cdc.version>
<flink.mongodb.cdc.version>2.4.1</flink.mongodb.cdc.version>
<flink.version>1.18.1</flink.version>
<flink.cdc.version>3.0.1</flink.cdc.version>
<avro.version>1.11.1</avro.version>
<geometry.version>2.2.0</geometry.version>
<json-path.version>2.9.0</json-path.version>
<mongodb.testcontainers.version>1.19.1</mongodb.testcontainers.version>
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
<flink.connector.pulsar.version>4.1.0-1.18</flink.connector.pulsar.version>
<flink.connector.kafka.version>3.1.0-1.18</flink.connector.kafka.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -92,7 +91,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<version>${flink.connector.kafka.version}</version>
<scope>provided</scope>
</dependency>

Expand All @@ -103,11 +102,10 @@ under the License.
<scope>provided</scope>
</dependency>


<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>${flink.mongodb.cdc.version}</version>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -240,7 +238,7 @@ under the License.
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${mongodb.testcontainers.version}</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ public void checkRequiredOption() {
PULSAR_CONF,
PulsarActionUtils.VALUE_FORMAT,
PulsarOptions.PULSAR_SERVICE_URL,
PulsarOptions.PULSAR_ADMIN_URL,
PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME);
checkOneRequiredOption(
cdcSourceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -69,6 +68,8 @@ public class MySqlActionUtils {
.withDescription(
"Whether capture the scan the newly added tables or not, by default is true.");

public static final String JDBC_PROPERTIES_PREFIX = "jdbc.properties.";

static Connection getConnection(Configuration mySqlConfig, Map<String, String> jdbcProperties)
throws Exception {
String paramString = "";
Expand Down Expand Up @@ -238,7 +239,7 @@ public static MySqlSource<String> buildMySqlSource(
private static Map<String, String> getJdbcProperties(
TypeMapping typeMapping, Configuration mySqlConfig) {
Map<String, String> jdbcProperties =
convertToPropertiesPrefixKey(mySqlConfig.toMap(), JdbcUrlUtils.PROPERTIES_PREFIX);
convertToPropertiesPrefixKey(mySqlConfig.toMap(), JDBC_PROPERTIES_PREFIX);

if (typeMapping.containsMode(TINYINT1_NOT_BOOL)) {
String tinyInt1isBit = jdbcProperties.get("tinyInt1isBit");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
Expand Down Expand Up @@ -57,7 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
Expand Down Expand Up @@ -162,12 +163,26 @@ public class PulsarActionUtils {
.defaultValue(true)
.withDescription("To specify the boundedness of a stream.");

// lower versions of pulsar connector need this option
static final ConfigOption<String> PULSAR_ADMIN_URL =
ConfigOptions.key("pulsar.admin.adminUrl")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"The Pulsar service HTTP URL for the admin endpoint. For example, %s, or %s for TLS.",
code("http://my-broker.example.com:8080"),
code("https://my-broker.example.com:8443"))
.build());

public static PulsarSource<String> buildPulsarSource(Configuration pulsarConfig) {
PulsarSourceBuilder<String> pulsarSourceBuilder = PulsarSource.builder();

// the minimum setup
pulsarSourceBuilder
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
// to be compatible with lower versions
.setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
.setDeserializationSchema(new SimpleStringSchema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -65,8 +64,8 @@ public void testTinyInt1NotBool() throws Exception {
mySqlConfig.put("database-name", "tinyint1_not_bool_test");

// test tinyInt1isBit compatibility and url building
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "false");
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "useSSL", "false");
mySqlConfig.put(MySqlActionUtils.JDBC_PROPERTIES_PREFIX + "tinyInt1isBit", "false");
mySqlConfig.put(MySqlActionUtils.JDBC_PROPERTIES_PREFIX + "useSSL", "false");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
Expand Down Expand Up @@ -124,7 +123,7 @@ public void testTinyInt1NotBool() throws Exception {
public void testConflictTinyInt1NotBool() {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "tinyint1_not_bool_test");
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "true");
mySqlConfig.put(MySqlActionUtils.JDBC_PROPERTIES_PREFIX + "tinyInt1isBit", "true");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.List;
import java.util.Map;

import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
Expand Down Expand Up @@ -147,7 +146,6 @@ public void after() throws Exception {
protected Map<String, String> getBasicPulsarConfig() {
Map<String, String> config = new HashMap<>();
config.put(PULSAR_SERVICE_URL.key(), PULSAR_CONTAINER.getPulsarBrokerUrl());
config.put(PULSAR_ADMIN_URL.key(), PULSAR_CONTAINER.getHttpServiceUrl());
config.put(PULSAR_SUBSCRIPTION_NAME.key(), "paimon-tests");
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -119,6 +120,11 @@ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
return new SinkRuntimeProviderContext(isBounded())
.createDataStructureConverter(producedDataType);
}

@Override
public Optional<int[][]> getTargetColumns() {
return Optional.empty();
}
};

public static KafkaLogStoreFactory discoverKafkaLogFactory() {
Expand Down

0 comments on commit fa584dc

Please sign in to comment.