
Producer callback is unable to utilise python raise to propagate errors #184

Closed
Samreay opened this issue Dec 27, 2023 · 14 comments
Samreay commented Dec 27, 2023

To reproduce:

  1. Start a pulsar standalone instance, via
docker run -it -p 6650:6650 -p 8080:8080 --tmpfs /pulsar/data apachepulsar/pulsar:3.1.0 bin/pulsar standalone
  2. Paste the following code into a file:
import time

import pulsar


def callback(result: pulsar.Result, message_id: pulsar.MessageId):
    if result == pulsar.Result.Timeout:
        raise ValueError()


client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer("topic-example", send_timeout_millis=1)
producer.send_async(b"hello", callback)
time.sleep(1)
  3. Run the code. You should see something akin to:
...
2023-12-27 14:23:19.550 INFO  [139722145527360] ProducerImpl:209 | [persistent://public/default/topic-example, ] Created producer on broker [127.0.0.1:56868 -> 127.0.0.1:6650] 
terminate called after throwing an instance of 'pybind11::error_already_set'
  what():  ValueError: <EMPTY MESSAGE>

At:
  /home/sam/arenko/service-utils/tmp.py(8): callback

[1]    53944 IOT instruction  /home/sam/arenko/service-utils/.venv/bin/python 

Something about pybind and the C++ code means that errors cannot be raised in the callback function, which is obviously an issue for propagating errors up the Python call stack. This might be because the callback function exists independently of the async event loop it should be using, so there's nowhere for the error to go. A try/except clause around everything won't work either: the terminate is forceful, and the entire app dies in an instant.

Expected behaviour: The callback function acts as a standard python function with normal exception handling.
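In the meantime, one user-side mitigation is to never let an exception escape the callback: catch it and hand it to the main thread via a queue. A minimal sketch (plain strings stand in for the `pulsar.Result` enum so it runs without a broker):

```python
import queue
import threading

# Errors captured from the callback, to be inspected by the main thread.
errors: queue.Queue = queue.Queue()

def safe_callback(result, message_id):
    # Never let an exception escape: the real client runs this on an
    # internal thread, where an uncaught exception aborts the process.
    try:
        if result != "Ok":  # stand-in for `result != pulsar.Result.Ok`
            raise ValueError(f"send failed: {result}")
    except Exception as exc:
        errors.put(exc)

# Simulate the client invoking the callback from an internal thread.
t = threading.Thread(target=safe_callback, args=("Timeout", None))
t.start()
t.join()

err = errors.get_nowait()
print(type(err).__name__)  # ValueError
```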

@BewareMyPower (Contributor)

You should not raise an exception in the callback because the callback is called in an internal thread.

Expected behaviour: The callback function acts as a standard python function with normal exception handling.

If it were allowed to raise an exception in the callback, where would you expect to handle that exception?


Samreay commented Dec 27, 2023

Can this be documented at the very least, then? Having a hidden "do this and your app will terminate" behaviour with no warning isn't ideal for an end user :) Additionally, docs on the pulsar.Result types would be great; I struggled to handle this.

If you mean in terms of what you'd see in the stack if the callback raises, I'd expect send_async to be the root cause with the callback underneath it. If you mean in which execution context a Python async callback should raise exceptions, that would be the event loop in general.

The goal, IMHO, is to get the async producer send as close to Python's standard-library async as possible. Pythonically, this means you'd have something like:

try:
    result = await producer.send_async(b"hello")
except Exception:
    ...  # handle issues here

For a more complete example, this is my current workaround (well, a simplified version of it), which uses janus Queues (ie queues with sync and async interfaces) to convert the callback function into actual python async:

import asyncio

import janus
import pulsar


async def better_send(producer: pulsar.Producer, data: bytes) -> pulsar.MessageId:
    queue = janus.Queue()

    def callback(result: pulsar.Result, message_id: pulsar.MessageId):
        queue.sync_q.put((result, message_id))

    producer.send_async(data, callback)
    result, message_id = await queue.async_q.get()
    if result == pulsar.Result.Timeout:
        raise TimeoutError()
    elif result != pulsar.Result.Ok:
        raise ValueError(str(result))
    return message_id


async def main():
    client = pulsar.Client("pulsar://localhost:6650")
    producer = client.create_producer("topic-example", send_timeout_millis=1)
    await better_send(producer, b"hello")


if __name__ == "__main__":
    asyncio.run(main())

Running this gives you a pythonic exception which doesn't crash the application, and has proper asyncio support.

Traceback (most recent call last):
  File "/home/sam/arenko/service-utils/tmp.py", line 30, in <module>
    asyncio.run(main())
  File "/home/sam/.pyenv/versions/3.11.4/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/sam/.pyenv/versions/3.11.4/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/.pyenv/versions/3.11.4/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/sam/arenko/service-utils/tmp.py", line 26, in main
    await better_send(producer, b"hello")
  File "/home/sam/arenko/service-utils/tmp.py", line 17, in better_send
    raise TimeoutError()
TimeoutError

@BewareMyPower (Contributor)

I got your point now. Unfortunately, the Pulsar Python client is not well integrated with Python coroutines: it's a callback-based solution rather than a coroutine-based one. As you can see, the send_async method cannot be await-ed.

Therefore, you need a traditional way to handle it just like your workaround that queues the results.
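The queue-based workaround can also be built with only the standard library, completing an asyncio future via call_soon_threadsafe. A sketch, where fake_send_async stands in for producer.send_async so the pattern is runnable without a broker:

```python
import asyncio
import threading

def awaitable_send(send_async, data):
    """Wrap a callback-based send_async into an awaitable future.

    `send_async` is any function with the (data, callback) shape of
    producer.send_async; only the callback signature is assumed.
    """
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def callback(result, message_id):
        # The callback fires on the client's internal thread, so hop back
        # onto the event loop thread before touching the future.
        if result == "Ok":  # stand-in for pulsar.Result.Ok
            loop.call_soon_threadsafe(fut.set_result, message_id)
        else:
            loop.call_soon_threadsafe(fut.set_exception, ValueError(str(result)))

    send_async(data, callback)
    return fut

def fake_send_async(data, callback):
    # Simulate the client completing the send on another thread.
    threading.Thread(target=callback, args=("Ok", "some-message-id")).start()

async def main():
    return await awaitable_send(fake_send_async, b"hello")

msg_id = asyncio.run(main())
print(msg_id)  # some-message-id
```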

BTW, I just found you have already seen the similar discussion: #55.

Can this be documented at the very least then?

Sure.

Additionally, doco on pulsar.Result types would be great

I agree. I opened another issue for it: #185


Samreay commented Dec 27, 2023

Yeah, it's a thorny issue. There is the option to add async support to the official client, similar to how I've done it...

But ultimately, I worry this would just cause even more issues with communication with the underlying C++ client, and this probably brings the discussion back to "Should the Python client be rewritten (by a vague 'someone') in Python to use standard language features?"

@BewareMyPower (Contributor)

this would just cause even more issues with communication with the underlying C++ client

The root cause is that the C++ client (or any callback-based solution) does not provide a way for users to specify which thread runs the callback. What's worse, which thread calls the callback is a black box to users. Here is a similar issue I found: apache/pulsar-client-cpp#368

The Python wrapper does not do any complicated work. Ideally, it should avoid passing user-provided callbacks to the underlying C++ APIs directly, just like the Node.js wrapper does.
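For illustration, the wrapper-side fix could be as small as decorating the user's callback before it reaches the C++ layer. A hypothetical helper, not part of the current API:

```python
import logging

def shield_callback(user_callback):
    """Hypothetical wrapper-side helper: never let a user exception
    propagate into the C++ layer that invokes the callback."""
    def wrapped(result, message_id):
        try:
            user_callback(result, message_id)
        except Exception:
            logging.exception("unhandled exception in send_async callback")
    return wrapped

calls = []

def user_cb(result, message_id):
    calls.append(result)
    raise RuntimeError("boom")  # would abort the whole process today

shield_callback(user_cb)("Timeout", None)  # does not raise
print(calls)  # ['Timeout']
```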

@merlimat (Contributor)

@Samreay The question is also why you are using send_async() instead of the plain send.

While send is blocking, it doesn't hold the GIL, so other Python threads can make progress.
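Because the blocking send releases the GIL, it can also be offloaded to worker threads to get awaitable sends without send_async at all. A sketch with a stand-in producer (the real object would be a pulsar.Producer):

```python
import asyncio

class FakeProducer:
    # Stand-in for pulsar.Producer: send() blocks but releases the GIL,
    # so running it in worker threads lets sends proceed concurrently.
    def send(self, data):
        return f"id-{len(data)}"

async def main():
    producer = FakeProducer()
    # gather preserves submission order, even across threads.
    return await asyncio.gather(
        *(asyncio.to_thread(producer.send, b"x" * i) for i in range(1, 4))
    )

ids = asyncio.run(main())
print(ids)  # ['id-1', 'id-2', 'id-3']
```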


Samreay commented Dec 27, 2023

The workaround I have is more advanced than the one posted above: it has proper concurrency when async-sending multiple messages (unlike my example, which sends one message at a time), and by having an awaitable task I can choose to wait for its completion via await or just throw it into the event loop via asyncio.create_task(better_send(...)).

BewareMyPower added a commit to BewareMyPower/pulsar-client-python that referenced this issue Dec 28, 2023
See apache#184

It should be documented that the callback should not throw any exception
when passed to `send_async`.
BewareMyPower added a commit that referenced this issue Dec 28, 2023
See #184

It should be documented that the callback should not throw any exception
when passed to `send_async`.

Samreay commented Dec 28, 2023

A bit more context:

Right now, if I have 100 messages to send and want actual error handling, I either use my workaround or have to spend most of my time blocked using send, because it seems the Python client doesn't support batch sending by producers outside of send_async. I would be very happy to be wrong on this, though; being able to sync-send a batch of messages would be great!

Another service does a batch consume on the same topic I produce to, but with send each message is serial, so instead of a single nice batch consumption on the other end, I get numerous small consumptions.

@BewareMyPower (Contributor)

Did you mean batch receive? Currently there is no good way in Pulsar to receive a batch as a whole. There is a batchReceive API in the Java client and the C++ client, but both of them just receive messages one by one and group them into an iterable batch.

That's because the raw format of a batch in Pulsar is not a trivial concatenation of message buffers: the MessageMetadata and SingleMessageMetadata protobuf structures are involved.


Samreay commented Dec 28, 2023

So one of the goals of getting async working properly for my use case was that it seems the python producer can do batching, but only with send_async and not send.

As an aside: it'd be great to know the defaults for the batch-send message and byte limits, and whether they should be set to 0 or -1 to be unlimited and just batch off elapsed time; the docs for all those kwargs just say "Undocumented": https://pulsar.apache.org/api/python/3.3.x/pulsar.Client.html#create_producer

Every half an hour, we have a clearing action on a market, and we get a sudden and instantaneous flood of messages from a third party. We turn these into pulsar messages and want to batch-send them out. Similarly, we want to batch-read those messages in another service. Right now we don't use any of the batching functionality in the python consumer, because we need the interface between consumers and readers to be identical, and batching doesn't seem to be part of the reader implementation, just the consumer.

This means our pulsar wrapper just consumes messages one by one, with a little timeout, after which all the messages it has consumed get passed back to the function allocated to process them.
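That wrapper pattern can be sketched as follows; fake_receive stands in for consumer.receive(timeout_millis=...), with queue.Empty playing the role of the client's timeout exception so the sketch runs without a broker:

```python
import queue

def drain_batch(receive, max_messages=100, timeout_ms=50):
    # Receive messages one by one until a timeout or size limit is hit,
    # then hand the whole batch back to the processing function.
    batch = []
    while len(batch) < max_messages:
        try:
            batch.append(receive(timeout_ms))
        except queue.Empty:  # stand-in for the client's timeout error
            break
    return batch

# Demo with an in-memory queue as the message source.
q = queue.Queue()
for i in range(5):
    q.put(f"msg-{i}")

def fake_receive(timeout_ms):
    return q.get(timeout=timeout_ms / 1000)

batch = drain_batch(fake_receive)
print(batch)  # ['msg-0', 'msg-1', 'msg-2', 'msg-3', 'msg-4']
```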


All this is just flavour to answer the initial question of "Why not just use send instead of send_async?". Boiled down: send blocks, and if I get 10k messages at once, sending them one by one isn't ideal; sending them via send_async without the ability to handle errors is also not ideal, hence the current workaround. Hope that helps explain the use cases :)


BewareMyPower commented Dec 28, 2023

it'd be great to know the batch send messages and bytes defaults and whether this should be set to 0 or -1 to be unlimited and just batch off elapsed time

Yeah, these documents are missing; we should add them. For now, you can refer to the C++ client documents here for all the setBatchXXX methods. In short, 0 means no limit. (I'm not sure about the behavior of setting -1, because of signed/unsigned issues.)

Here is a related issue: #187


BTW, back to the issue title: using a callback just forces users to pass the errors to another thread to process, which is generally implemented via a concurrent queue.

Regarding your concern from before:

But ultimately, I worry this would just cause even more issues with communication with the underlying C++ client

I think it's okay to add such APIs, just like your workaround. The underlying C++ client calls the send callback (converted from a Python function via Pybind11) in Boost.Asio's event loop, which works much like asyncio's event loop.

My brainstorm:

producer = client.create_producer(topic)
coros = []
for i in range(10):
    try:
        coros.append(producer.send_async(b"msg"))
    except pulsar.QueueIsFull:
        break
try:
    msg_ids = await asyncio.gather(*coros)  # or asyncio.wait here
except pulsar.PulsarException as e:
    ...  # handle Pulsar-specific errors
except Exception as e:
    ...  # handle everything else

@BewareMyPower (Contributor)

FYI, I'm going to support async-await style APIs, see #55 (comment)


merlimat commented Dec 29, 2023

@Samreay
The way to achieve what you're describing with the current client is like:

producer = client.create_producer(topic, block_if_queue_full=True)

for i in range(100):
    producer.send_async(b"msg", lambda res, msg_id: None)  # no-op callback

producer.flush()

flush will throw an exception if any of the async send operations failed.
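To also learn which sends failed, the flush pattern can be paired with a callback that only records failures instead of raising. A sketch, with plain strings standing in for the pulsar.Result enum:

```python
failures = []

def record_failure(result, message_id):
    # Safe callback: records non-Ok results instead of raising, so the
    # internal client thread is never exposed to a Python exception.
    if result != "Ok":  # stand-in for `result != pulsar.Result.Ok`
        failures.append((result, message_id))

# Simulate the client invoking the callback for three completed sends.
for outcome, mid in [("Ok", "id-1"), ("Timeout", "id-2"), ("Ok", "id-3")]:
    record_failure(outcome, mid)

print(failures)  # [('Timeout', 'id-2')]
```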


Samreay commented Jan 2, 2024

Thanks @merlimat, I didn't realise flush would raise errors like that - very handy, and I'll add this to my code to increase its robustness :)
