This took me a little while to track down, but I'm pretty sure it's an actual bug. It seems simple enough to fix, and I'd be willing to take on the PR. I've prototyped the change locally: it's 1 line in the code and 1-2 lines in the unit test.
The Client class has a subscribe function with a parameter called pattern_auto_discovery_period.
This value is unused, so the discovery period always defaults to 60 seconds (because that is what is in the cpp files).
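To make the failure mode concrete, here is a toy sketch in plain Python. It is not the real pulsar-client code: ToyConsumerConfig, subscribe_buggy and subscribe_fixed are hypothetical names, and the assumption is only that the real problem has roughly this shape, where the argument is accepted but never copied into the consumer configuration, so the 60 second default stays in effect.

```python
# Toy model only; none of these names exist in pulsar-client.
DEFAULT_DISCOVERY_PERIOD_S = 60  # default reported in this issue


class ToyConsumerConfig:
    """Stand-in for a consumer configuration object (hypothetical)."""

    def __init__(self):
        self.pattern_auto_discovery_period = DEFAULT_DISCOVERY_PERIOD_S


def subscribe_buggy(pattern, pattern_auto_discovery_period=60):
    conf = ToyConsumerConfig()
    # The argument is accepted but never written into conf,
    # so the default silently wins.
    return conf


def subscribe_fixed(pattern, pattern_auto_discovery_period=60):
    conf = ToyConsumerConfig()
    # The one-line style of fix: forward the argument to the config.
    conf.pattern_auto_discovery_period = pattern_auto_discovery_period
    return conf


buggy = subscribe_buggy("my-topic/.*", pattern_auto_discovery_period=1)
fixed = subscribe_fixed("my-topic/.*", pattern_auto_discovery_period=1)
print(buggy.pattern_auto_discovery_period, fixed.pattern_auto_discovery_period)
```

In this toy model the buggy variant keeps the default no matter what the caller passes, which matches the behaviour described above.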
Here is a POC that shows the problem
import re
import time

import pulsar
import _pulsar

mode = "non-persistent"  # ?? nothing received
TOPIC = f"{mode}://public/default/my-topic/whatever"
TOPIC2 = f"{mode}://public/default/my-topic/something"
TOPIC_WILD = f"{mode}://public/default/my-topic/.*"


def callback(consumer, msg):
    print(f"Got topic {msg.topic_name()} {msg.data()}\n", flush=True)


def main():
    sub_mode = _pulsar.RegexSubscriptionMode.NonPersistentOnly
    time.sleep(2)
    client = pulsar.Client('pulsar://localhost:6650')
    # This topic will be found right away because I created it first!
    producer = client.create_producer(TOPIC)
    client.subscribe(
        re.compile(TOPIC_WILD),
        subscription_name="subname111",
        message_listener=callback,
        regex_subscription_mode=sub_mode,
        pattern_auto_discovery_period=100,
    )
    # This topic will not be found until 60 seconds into the run,
    # regardless of pattern_auto_discovery_period
    producer2 = client.create_producer(TOPIC2)
    # Alternate between the two producers, one message per second
    for k in range(120):
        time.sleep(1)
        o = f"some data {k}"
        if k % 2 == 0:
            producer.send(o.encode())
        else:
            producer2.send(o.encode())
    client.close()


if __name__ == "__main__":
    main()
Here is a screenshot of the pattern discovery happening at 60 seconds.
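There is a unit test that is supposed to be testing for this: test_topics_pattern_consumer
However, this test has a bug as well. Since all the producers are created before the subscribe call, all the topics are found right away and the discovery period is never exercised.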
producer1 = client.create_producer(topic1)
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)  ##### MOVE THIS after the subscribe to demonstrate auto-discovery!
consumer = client.subscribe(
    re.compile(topics_pattern),
    "my-pattern-consumer-sub",
    consumer_type=ConsumerType.Shared,
    receiver_queue_size=10,
    pattern_auto_discovery_period=1,
)
# wait enough time to trigger auto discovery
time.sleep(2)
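The ordering problem in the test can be illustrated with a small simulation. This is a toy model under stated assumptions, not the Pulsar client: first_seen_times is a hypothetical helper, and it assumes that topics existing at subscribe time are picked up immediately while later topics are picked up on the next periodic discovery tick.

```python
import math


def first_seen_times(topic_created_at, subscribe_at, discovery_period):
    """Return the time at which each topic is first seen by the consumer.

    Toy model: a topic created at or before subscribe_at is seen at
    subscribe_at; a later topic is seen at the first discovery tick
    (subscribe_at + n * discovery_period) at or after its creation.
    """
    seen = {}
    for topic, created in topic_created_at.items():
        if created <= subscribe_at:
            seen[topic] = subscribe_at
        else:
            ticks = math.ceil((created - subscribe_at) / discovery_period)
            seen[topic] = subscribe_at + ticks * discovery_period
    return seen


# Current test ordering: every topic exists before subscribe (at t=1).
early = {"topic1": 0, "topic2": 0, "topic3": 0}
# Proposed ordering: topic3 is created after subscribe.
late = {"topic1": 0, "topic2": 0, "topic3": 2}

for period in (1, 60):
    print(period, first_seen_times(early, 1, period), first_seen_times(late, 1, period))
```

With the current ordering the result is the same for a period of 1 and a period of 60, so the test cannot tell whether the parameter is honored. Once topic3 is created after the subscribe call, the two periods give different discovery times, and a short wait is only enough if the 1 second period actually takes effect.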
mikeawalker changed the title from "pattern_auto_discovery_period is pulsar.Client is not used." to "pattern_auto_discovery_period in pulsar.Client is not used." on Dec 8, 2023