Skip to content

Commit

Permalink
[cdc] Update Paimon-cdc dependency version to CDC-3.0 and flink-1.18
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Mar 26, 2024
1 parent c6cb0c2 commit eacf74b
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
</tr>
<tr>
<td><h5>--pulsar_conf</h5></td>
<td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. `topic/topic-pattern`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
<td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. `topic/topic-pattern`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl` (not required after 1.18), and `pulsar.consumer.subscriptionName` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog_conf</h5></td>
Expand Down
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/pulsar_sync_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
</tr>
<tr>
<td><h5>--pulsar_conf</h5></td>
<td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. `topic/topic-pattern`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
<td>The configuration for Flink Pulsar sources. Each configuration should be specified in the format `key=value`. `topic/topic-pattern`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl` (not required after 1.18), and `pulsar.consumer.subscriptionName` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog_conf</h5></td>
Expand Down
2 changes: 1 addition & 1 deletion paimon-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ under the License.

<properties>
<flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
<flink.cdc.version>2.3.0</flink.cdc.version>
<flink.cdc.version>3.0.0</flink.cdc.version>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
</properties>

Expand Down
16 changes: 7 additions & 9 deletions paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ under the License.
<name>Paimon : Flink : CDC</name>

<properties>
<flink.version>1.17.2</flink.version>
<flink.cdc.version>2.4.2</flink.cdc.version>
<flink.mongodb.cdc.version>2.4.1</flink.mongodb.cdc.version>
<flink.version>1.18.1</flink.version>
<flink.cdc.version>3.0.0</flink.cdc.version>
<avro.version>1.11.1</avro.version>
<geometry.version>2.2.0</geometry.version>
<json-path.version>2.9.0</json-path.version>
<mongodb.testcontainers.version>1.19.1</mongodb.testcontainers.version>
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
<flink.connector.pulsar.version>4.1.0-1.18</flink.connector.pulsar.version>
<flink.connector.kafka.version>3.1.0-1.18</flink.connector.kafka.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -92,7 +91,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<version>${flink.connector.kafka.version}</version>
<scope>provided</scope>
</dependency>

Expand All @@ -103,11 +102,10 @@ under the License.
<scope>provided</scope>
</dependency>


<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>${flink.mongodb.cdc.version}</version>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -240,7 +238,7 @@ under the License.
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${mongodb.testcontainers.version}</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ public void checkRequiredOption() {
PULSAR_CONF,
PulsarActionUtils.VALUE_FORMAT,
PulsarOptions.PULSAR_SERVICE_URL,
PulsarOptions.PULSAR_ADMIN_URL,
PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME);
checkOneRequiredOption(
cdcSourceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -69,6 +68,8 @@ public class MySqlActionUtils {
.withDescription(
"Whether capture the scan the newly added tables or not, by default is true.");

public static final String JDBC_PROPERTIES_PREFIX = "jdbc.properties.";

static Connection getConnection(Configuration mySqlConfig, Map<String, String> jdbcProperties)
throws Exception {
String paramString = "";
Expand Down Expand Up @@ -238,7 +239,7 @@ public static MySqlSource<String> buildMySqlSource(
private static Map<String, String> getJdbcProperties(
TypeMapping typeMapping, Configuration mySqlConfig) {
Map<String, String> jdbcProperties =
convertToPropertiesPrefixKey(mySqlConfig.toMap(), JdbcUrlUtils.PROPERTIES_PREFIX);
convertToPropertiesPrefixKey(mySqlConfig.toMap(), JDBC_PROPERTIES_PREFIX);

if (typeMapping.containsMode(TINYINT1_NOT_BOOL)) {
String tinyInt1isBit = jdbcProperties.get("tinyInt1isBit");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
Expand Down Expand Up @@ -57,7 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
Expand Down Expand Up @@ -162,12 +163,26 @@ public class PulsarActionUtils {
.defaultValue(true)
.withDescription("To specify the boundedness of a stream.");

// lower versions of pulsar connector need this option
static final ConfigOption<String> PULSAR_ADMIN_URL =
ConfigOptions.key("pulsar.admin.adminUrl")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"The Pulsar service HTTP URL for the admin endpoint. For example, %s, or %s for TLS.",
code("http://my-broker.example.com:8080"),
code("https://my-broker.example.com:8443"))
.build());

public static PulsarSource<String> buildPulsarSource(Configuration pulsarConfig) {
PulsarSourceBuilder<String> pulsarSourceBuilder = PulsarSource.builder();

// the minimum setup
pulsarSourceBuilder
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
// to be compatible with lower versions
.setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
.setDeserializationSchema(new SimpleStringSchema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -65,8 +64,8 @@ public void testTinyInt1NotBool() throws Exception {
mySqlConfig.put("database-name", "tinyint1_not_bool_test");

// test tinyInt1isBit compatibility and url building
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "false");
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "useSSL", "false");
mySqlConfig.put(MySqlActionUtils.JDBC_PROPERTIES_PREFIX + "tinyInt1isBit", "false");
mySqlConfig.put(MySqlActionUtils.JDBC_PROPERTIES_PREFIX + "useSSL", "false");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
Expand Down Expand Up @@ -124,7 +123,7 @@ public void testTinyInt1NotBool() throws Exception {
public void testConflictTinyInt1NotBool() {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "tinyint1_not_bool_test");
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "true");
mySqlConfig.put(MySqlActionUtils.JDBC_PROPERTIES_PREFIX + "tinyInt1isBit", "true");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.List;
import java.util.Map;

import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
Expand Down Expand Up @@ -147,7 +146,6 @@ public void after() throws Exception {
protected Map<String, String> getBasicPulsarConfig() {
Map<String, String> config = new HashMap<>();
config.put(PULSAR_SERVICE_URL.key(), PULSAR_CONTAINER.getPulsarBrokerUrl());
config.put(PULSAR_ADMIN_URL.key(), PULSAR_CONTAINER.getHttpServiceUrl());
config.put(PULSAR_SUBSCRIPTION_NAME.key(), "paimon-tests");
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -119,6 +120,11 @@ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
return new SinkRuntimeProviderContext(isBounded())
.createDataStructureConverter(producedDataType);
}

@Override
public Optional<int[][]> getTargetColumns() {
return Optional.empty();
}
};

public static KafkaLogStoreFactory discoverKafkaLogFactory() {
Expand Down

0 comments on commit eacf74b

Please sign in to comment.