Skip to content

Commit

Permalink
[cdc] Update cdc module dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Dec 30, 2023
1 parent a9a798b commit cc75ec6
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 21 deletions.
11 changes: 6 additions & 5 deletions paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ under the License.
<name>Paimon : Flink : CDC</name>

<properties>
<flink.version>1.17.2</flink.version>
<flink.mysql.cdc.version>2.3.0</flink.mysql.cdc.version>
<flink.mongodb.cdc.version>2.4.1</flink.mongodb.cdc.version>
<flink.version>1.18.0</flink.version>
<flink.mysql.cdc.version>3.0.0</flink.mysql.cdc.version>
<flink.mongodb.cdc.version>3.0.0</flink.mongodb.cdc.version>
<avro.version>1.11.1</avro.version>
<geometry.version>2.2.0</geometry.version>
<json-path.version>2.8.0</json-path.version>
<mongodb.testcontainers.version>1.18.3</mongodb.testcontainers.version>
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
<flink.connector.pulsar.version>4.1.0-1.18</flink.connector.pulsar.version>
<flink.connector.kafka.version>3.0.2-1.18</flink.connector.kafka.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -99,7 +100,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<version>${flink.connector.kafka.version}</version>
<scope>provided</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void checkRequiredOption() {
PULSAR_CONF,
PulsarActionUtils.VALUE_FORMAT,
PulsarOptions.PULSAR_SERVICE_URL,
PulsarOptions.PULSAR_ADMIN_URL,
PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME);
checkOneRequiredOption(
cdcSourceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
Expand Down Expand Up @@ -63,10 +62,12 @@ public class MySqlActionUtils {
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.newly-added-table.enabled")
.booleanType()
.defaultValue(true)
.defaultValue(false)
.withDescription(
"Whether capture the scan the newly added tables or not, by default is true.");

public static final String PROPERTIES_PREFIX = "jdbc.properties.";

static Connection getConnection(Configuration mySqlConfig, Map<String, String> jdbcProperties)
throws Exception {
String paramString = "";
Expand Down Expand Up @@ -239,14 +240,10 @@ private static Map<String, String> getJdbcProperties(
TypeMapping typeMapping, Configuration mySqlConfig) {
Map<String, String> jdbcProperties =
mySqlConfig.toMap().entrySet().stream()
.filter(e -> e.getKey().startsWith(JdbcUrlUtils.PROPERTIES_PREFIX))
.filter(e -> e.getKey().startsWith(PROPERTIES_PREFIX))
.collect(
Collectors.toMap(
e ->
e.getKey()
.substring(
JdbcUrlUtils.PROPERTIES_PREFIX
.length()),
e -> e.getKey().substring(PROPERTIES_PREFIX.length()),
Map.Entry::getValue));

if (typeMapping.containsMode(TINYINT1_NOT_BOOL)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
Expand Down Expand Up @@ -57,7 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
Expand Down Expand Up @@ -162,12 +163,26 @@ public class PulsarActionUtils {
.defaultValue(true)
.withDescription("To specify the boundedness of a stream.");

// lower versions of pulsar connector need this option
static final ConfigOption<String> PULSAR_ADMIN_URL =
ConfigOptions.key("pulsar.admin.adminUrl")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"The Pulsar service HTTP URL for the admin endpoint. For example, %s, or %s for TLS.",
code("http://my-broker.example.com:8080"),
code("https://my-broker.example.com:8443"))
.build());

public static PulsarSource<String> buildPulsarSource(Configuration pulsarConfig) {
PulsarSourceBuilder<String> pulsarSourceBuilder = PulsarSource.builder();

// the minimum setup
pulsarSourceBuilder
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
// to be compatible with lower versions
.setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
.setDeserializationSchema(new SimpleStringSchema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -65,8 +64,8 @@ public void testTinyInt1NotBool() throws Exception {
mySqlConfig.put("database-name", "tinyint1_not_bool_test");

// test tinyInt1isBit compatibility and url building
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "false");
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "useSSL", "false");
mySqlConfig.put(MySqlActionUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "false");
mySqlConfig.put(MySqlActionUtils.PROPERTIES_PREFIX + "useSSL", "false");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
Expand Down Expand Up @@ -124,7 +123,7 @@ public void testTinyInt1NotBool() throws Exception {
public void testConflictTinyInt1NotBool() {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "tinyint1_not_bool_test");
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "true");
mySqlConfig.put(MySqlActionUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "true");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.List;
import java.util.Map;

import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
Expand Down Expand Up @@ -147,7 +146,6 @@ public void after() throws Exception {
protected Map<String, String> getBasicPulsarConfig() {
Map<String, String> config = new HashMap<>();
config.put(PULSAR_SERVICE_URL.key(), PULSAR_CONTAINER.getPulsarBrokerUrl());
config.put(PULSAR_ADMIN_URL.key(), PULSAR_CONTAINER.getHttpServiceUrl());
config.put(PULSAR_SUBSCRIPTION_NAME.key(), "paimon-tests");
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -119,6 +120,11 @@ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
return new SinkRuntimeProviderContext(isBounded())
.createDataStructureConverter(producedDataType);
}

@Override
public Optional<int[][]> getTargetColumns() {
return Optional.empty();
}
};

public static KafkaLogStoreFactory discoverKafkaLogFactory() {
Expand Down

0 comments on commit cc75ec6

Please sign in to comment.