Skip to content

Commit

Permalink
🎨 Speeds up sending messages to user with socketio (ITISFoundation#5331)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrespov authored Feb 14, 2024
1 parent 3c2ef61 commit 4206404
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Generator
from typing import Final

from aiohttp import web
Expand Down Expand Up @@ -28,8 +28,8 @@
SOCKET_IO_NODE_UPDATED_EVENT,
SOCKET_IO_PROJECT_PROGRESS_EVENT,
SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT,
send_group_messages,
send_messages,
send_messages_to_group,
send_messages_to_user,
)
from ..wallets import api as wallets_api
from ._constants import APP_RABBITMQ_CONSUMERS_KEY
Expand Down Expand Up @@ -105,7 +105,7 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
else:
socket_message = _convert_to_node_progress_event(rabbit_message)
if socket_message:
await send_messages(app, rabbit_message.user_id, [socket_message])
await send_messages_to_user(app, rabbit_message.user_id, [socket_message])

return True

Expand All @@ -118,7 +118,7 @@ async def _log_message_parser(app: web.Application, data: bytes) -> bool:
"data": rabbit_message.dict(exclude={"user_id", "channel_name"}),
}
]
await send_messages(app, rabbit_message.user_id, socket_messages)
await send_messages_to_user(app, rabbit_message.user_id, socket_messages)
return True


Expand All @@ -134,7 +134,7 @@ async def _events_message_parser(app: web.Application, data: bytes) -> bool:
},
}
]
await send_messages(app, rabbit_message.user_id, socket_messages)
await send_messages_to_user(app, rabbit_message.user_id, socket_messages)
return True


Expand All @@ -153,9 +153,11 @@ async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> b
wallet_groups = await wallets_api.list_wallet_groups_with_read_access_by_wallet(
app, wallet_id=rabbit_message.wallet_id
)
rooms_to_notify: list[GroupID] = [item.gid for item in wallet_groups]
rooms_to_notify: Generator[GroupID, None, None] = (
item.gid for item in wallet_groups
)
for room in rooms_to_notify:
await send_group_messages(app, room, socket_messages)
await send_messages_to_group(app, room, socket_messages)
return True


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from models_library.users import UserID
from models_library.utils.fastapi_encoders import jsonable_encoder

from ..socketio.messages import send_messages
from ..socketio.messages import send_messages_to_user


async def notify_payment_completed(
Expand All @@ -28,7 +28,7 @@ async def notify_payment_completed(
"data": jsonable_encoder(payment, by_alias=True),
}
]
await send_messages(app, user_id, messages)
await send_messages_to_user(app, user_id, messages)


async def notify_payment_method_acked(
Expand All @@ -43,4 +43,4 @@ async def notify_payment_method_acked(
"data": jsonable_encoder(payment_method_transaction, by_alias=True),
}
]
await send_messages(app, user_id, messages)
await send_messages_to_user(app, user_id, messages)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from collections import defaultdict
from contextlib import suppress
from pprint import pformat
from typing import Any, Final
from typing import Any, Final, Generator
from uuid import UUID, uuid4

from aiohttp import web
Expand Down Expand Up @@ -96,8 +96,8 @@
from ..socketio.messages import (
SOCKET_IO_NODE_UPDATED_EVENT,
SOCKET_IO_PROJECT_UPDATED_EVENT,
send_group_messages,
send_messages,
send_messages_to_group,
send_messages_to_user,
)
from ..storage import api as storage_api
from ..users.api import FullNameDict, get_user_fullname, get_user_role
Expand Down Expand Up @@ -1456,13 +1456,15 @@ async def notify_project_state_update(
]

if notify_only_user:
await send_messages(app, user_id=f"{notify_only_user}", messages=messages)
await send_messages_to_user(
app, user_id=f"{notify_only_user}", messages=messages
)
else:
rooms_to_notify: list[GroupID] = [
rooms_to_notify: Generator[GroupID, None, None] = (
gid for gid, rights in project["accessRights"].items() if rights["read"]
]
)
for room in rooms_to_notify:
await send_group_messages(app, room, messages)
await send_messages_to_group(app, room, messages)


