
Async Firehose Client: block on make message handler call, add on error callback #157

Merged · 11 commits into MarshalX:main · Oct 27, 2023

Conversation

@DXsmiley (Contributor)

The original _AsyncWebsocketClient._process_message_frame spawns a new task and then returns immediately, allowing the client to enter a very fast client.recv() -> create_task loop.

    def _process_message_frame(self, frame: 'MessageFrame') -> None:
        # Fire-and-forget: the task is scheduled but never awaited here,
        # so this method returns before the message has been processed.
        task: asyncio.Task = self._loop.create_task(self._on_message_callback(frame))
        self._on_message_tasks.add(task)
        task.add_done_callback(self._on_message_callback_done)

This isn't a problem when consuming the head of the firehose in real time. However, when replaying past events, if message processing is at all non-trivial, tasks are spawned far faster than they can be completed. A glut of in-flight tasks creates a huge amount of scheduling overhead and heavy contention on any shared resources, such as databases or internal locks. There's no way to apply backpressure to slow down the client.recv() loop, so you end up with a memory-leak-esque situation.

I think the original behaviour is unintuitive and quite an unexpected pitfall, so I've replaced it with a normal (async) callback. The client waits for the callback to finish before receiving the next message. This means there's no concurrency out of the box, but it provides a much more predictable interface for people to build their own systems on top of; I think it's valuable for the websocket client to be as unopinionated as possible about the message-processing side of things.
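
Roughly, the new behaviour looks like this (a simplified sketch, not the exact merged code):

    # Sketch only: the receive loop now awaits the handler before pulling
    # the next frame, so backpressure propagates all the way to client.recv().
    async def _process_message_frame(self, frame: 'MessageFrame') -> None:
        await self._on_message_callback(frame)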

For example, the callback I'm currently using in my own code is simply this:

    import asyncio

    messages_to_process: 'asyncio.Queue[MessageFrame]' = asyncio.Queue(maxsize=20)

    async def on_message_handler(message: 'MessageFrame') -> None:
        await messages_to_process.put(message)

messages_to_process.put blocks while the queue is full, which means that client.recv() is not called faster than we can handle messages, and I'm able to easily configure whatever concurrency I want on the queue-consumer side. This still allows message receiving and processing to occur simultaneously, but prevents the former from outpacing the latter.
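
For completeness, the consumer side can be as simple as a few workers draining the queue. This is a sketch: handle_message and the worker count of 4 are placeholders for whatever processing and concurrency you actually want.

    # Each worker processes one message at a time, so total concurrency
    # is simply the number of workers spawned.
    async def queue_consumer() -> None:
        while True:
            message = await messages_to_process.get()
            try:
                await handle_message(message)  # placeholder for real processing
            finally:
                messages_to_process.task_done()

    workers = [asyncio.create_task(queue_consumer()) for _ in range(4)]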

I've also made some changes to _WebsocketClient and _WebsocketClientBase to account for the differences between _WebsocketClient and _AsyncWebsocketClient.

@MarshalX (Owner)

Thank you! I'll take a look as soon as I'm done with my main job.

@MarshalX (Owner) left a comment

Please make sure that the difference between the async and sync versions of the _process_message_frame method is only in the "await" statements; for now, it is not. And probably we don't need _print_exception anymore, since we are now in the catch block?
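
For example, something like this (a sketch; the two bodies should be identical apart from the awaits):

    # sync client
    def _process_message_frame(self, frame: 'MessageFrame') -> None:
        try:
            self._on_message_callback(frame)
        except Exception as exception:
            self._on_callback_error_callback(exception)

    # async client: same body, with awaits added
    async def _process_message_frame(self, frame: 'MessageFrame') -> None:
        try:
            await self._on_message_callback(frame)
        except Exception as exception:
            self._on_callback_error_callback(exception)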

@MarshalX (Owner) commented Sep 21, 2023

Could we move this part to the base class?

    def _process_raw_frame(self, data: bytes) -> None:
        frame = Frame.from_bytes(data)
        if isinstance(frame, ErrorFrame):
            raise FirehoseError(XrpcError(frame.body.error, frame.body.message))
        if isinstance(frame, MessageFrame):
            self._process_message_frame(frame)
        else:
            raise FirehoseDecodingError('Unknown frame type')

idk, something like this:

    def _pre_process_raw_frame(self, data: bytes) -> Frame:   # in base
        frame = Frame.from_bytes(data)
        if isinstance(frame, ErrorFrame):
            raise FirehoseError(XrpcError(frame.body.error, frame.body.message))
        if not isinstance(frame, MessageFrame):
            raise FirehoseDecodingError('Unknown frame type')

        return frame
    
    def _process_raw_frame(self, data: bytes) -> None:  # in sync
        frame = self._pre_process_raw_frame(data)
        self._process_message_frame(frame)

    async def _process_raw_frame(self, data: bytes) -> None:  # in async
        frame = self._pre_process_raw_frame(data)
        await self._process_message_frame(frame)

@MarshalX (Owner)

btw, don't we want to have an async version of _on_callback_error_callback in the async client?
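
i.e. something like this (just a sketch of the shape):

    # hypothetical: the async client awaits an async error callback
    # instead of calling it synchronously
    async def _process_message_frame(self, frame: 'MessageFrame') -> None:
        try:
            await self._on_message_callback(frame)
        except Exception as exception:
            await self._on_callback_error_callback(exception)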

@MarshalX (Owner) left a comment

I like the use of wait_for :)

(Several review threads on atproto/firehose/client.py, all resolved.)
@DXsmiley (Contributor, Author)

I've allowed the stop event to interrupt client.recv(), but uh... the code's not great, I'm gonna be honest.

@MarshalX (Owner) commented Sep 28, 2023

uh...

This is getting towards scope creep for this PR.

Let's not include the wait_for tricks in this PR at all; as I understood it, that will not be ported to the sync client. Let's just merge the callback improvements.

@DXsmiley (Contributor, Author) commented Oct 2, 2023

Alright, I've rolled back a few changes and cleaned things up a bit.

@MarshalX changed the title from "Rework _AsyncWebsocketClient" to "Async Firehose Client: block on make message handler call, add on error callback" on Oct 27, 2023
@MarshalX merged commit 24a19d7 into MarshalX:main on Oct 27, 2023
6 checks passed
@MarshalX (Owner)

@DXsmiley thank you so much for your contribution! And sorry for the long wait.
