diff --git a/docs/content/cdc-ingestion/pulsar-cdc.md b/docs/content/cdc-ingestion/pulsar-cdc.md index b65d68e406c8..b8915135868d 100644 --- a/docs/content/cdc-ingestion/pulsar-cdc.md +++ b/docs/content/cdc-ingestion/pulsar-cdc.md @@ -228,3 +228,94 @@ Synchronization from multiple Pulsar topics to Paimon database. --table_conf changelog-producer=input \ --table_conf sink.parallelism=4 ``` + +## Additional pulsar_config + +There are some useful options to build Flink Pulsar Source, but they are not provided by flink-pulsar-connector document. They are: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
KeyDefaultTypeDescription
value.format(none)StringDefines the format identifier for encoding value data.
topic(none)StringTopic name(s) from which the data is read. It also supports topic list by separating topic by semicolon + like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified. +
pulsar.startCursor.fromMessageIdEARLIESTStingUsing a unique identifier of a single message to seek the start position. The common format is a triple + '<long>ledgerId,<long>entryId,<int>partitionIndex'. Specially, you can set it to + EARLIEST (-1, -1, -1) or LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1). +
pulsar.startCursor.fromPublishTime(none)LongUsing the message publish time to seek the start position.
pulsar.startCursor.fromMessageIdInclusivetrueBooleanWhether to include the given message id. This option only works when the message id is not EARLIEST or LATEST.
pulsar.stopCursor.atMessageId(none)StringStop consuming when the message id is equal or greater than the specified message id. Message that is equal + to the specified message id will not be consumed. The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. + Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1). +
pulsar.stopCursor.afterMessageId(none)StringStop consuming when the message id is greater than the specified message id. Message that is equal to the + specified message id will be consumed. The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. + Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1). +
pulsar.stopCursor.atEventTime(none)LongStop consuming when message event time is greater than or equals the specified timestamp. + Message that even time is equal to the specified timestamp will not be consumed. +
pulsar.stopCursor.afterEventTime(none)LongStop consuming when message event time is greater than the specified timestamp. + Message that even time is equal to the specified timestamp will be consumed. +
pulsar.source.unboundedtrueBooleanTo specify the boundedness of a stream.
+ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java index 11e979c8b864..0f8a17625824 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java @@ -26,7 +26,6 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory; -import org.apache.flink.connector.pulsar.common.config.PulsarOptions; import org.apache.flink.connector.pulsar.source.PulsarSource; import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; @@ -44,23 +43,20 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.internal.DefaultImplementation; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIENT_CONFIG_PREFIX; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder; import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges; -import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange; @@ -73,19 +69,15 @@ public class PulsarActionUtils { .noDefaultValue() .withDescription("Defines the format identifier for encoding value data."); - public static final ConfigOption TOPIC = + public static final ConfigOption> TOPIC = ConfigOptions.key("topic") .stringType() + .asList() .noDefaultValue() .withDescription( - "Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. " - + "Option 'topic' is required for sink."); - - static final ConfigOption PULSAR_AUTH_PARAM_MAP = - ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authParamMap") - .stringType() - .noDefaultValue() - .withDescription("Parameters for the authentication plugin."); + "Topic name(s) from which the data is read. It also supports topic list by separating topic " + + "by semicolon like 'topic-1;topic-2'. Note, only one of \"topic-pattern\" and \"topic\" " + + "can be specified."); static final ConfigOption PULSAR_START_CURSOR_FROM_MESSAGE_ID = ConfigOptions.key("pulsar.startCursor.fromMessageId") @@ -151,9 +143,7 @@ public class PulsarActionUtils { .defaultValue(true) .withDescription("To specify the boundedness of a stream."); - public static PulsarSource buildPulsarSource(Configuration rawConfig) { - Configuration pulsarConfig = preprocessPulsarConfig(rawConfig); - + public static PulsarSource buildPulsarSource(Configuration pulsarConfig) { PulsarSourceBuilder pulsarSourceBuilder = PulsarSource.builder(); // the minimum setup @@ -161,10 +151,7 @@ public static PulsarSource buildPulsarSource(Configuration rawConfig) { .setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL)) .setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL)) .setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME)) - .setTopics( - Arrays.stream(pulsarConfig.get(TOPIC).split(",")) - .map(String::trim) - .collect(Collectors.toList())) + .setTopics(pulsarConfig.get(TOPIC)) .setDeserializationSchema(new SimpleStringSchema()); // other settings @@ -231,8 +218,7 @@ public static PulsarSource buildPulsarSource(Configuration rawConfig) { String authPluginClassName = pulsarConfig.get(PULSAR_AUTH_PLUGIN_CLASS_NAME); if (authPluginClassName != null) { String authParamsString = pulsarConfig.get(PULSAR_AUTH_PARAMS); - Map authParamsMap = - pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PARAM_MAP); + Map authParamsMap = pulsarConfig.get(PULSAR_AUTH_PARAM_MAP); checkArgument( authParamsString != null || authParamsMap != null, @@ -278,21 +264,6 @@ private static MessageId toMessageId(String messageIdString) { } } - static SourceConfiguration toSourceConfiguration(Configuration rawConfig) { - return new SourceConfiguration(preprocessPulsarConfig(rawConfig)); - } - - private static Configuration preprocessPulsarConfig(Configuration rawConfig) { - Configuration cloned = new Configuration(rawConfig); - if (cloned.contains(PULSAR_AUTH_PARAM_MAP)) { - Map authParamsMap = - parseCommaSeparatedKeyValues(cloned.get(PULSAR_AUTH_PARAM_MAP)); - cloned.removeConfig(PULSAR_AUTH_PARAM_MAP); - cloned.set(PulsarOptions.PULSAR_AUTH_PARAM_MAP, authParamsMap); - } - return cloned; - } - public static DataFormat getDataFormat(Configuration pulsarConfig) { return DataFormat.fromConfigString(pulsarConfig.get(VALUE_FORMAT)); } @@ -301,7 +272,7 @@ public static DataFormat getDataFormat(Configuration pulsarConfig) { public static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer( Configuration pulsarConfig) { try { - SourceConfiguration pulsarSourceConfiguration = toSourceConfiguration(pulsarConfig); + SourceConfiguration pulsarSourceConfiguration = new SourceConfiguration(pulsarConfig); PulsarClient pulsarClient = PulsarClientFactory.createClient(pulsarSourceConfiguration); ConsumerBuilder consumerBuilder = @@ -313,7 +284,7 @@ public static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer( // The default position is Latest consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); - String topic = pulsarConfig.get(PulsarActionUtils.TOPIC).split(",")[0]; + String topic = pulsarConfig.get(PulsarActionUtils.TOPIC).get(0); TopicPartition topicPartition = new TopicPartition(topic); consumerBuilder.topic(topicPartition.getFullTopicName()); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java index 98dfea01a214..33d854fb8eb0 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java @@ -51,7 +51,7 @@ public void testPulsarSchema() throws Exception { sendMessages(topic, messages); Configuration pulsarConfig = Configuration.fromMap(getBasicPulsarConfig()); - pulsarConfig.set(TOPIC, topic); + pulsarConfig.setString(TOPIC.key(), topic); pulsarConfig.set(VALUE_FORMAT, "canal-json"); Schema pulsarSchema = diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java index 946a1f6e4b2e..dbd3db5d5ea7 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java @@ -67,7 +67,7 @@ public void testSchemaEvolutionMultiTopic() throws Exception { Map pulsarConfig = getBasicPulsarConfig(); pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1"); pulsarConfig.put(VALUE_FORMAT.key(), "canal-json"); - pulsarConfig.put(TOPIC.key(), String.join(",", topics)); + pulsarConfig.put(TOPIC.key(), String.join(";", topics)); PulsarSyncDatabaseAction action = syncDatabaseActionBuilder(pulsarConfig) .withTableConfig(getBasicTableConfig())