forked from permitio/fastapi_websocket_rpc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
binary_rpc_test.py
113 lines (90 loc) · 3.32 KB
/
binary_rpc_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import logging
import os
import sys
import json
from multiprocessing import Process
import pytest
import uvicorn
from fastapi import FastAPI
from fastapi_websocket_rpc import WebSocketFrameType
from fastapi_websocket_rpc.logger import LoggingModes, logging_config, get_logger
from fastapi_websocket_rpc.rpc_methods import RpcUtilityMethods
from fastapi_websocket_rpc.simplewebsocket import SimpleWebSocket
from fastapi_websocket_rpc.utils import pydantic_serialize
from fastapi_websocket_rpc.websocket_rpc_client import WebSocketRpcClient
from fastapi_websocket_rpc.websocket_rpc_endpoint import WebsocketRPCEndpoint
# Emit DEBUG-level logs, with all logging directed to the UVICORN format
logging_config.set_mode(LoggingModes.UVICORN, logging.DEBUG)
logger = get_logger(__name__)

# Configurable through the PORT environment variable; an unset or empty
# value falls back to 9000
PORT = int(os.getenv("PORT") or "9000")
uri = f"ws://localhost:{PORT}/ws"
class BinarySerializingWebSocket(SimpleWebSocket):
    """
    SimpleWebSocket wrapper that sends and receives encoded payloads.

    Outgoing messages are passed through pydantic_serialize and the result is
    `.encode()`d before being sent; incoming buffers are `.decode()`d and
    parsed with json.loads. Connecting and closing are delegated unchanged to
    the wrapped socket.
    """

    def __init__(self, websocket: SimpleWebSocket):
        # The wrapped socket that every operation below is delegated to
        self._websocket = websocket

    async def connect(self, uri: str, **connect_kwargs):
        """Connect the wrapped socket to `uri`, forwarding any extra kwargs."""
        await self._websocket.connect(uri, **connect_kwargs)

    def _serialize(self, msg):
        """Run `msg` through pydantic_serialize and encode the result."""
        serialized = pydantic_serialize(msg)
        return serialized.encode()

    def _deserialize(self, buffer):
        """Decode `buffer` and parse the resulting text as JSON."""
        text = buffer.decode()
        return json.loads(text)

    async def send(self, msg):
        """Serialize `msg` and send it over the wrapped socket."""
        payload = self._serialize(msg)
        await self._websocket.send(payload)

    async def recv(self):
        """Receive one buffer from the wrapped socket and deserialize it."""
        raw = await self._websocket.recv()
        return self._deserialize(raw)

    async def close(self, code: int = 1000):
        """Close the wrapped socket with the given close code."""
        await self._websocket.close(code)
def setup_server():
    """
    Create a FastAPI app exposing the RPC endpoint and serve it with uvicorn.

    The endpoint is configured with binary websocket frames and uses
    BinarySerializingWebSocket as its serializing socket class. The app is
    served on PORT via uvicorn.run.
    """
    application = FastAPI()
    rpc_endpoint = WebsocketRPCEndpoint(
        RpcUtilityMethods(),
        frame_type=WebSocketFrameType.Binary,
        serializing_socket_cls=BinarySerializingWebSocket,
    )
    # Attach the RPC websocket route to the app before serving it
    rpc_endpoint.register_route(application)
    uvicorn.run(application, port=PORT)
@pytest.fixture(scope="module")
def server():
    """
    Module-scoped fixture that runs the RPC server in a separate process.

    Starts `setup_server` in a daemon Process and yields that Process to the
    tests. On teardown the process is killed and then joined, so the child is
    reaped instead of being left behind as a zombie.
    """
    # Run the server as a separate process
    proc = Process(target=setup_server, args=(), daemon=True)
    proc.start()
    try:
        yield proc
    finally:
        proc.kill()  # Cleanup after test
        # Reap the killed child; the wait is bounded so teardown cannot hang
        proc.join(timeout=5)
@pytest.mark.asyncio
async def test_echo(server):
    """
    Basic RPC round-trip: the echo method must return the text it was sent
    """
    logger.debug("before test_echo")
    rpc_client = WebSocketRpcClient(
        uri,
        RpcUtilityMethods(),
        default_response_timeout=4,
        serializing_socket_cls=BinarySerializingWebSocket,
    )
    async with rpc_client as client:
        logger.debug("Initialized WebSocketRpcClient")
        expected = "Hello World!"
        logger.debug("Waiting for response...")
        # Call echo on the other side of the connection
        reply = await client.other.echo(text=expected)
        logger.debug("Response: %s", str(reply))
        assert reply.result == expected
@pytest.mark.asyncio
async def test_structured_response(server):
    """
    RPC call whose response carries structured (pydantic model) data,
    with process details serving as the example payload
    """
    rpc_client = WebSocketRpcClient(
        uri,
        RpcUtilityMethods(),
        default_response_timeout=4,
        serializing_socket_cls=BinarySerializingWebSocket,
    )
    async with rpc_client as client:
        local_methods = RpcUtilityMethods()
        # NOTE(review): the local result is fetched but never compared against
        # the remote one - confirm whether an assertion was intended here
        our_process = await local_methods.get_process_details()
        reply = await client.other.get_process_details()
        details = reply.result
        # The remote side reported a valid process id
        assert isinstance(details["pid"], int)
        # The details from the other process include its command
        assert "cmd" in details