async def notify_project_node_update(
Expand Down Expand Up @@ -1493,7 +1495,7 @@ async def notify_project_node_update(
]

for room in rooms_to_notify:
await send_group_messages(app, room, messages)
await send_messages_to_group(app, room, messages)


async def retrieve_and_notify_project_locked_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ..products.api import Product, get_current_product
from ..resource_manager.user_sessions import managed_resource
from ._utils import EnvironDict, SocketID, get_socket_server, register_socketio_handler
from .messages import SOCKET_IO_HEARTBEAT_EVENT, send_messages
from .messages import SOCKET_IO_HEARTBEAT_EVENT, send_messages_to_user

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -144,7 +144,7 @@ async def connect(
"data": {"interval": _EMIT_INTERVAL_S},
}
]
await send_messages(
await send_messages_to_user(
app,
user_id,
heart_beat_messages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from servicelib.utils import logged_gather
from socketio import AsyncServer

from ..resource_manager.user_sessions import managed_resource
from ._utils import get_socket_server

_logger = logging.getLogger(__name__)
Expand All @@ -33,32 +32,27 @@
SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT: Final[str] = "walletOsparcCreditsUpdated"


async def send_messages(
async def send_messages_to_user(
app: Application, user_id: UserID, messages: Sequence[SocketMessageDict]
) -> None:
sio: AsyncServer = get_socket_server(app)

socket_ids: list[str] = []
with managed_resource(user_id, None, app) as user_session:
socket_ids = await user_session.find_socket_ids()

await logged_gather(
*(
sio.emit(
message["event_type"],
json_dumps(message["data"]),
room=SocketIORoomStr.from_socket_id(sid),
room=SocketIORoomStr.from_user_id(user_id=user_id),
)
for message in messages
for sid in socket_ids
),
reraise=False,
log=_logger,
max_concurrency=100,
)


async def send_group_messages(
async def send_messages_to_group(
app: Application, group_id: GroupID, messages: Sequence[SocketMessageDict]
) -> None:
sio: AsyncServer = get_socket_server(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ async def test_log_workflow_only_receives_messages_if_subscribed(
"""
mocked_send_messages = mocker.patch(
"simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers.send_messages",
"simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers.send_messages_to_user",
autospec=True,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# pylint: disable=redefined-outer-name
# pylint: disable=protected-access

from typing import Iterator
from unittest.mock import AsyncMock

import pytest
Expand All @@ -19,19 +18,19 @@


@pytest.fixture
def mock_send_messages(mocker: MockerFixture) -> Iterator[dict]:
def mock_send_messages(mocker: MockerFixture) -> dict:
reference = {}

async def mock_send_message(*args) -> None:
reference["args"] = args

mocker.patch.object(
_rabbitmq_exclusive_queue_consumers,
"send_messages",
"send_messages_to_user",
side_effect=mock_send_message,
)

yield reference
return reference


@pytest.mark.parametrize(
Expand Down
5 changes: 1 addition & 4 deletions services/web/server/tests/unit/with_dbs/03/test_socketio.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def app_environment(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatc


@pytest.mark.skip(
reason="Pending https://github.com/ITISFoundation/osparc-simcore/issues/3387"
reason="Pending https://github.com/ITISFoundation/osparc-simcore/issues/5332"
)
@pytest.mark.parametrize("user_role", (UserRole.USER,))
async def test_socketio_session_client_to_server(
Expand All @@ -64,9 +64,6 @@ async def test_socketio_session_client_to_server(
user_role: UserRole,
mocker: MockerFixture,
):
#
mock = mocker.patch("simcore_service_webserver.socketio._handlers.managed_resource")
mock.__enter__.side_effect = mocker.MagicMock()

assert client.app
assert client.server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ async def test_one_time_payment_worfklow(
assert settings.PAYMENTS_FAKE_COMPLETION is False

send_message = mocker.patch(
"simcore_service_webserver.payments._socketio.send_messages", autospec=True
"simcore_service_webserver.payments._socketio.send_messages_to_user",
autospec=True,
)
mock_rut_add_credits_to_wallet = mocker.patch(
"simcore_service_webserver.payments._onetime_api.add_credits_to_wallet",
Expand Down Expand Up @@ -151,7 +152,8 @@ async def test_multiple_payments(
assert settings.PAYMENTS_FAKE_COMPLETION is False

send_message = mocker.patch(
"simcore_service_webserver.payments._socketio.send_messages", autospec=True
"simcore_service_webserver.payments._socketio.send_messages_to_user",
autospec=True,
)
mocker.patch(
"simcore_service_webserver.payments._onetime_api.add_credits_to_wallet",
Expand Down Expand Up @@ -247,7 +249,8 @@ async def test_complete_payment_errors(
):
assert client.app
send_message = mocker.patch(
"simcore_service_webserver.payments._socketio.send_messages", autospec=True
"simcore_service_webserver.payments._socketio.send_messages_to_user",
autospec=True,
)

wallet = logged_user_wallet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ async def test_payment_method_worfklow(
assert settings.PAYMENTS_FAKE_COMPLETION is False

send_message = mocker.patch(
"simcore_service_webserver.payments._socketio.send_messages", autospec=True
"simcore_service_webserver.payments._socketio.send_messages_to_user",
autospec=True,
)

wallet = logged_user_wallet
Expand Down Expand Up @@ -341,7 +342,8 @@ async def test_one_time_payment_with_payment_method(
assert client.app

send_message = mocker.patch(
"simcore_service_webserver.payments._socketio.send_messages", autospec=True
"simcore_service_webserver.payments._socketio.send_messages_to_user",
autospec=True,
)
mock_rut_add_credits_to_wallet = mocker.patch(
"simcore_service_webserver.payments._onetime_api.add_credits_to_wallet",
Expand Down

0 comments on commit 4206404

Please sign in to comment.