Skip to content

Commit

Permalink
[cdc] Improve pulsar-cdc (apache#2535)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Dec 20, 2023
1 parent 998e5b1 commit 8887df6
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 42 deletions.
91 changes: 91 additions & 0 deletions docs/content/cdc-ingestion/pulsar-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,94 @@ Synchronization from multiple Pulsar topics to Paimon database.
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
```
## Additional pulsar_config
There are some useful options to build Flink Pulsar Source, but they are not provided by flink-pulsar-connector document. They are:
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left">Key</th>
<th class="text-left">Default</th>
<th class="text-left">Type</th>
<th class="text-left">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>value.format</td>
<td>(none)</td>
<td>String</td>
<td>Defines the format identifier for encoding value data.</td>
</tr>
<tr>
<td>topic</td>
<td>(none)</td>
<td>String</td>
<td>Topic name(s) from which the data is read. It also supports topic list by separating topic by semicolon
like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified.
</td>
</tr>
<tr>
<td>pulsar.startCursor.fromMessageId</td>
<td>EARLIEST</td>
<td>Sting</td>
<td>Using a unique identifier of a single message to seek the start position. The common format is a triple
'&ltlong&gtledgerId,&ltlong&gtentryId,&ltint&gtpartitionIndex'. Specially, you can set it to
EARLIEST (-1, -1, -1) or LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).
</td>
</tr>
<tr>
<td>pulsar.startCursor.fromPublishTime</td>
<td>(none)</td>
<td>Long</td>
<td>Using the message publish time to seek the start position.</td>
</tr>
<tr>
<td>pulsar.startCursor.fromMessageIdInclusive</td>
<td>true</td>
<td>Boolean</td>
<td>Whether to include the given message id. This option only works when the message id is not EARLIEST or LATEST.</td>
</tr>
<tr>
<td>pulsar.stopCursor.atMessageId</td>
<td>(none)</td>
<td>String</td>
<td>Stop consuming when the message id is equal or greater than the specified message id. Message that is equal
to the specified message id will not be consumed. The common format is a triple '&ltlong&gtledgerId,&ltlong&gtentryId,&ltint&gtpartitionIndex'.
Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).
<tr>
<td>pulsar.stopCursor.afterMessageId</td>
<td>(none)</td>
<td>String</td>
<td>Stop consuming when the message id is greater than the specified message id. Message that is equal to the
specified message id will be consumed. The common format is a triple '&ltlong&gtledgerId,&ltlong&gtentryId,&ltint&gtpartitionIndex'.
Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).
</td>
</tr>
<tr>
<td>pulsar.stopCursor.atEventTime</td>
<td>(none)</td>
<td>Long</td>
<td>Stop consuming when message event time is greater than or equals the specified timestamp.
Message that even time is equal to the specified timestamp will not be consumed.
</td>
</tr>
<tr>
<td>pulsar.stopCursor.afterEventTime</td>
<td>(none)</td>
<td>Long</td>
<td>Stop consuming when message event time is greater than the specified timestamp.
Message that even time is equal to the specified timestamp will be consumed.
</td>
</tr>
<tr>
<td>pulsar.source.unbounded</td>
<td>true</td>
<td>Boolean</td>
<td>To specify the boundedness of a stream.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
Expand All @@ -44,23 +43,20 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.internal.DefaultImplementation;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIENT_CONFIG_PREFIX;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange;

Expand All @@ -73,19 +69,15 @@ public class PulsarActionUtils {
.noDefaultValue()
.withDescription("Defines the format identifier for encoding value data.");

public static final ConfigOption<String> TOPIC =
public static final ConfigOption<List<String>> TOPIC =
ConfigOptions.key("topic")
.stringType()
.asList()
.noDefaultValue()
.withDescription(
"Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. "
+ "Option 'topic' is required for sink.");

static final ConfigOption<String> PULSAR_AUTH_PARAM_MAP =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authParamMap")
.stringType()
.noDefaultValue()
.withDescription("Parameters for the authentication plugin.");
"Topic name(s) from which the data is read. It also supports topic list by separating topic "
+ "by semicolon like 'topic-1;topic-2'. Note, only one of \"topic-pattern\" and \"topic\" "
+ "can be specified.");

static final ConfigOption<String> PULSAR_START_CURSOR_FROM_MESSAGE_ID =
ConfigOptions.key("pulsar.startCursor.fromMessageId")
Expand Down Expand Up @@ -151,20 +143,15 @@ public class PulsarActionUtils {
.defaultValue(true)
.withDescription("To specify the boundedness of a stream.");

public static PulsarSource<String> buildPulsarSource(Configuration rawConfig) {
Configuration pulsarConfig = preprocessPulsarConfig(rawConfig);

public static PulsarSource<String> buildPulsarSource(Configuration pulsarConfig) {
PulsarSourceBuilder<String> pulsarSourceBuilder = PulsarSource.builder();

// the minimum setup
pulsarSourceBuilder
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
.setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
.setTopics(
Arrays.stream(pulsarConfig.get(TOPIC).split(","))
.map(String::trim)
.collect(Collectors.toList()))
.setTopics(pulsarConfig.get(TOPIC))
.setDeserializationSchema(new SimpleStringSchema());

// other settings
Expand Down Expand Up @@ -231,8 +218,7 @@ public static PulsarSource<String> buildPulsarSource(Configuration rawConfig) {
String authPluginClassName = pulsarConfig.get(PULSAR_AUTH_PLUGIN_CLASS_NAME);
if (authPluginClassName != null) {
String authParamsString = pulsarConfig.get(PULSAR_AUTH_PARAMS);
Map<String, String> authParamsMap =
pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PARAM_MAP);
Map<String, String> authParamsMap = pulsarConfig.get(PULSAR_AUTH_PARAM_MAP);

checkArgument(
authParamsString != null || authParamsMap != null,
Expand Down Expand Up @@ -278,21 +264,6 @@ private static MessageId toMessageId(String messageIdString) {
}
}

static SourceConfiguration toSourceConfiguration(Configuration rawConfig) {
return new SourceConfiguration(preprocessPulsarConfig(rawConfig));
}

private static Configuration preprocessPulsarConfig(Configuration rawConfig) {
Configuration cloned = new Configuration(rawConfig);
if (cloned.contains(PULSAR_AUTH_PARAM_MAP)) {
Map<String, String> authParamsMap =
parseCommaSeparatedKeyValues(cloned.get(PULSAR_AUTH_PARAM_MAP));
cloned.removeConfig(PULSAR_AUTH_PARAM_MAP);
cloned.set(PulsarOptions.PULSAR_AUTH_PARAM_MAP, authParamsMap);
}
return cloned;
}

public static DataFormat getDataFormat(Configuration pulsarConfig) {
return DataFormat.fromConfigString(pulsarConfig.get(VALUE_FORMAT));
}
Expand All @@ -301,7 +272,7 @@ public static DataFormat getDataFormat(Configuration pulsarConfig) {
public static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(
Configuration pulsarConfig) {
try {
SourceConfiguration pulsarSourceConfiguration = toSourceConfiguration(pulsarConfig);
SourceConfiguration pulsarSourceConfiguration = new SourceConfiguration(pulsarConfig);
PulsarClient pulsarClient = PulsarClientFactory.createClient(pulsarSourceConfiguration);

ConsumerBuilder<String> consumerBuilder =
Expand All @@ -313,7 +284,7 @@ public static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(
// The default position is Latest
consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);

String topic = pulsarConfig.get(PulsarActionUtils.TOPIC).split(",")[0];
String topic = pulsarConfig.get(PulsarActionUtils.TOPIC).get(0);
TopicPartition topicPartition = new TopicPartition(topic);
consumerBuilder.topic(topicPartition.getFullTopicName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testPulsarSchema() throws Exception {
sendMessages(topic, messages);

Configuration pulsarConfig = Configuration.fromMap(getBasicPulsarConfig());
pulsarConfig.set(TOPIC, topic);
pulsarConfig.setString(TOPIC.key(), topic);
pulsarConfig.set(VALUE_FORMAT, "canal-json");

Schema pulsarSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testSchemaEvolutionMultiTopic() throws Exception {
Map<String, String> pulsarConfig = getBasicPulsarConfig();
pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
pulsarConfig.put(TOPIC.key(), String.join(",", topics));
pulsarConfig.put(TOPIC.key(), String.join(";", topics));
PulsarSyncDatabaseAction action =
syncDatabaseActionBuilder(pulsarConfig)
.withTableConfig(getBasicTableConfig())
Expand Down

0 comments on commit 8887df6

Please sign in to comment.