Rewrite sync Assembler to improve performance. #1530

Merged
merged 1 commit into from
Oct 25, 2024

aaugustin
Member

Previously, a latch was used to synchronize the user thread reading messages and
the background thread reading from the network. This required two thread switches
per message.

Now, the background thread writes messages to a queue, from which the user thread
reads. This allows passing several frames at each thread switch, reducing the
overhead.
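
To illustrate the idea, here is a minimal sketch of the queue-based handoff using only the standard library. It is not the code from this commit: `reader`, `consume`, `run`, and the `EOF` sentinel are hypothetical names, and the frames are plain byte strings standing in for parsed WebSocket frames.

```python
import queue
import threading

EOF = object()  # hypothetical sentinel marking the end of the stream


def reader(frames, out):
    """Background thread: enqueue frames without waiting for the consumer."""
    for frame in frames:
        out.put(frame)
    out.put(EOF)


def consume(out):
    """User thread: block only when the queue is empty."""
    received = []
    while True:
        frame = out.get()
        if frame is EOF:
            return received
        received.append(frame)


def run(frames):
    """Start the background reader and drain the queue from the caller's thread."""
    out = queue.SimpleQueue()
    thread = threading.Thread(target=reader, args=(frames, out))
    thread.start()
    received = consume(out)
    thread.join()
    return received
```

Because the producer never waits for the consumer to pick up an item, several frames can accumulate in the queue before the user thread is scheduled, which is where the saving in thread switches comes from.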

With this server code:

    import asyncio
    from websockets.asyncio.server import serve

    async def test(websocket):
        for i in range(int(await websocket.recv())):
            await websocket.send(f"{{\"iteration\": {i}}}")

    async def main():
        async with serve(test, "localhost", 8765) as server:
            await server.serve_forever()

    asyncio.run(main())

and this client code:

    from websockets.sync.client import connect

    with connect("ws://localhost:8765", compression=None) as websocket:
        websocket.send("1_000_000")
        for message in websocket:
            pass

an unscientific benchmark (running it on my laptop) shows a 2.5x speedup,
going from 11 seconds to 4.4 seconds. Setting a very large recv_bufsize
and max_size doesn't yield significant further improvement.

Flow control was tested by inserting debug logs in maybe_pause/resume()
and by measuring the wait for the recv_flow_control lock. It showed the
expected behavior of pausing and unpausing coupled with some wait time.
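
The pause/unpause behavior can be pictured with a small watermark sketch. This is an assumption-laden illustration, not the library's implementation: the `FlowControl` class, the `high` and `low` thresholds, and the `events` log are all hypothetical, and only the `maybe_pause`/`maybe_resume` names echo the functions mentioned above. It is single-threaded; real code would need a lock around the counter.

```python
class FlowControl:
    """Hypothetical watermark bookkeeping for a frame buffer."""

    def __init__(self, high=4, low=1):
        self.high = high      # pause reading above this many buffered frames
        self.low = low        # resume reading at or below this many
        self.buffered = 0
        self.paused = False
        self.events = []      # stands in for the debug logs

    def put(self):
        """Called when the background thread buffers a frame."""
        self.buffered += 1
        self.maybe_pause()

    def get(self):
        """Called when the user thread consumes a frame."""
        self.buffered -= 1
        self.maybe_resume()

    def maybe_pause(self):
        if not self.paused and self.buffered > self.high:
            self.paused = True
            self.events.append("pause")

    def maybe_resume(self):
        if self.paused and self.buffered <= self.low:
            self.paused = False
            self.events.append("resume")
```

Using two thresholds rather than one avoids toggling between paused and unpaused on every frame when the buffer hovers near the limit.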

The new implementation mirrors the asyncio implementation and gains the
option to prevent or force decoding of frames.
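
As a rough sketch of what preventing or forcing decoding means, the hypothetical `assemble` helper below joins the payload chunks of one message and applies a three-state `decode` flag. The semantics shown (default follows the frame type, `False` returns bytes, `True` returns a string) are my reading of the option and should be checked against the library documentation.

```python
def assemble(chunks, is_text, decode=None):
    """Join payload chunks of one message and optionally decode them.

    decode=None  -> text messages become str, binary messages stay bytes
    decode=False -> always return bytes (skip UTF-8 decoding)
    decode=True  -> always return str (decode binary payloads as UTF-8)
    """
    data = b"".join(chunks)
    if decode is None:
        decode = is_text
    return data.decode("utf-8") if decode else data
```

Skipping decoding can be useful when a text payload is passed straight to a parser that accepts bytes, since it saves one conversion per message.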

Fix #1376 for the threading implementation.
@aaugustin aaugustin force-pushed the rewrite-sync-assembler branch from fa78d82 to 1387c97 Compare October 25, 2024 12:05
@aaugustin aaugustin merged commit 1387c97 into main Oct 25, 2024
7 checks passed
@aaugustin aaugustin deleted the rewrite-sync-assembler branch October 25, 2024 12:08