From 256c709c06950c83e203e6318a0bef5240efe06d Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Wed, 29 Nov 2023 18:15:06 +0000 Subject: [PATCH 01/25] Added websocket client handler, and websocket-client module --- fastapi_websocket_rpc/__init__.py | 2 + fastapi_websocket_rpc/simplewebsocket.py | 7 ++ fastapi_websocket_rpc/websocket_rpc_client.py | 89 ++++++++++++++++++- 3 files changed, 94 insertions(+), 4 deletions(-) diff --git a/fastapi_websocket_rpc/__init__.py b/fastapi_websocket_rpc/__init__.py index 08c7dba..74f5a7f 100644 --- a/fastapi_websocket_rpc/__init__.py +++ b/fastapi_websocket_rpc/__init__.py @@ -1,5 +1,7 @@ from .rpc_methods import RpcMethodsBase, RpcUtilityMethods from .websocket_rpc_client import WebSocketRpcClient +from .websocket_rpc_client import WebSocketClientHandler +from .websocket_rpc_client import WebSocketsClientHandler from .websocket_rpc_endpoint import WebsocketRPCEndpoint from .rpc_channel import RpcChannel from .schemas import WebSocketFrameType diff --git a/fastapi_websocket_rpc/simplewebsocket.py b/fastapi_websocket_rpc/simplewebsocket.py index ec62ee6..f8c6043 100644 --- a/fastapi_websocket_rpc/simplewebsocket.py +++ b/fastapi_websocket_rpc/simplewebsocket.py @@ -9,6 +9,10 @@ class SimpleWebSocket(ABC): Abstract base class for all websocket related wrappers. """ + @abstractmethod + def connect(self, uri: str, **connect_kwargs): + pass + @abstractmethod def send(self, msg): pass @@ -26,6 +30,9 @@ class JsonSerializingWebSocket(SimpleWebSocket): def __init__(self, websocket: SimpleWebSocket): self._websocket = websocket + async def connect(self, uri: str, **connect_kwargs): + await self._websocket.connect(uri, **connect_kwargs) + def _serialize(self, msg): return pydantic_serialize(msg) diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 2cbf472..39d676f 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -5,6 +5,7 @@ import tenacity from tenacity.retry import retry_if_exception +import websocket import websockets from websockets.exceptions import InvalidStatusCode, WebSocketException, ConnectionClosedError, ConnectionClosedOK @@ -16,6 +17,80 @@ logger = get_logger("RPC_CLIENT") +class WebSocketClientHandler(SimpleWebSocket): + """ + Handler that use https://websocket-client.readthedocs.io/en/latest module. + This implementation supports HTTP proxy, though HTTP_PROXY and HTTPS_PROXY environment variable. + This is not documented, but in code, see https://github.com/websocket-client/websocket-client/blob/master/websocket/_url.py#L163 + The module is not written in coroutine: https://websocket-client.readthedocs.io/en/latest/threading.html#asyncio-library-usage, so + as a workaround, the send/recv are called in "run_in_executor" method. This is not optimal in case these are blocking (low network I/O), + then this blocks the whole event loop. + TODO: remove this implementation after https://github.com/python-websockets/websockets/issues/364 is fixed and use WebSocketsClientHandler instead. + """ + def __init__(self): + self._websocket = None + + """ + Args: + **kwargs: Additional args passed to connect + https://websocket-client.readthedocs.io/en/latest/examples.html#connection-options + https://websocket-client.readthedocs.io/en/latest/core.html#websocket._core.WebSocket.connect + """ + async def connect(self, uri: str, **connect_kwargs): + self._websocket = await asyncio.get_event_loop().run_in_executor(None, websocket.create_connection, uri, **connect_kwargs) + + async def send(self, msg): + if self._websocket is None: + # connect must be called before. + logging.error("Websocket connect() must be called before.") + await asyncio.get_event_loop().run_in_executor(None, self._websocket.send, msg) + + async def recv(self): + if self._websocket is None: + # connect must be called before. + logging.error("Websocket connect() must be called before.") + msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv) + return msg + + async def close(self, code: int = 1000): + if self._websocket is None: + # Case not opened yet, nothing to close. + await self._websocket.close(code) + +class WebSocketsClientHandler(SimpleWebSocket): + """ + Handler that use https://websockets.readthedocs.io/en/stable module. + This implementation does not support HTTP proxy (see https://github.com/python-websockets/websockets/issues/364). + """ + def __init__(self): + self._websocket = None + + """ + Args: + **kwargs: Additional args passed to connect + https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#opening-a-connection + """ + async def connect(self, uri: str, **connect_kwargs): + self._websocket = await websockets.connect(uri, **connect_kwargs) + + async def send(self, msg): + if self._websocket is None: + # connect must be called before. + logging.error("Websocket connect() must be called before.") + await self._websocket.send(msg) + + async def recv(self): + if self._websocket is None: + # connect must be called before. + logging.error("Websocket connect() must be called before.") + msg = await self._websocket.recv() + return msg + + async def close(self, code: int = 1000): + if self._websocket is not None: + # Case not opened yet, nothing to close. + await self._websocket.close(code) + def isNotInvalidStatusCode(value): return not isinstance(value, InvalidStatusCode) @@ -59,6 +134,7 @@ def __init__(self, uri: str, methods: RpcMethodsBase = None, on_disconnect: List[OnDisconnectCallback] = None, keep_alive: float = 0, serializing_socket_cls: Type[SimpleWebSocket] = JsonSerializingWebSocket, + websocket_client_handler_cls: Type[SimpleWebSocket] = WebSocketsClientHandler, **kwargs): """ Args: @@ -71,8 +147,7 @@ def __init__(self, uri: str, methods: RpcMethodsBase = None, on_disconnect (List[Coroutine]): callbacks on connection termination (each callback is called with the channel) keep_alive(float): interval in seconds to send a keep-alive ping, Defaults to 0, which means keep alive is disabled. - **kwargs: Additional args passed to connect (@see class Connect at websockets/client.py) - https://websockets.readthedocs.io/en/stable/api.html#websockets.client.connect + **kwargs: Additional args passed to connect, depends on websocket_client_handler_cls usage: @@ -105,15 +180,20 @@ def __init__(self, uri: str, methods: RpcMethodsBase = None, self._on_connect = on_connect # serialization self._serializing_socket_cls = serializing_socket_cls + # websocket client implementation + self._websocket_client_handler_cls = websocket_client_handler_cls async def __connect__(self): try: try: logger.info(f"Trying server - {self.uri}") # Start connection - raw_ws = await websockets.connect(self.uri, **self.connect_kwargs) + #raw_ws = await websockets.connect(self.uri, **self.connect_kwargs) + #raw_ws = WebSocketClientSWS(websocket.create_connection(self.uri, **self.connect_kwargs)) + raw_ws = self._websocket_client_handler_cls() # Wrap socket in our serialization class self.ws = self._serializing_socket_cls(raw_ws) + await self.ws.connect(self.uri, **self.connect_kwargs) # Init an RPC channel to work on-top of the connection self.channel = RpcChannel( self.methods, self.ws, default_response_timeout=self.default_response_timeout) @@ -205,7 +285,8 @@ async def reader(self): # task was canceled except asyncio.CancelledError: pass - except websockets.exceptions.ConnectionClosed: + #except websockets.exceptions.ConnectionClosed: + except websocket.WebSocketConnectionClosedException: logger.info("Connection was terminated.") await self.close() except: From a293a3e53f1a383590835a3467dd61373704e51a Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Wed, 29 Nov 2023 18:25:04 +0000 Subject: [PATCH 02/25] small fix in close --- fastapi_websocket_rpc/websocket_rpc_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 39d676f..12299bf 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -53,8 +53,8 @@ async def recv(self): return msg async def close(self, code: int = 1000): - if self._websocket is None: - # Case not opened yet, nothing to close. + if self._websocket is not None: + # Case opened, we have something to close. await self._websocket.close(code) class WebSocketsClientHandler(SimpleWebSocket): @@ -88,7 +88,7 @@ async def recv(self): async def close(self, code: int = 1000): if self._websocket is not None: - # Case not opened yet, nothing to close. + # Case opened, we have something to close. await self._websocket.close(code) def isNotInvalidStatusCode(value): From 6126bf64464b239744d2adcec45458b15e2bdef7 Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Fri, 1 Dec 2023 10:05:25 +0000 Subject: [PATCH 03/25] handled exceptions --- fastapi_websocket_rpc/simplewebsocket.py | 9 ++ fastapi_websocket_rpc/websocket_rpc_client.py | 84 ++++++++++++++----- 2 files changed, 72 insertions(+), 21 deletions(-) diff --git a/fastapi_websocket_rpc/simplewebsocket.py b/fastapi_websocket_rpc/simplewebsocket.py index f8c6043..6ffdf86 100644 --- a/fastapi_websocket_rpc/simplewebsocket.py +++ b/fastapi_websocket_rpc/simplewebsocket.py @@ -25,6 +25,10 @@ def recv(self): def close(self, code: int = 1000): pass + @abstractmethod + def handle_exception(self, exception: Exception): + pass + class JsonSerializingWebSocket(SimpleWebSocket): def __init__(self, websocket: SimpleWebSocket): @@ -49,3 +53,8 @@ async def recv(self): async def close(self, code: int = 1000): await self._websocket.close(code) + + async def handle_exception(self, exception: Exception): + await self._websocket.handle_exception(exception) + + diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 12299bf..551a05f 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -57,6 +57,37 @@ async def close(self, code: int = 1000): # Case opened, we have something to close. await self._websocket.close(code) + async def handle_exception(self, exception: Exception): + try: + raise exception + # See https://websocket-client.readthedocs.io/en/latest/exceptions.html + except websocket._exceptions.WebSocketAddressException: + logger.info("websocket address info cannot be found") + raise + except websocket._exceptions.WebSocketBadStatusException: + logger.info("bad handshake status code") + raise + except websocket._exceptions.WebSocketConnectionClosedException: + logger.info("remote host closed the connection or some network error happened") + raise + except websocket._exceptions.WebSocketPayloadException: + logger.info( + f"WebSocket payload is invalid") + raise + except websocket._exceptions.WebSocketProtocolException: + logger.info(f"WebSocket protocol is invalid") + raise + except websocket._exceptions.WebSocketProxyException: + logger.info(f"proxy error occurred") + raise + except OSError as err: + logger.info("RPC Connection failed - %s", err) + raise + except Exception as err: + logger.exception("RPC Error") + raise + + class WebSocketsClientHandler(SimpleWebSocket): """ Handler that use https://websockets.readthedocs.io/en/stable module. @@ -91,6 +122,32 @@ async def close(self, code: int = 1000): # Case opened, we have something to close. await self._websocket.close(code) + async def handle_exception(self, exception: Exception): + try: + raise exception + except ConnectionRefusedError: + logger.info("RPC connection was refused by server") + raise + except ConnectionClosedError: + logger.info("RPC connection lost") + raise + except ConnectionClosedOK: + logger.info("RPC connection closed") + raise + except InvalidStatusCode as err: + logger.info( + f"RPC Websocket failed - with invalid status code {err.status_code}") + raise + except WebSocketException as err: + logger.info(f"RPC Websocket failed - with {err}") + raise + except OSError as err: + logger.info("RPC Connection failed - %s", err) + raise + except Exception as err: + logger.exception("RPC Error") + raise + def isNotInvalidStatusCode(value): return not isinstance(value, InvalidStatusCode) @@ -217,28 +274,13 @@ async def __connect__(self): await self.channel.close() self.cancel_tasks() raise - except ConnectionRefusedError: - logger.info("RPC connection was refused by server") - raise - except ConnectionClosedError: - logger.info("RPC connection lost") - raise - except ConnectionClosedOK: - logger.info("RPC connection closed") - raise - except InvalidStatusCode as err: - logger.info( - f"RPC Websocket failed - with invalid status code {err.status_code}") - raise - except WebSocketException as err: - logger.info(f"RPC Websocket failed - with {err}") - raise - except OSError as err: - logger.info("RPC Connection failed - %s", err) - raise except Exception as err: - logger.exception("RPC Error") - raise + if self.ws is not None: + # Exception could be websocket client specific. + await self.ws.handle_exception(err) + else: + logger.exception("RPC Error") + raise async def __aenter__(self): if self.retry_config is False: From 55987a4753cc982848336988e96f10e0a97f20c0 Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Fri, 1 Dec 2023 10:17:39 +0000 Subject: [PATCH 04/25] added doc --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index e345817..4df3141 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,20 @@ from fastapi_websocket_rpc.logger import logging_config, LoggingModes logging_config.set_mode(LoggingModes.UVICORN) ``` +## HTTP(S) Proxy +By default, fastapi-websocket-rpc uses websockets module as websocket client handler. This does not support HTTP(S) Proxy, see https://github.com/python-websockets/websockets/issues/364 . If proxy is important, others websocket client implementation can be used, such as websocket-client module (https://websocket-client.readthedocs.io). Here is how to use it, by defining websocket_client_handler_cls parameter: + +```python +import asyncio +from fastapi_websocket_rpc import RpcMethodsBase, WebSocketRpcClient, WebSocketClientHandler + +async def run_client(uri): + async with WebSocketRpcClient(uri, RpcMethodsBase(), websocket_client_handler_cls = WebSocketClientHandler) as client: +``` + +Just set standard environment variables (lowercase and uppercase works): http_proxy, https_proxy, and no_proxy before running python script. + + ## Pull requests - welcome! - Please include tests for new features From f582aa63380b0eea5bb77e42ebe24012e4364599 Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Fri, 1 Dec 2023 10:18:55 +0000 Subject: [PATCH 05/25] requirements.txt --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 35074d0..fb7a4b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ packaging>=20.4 pydantic>=1.9.1 uvicorn>=0.17.6,<1 websockets>=10.3,<11 +websocket-client>=1.6.4 tenacity>=8.0.1,<9 From ce683c24708e8d2b6b44109c0a8286f934aa163e Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Fri, 1 Dec 2023 10:20:07 +0000 Subject: [PATCH 06/25] clean --- fastapi_websocket_rpc/websocket_rpc_client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 551a05f..c4e8b59 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -245,8 +245,6 @@ async def __connect__(self): try: logger.info(f"Trying server - {self.uri}") # Start connection - #raw_ws = await websockets.connect(self.uri, **self.connect_kwargs) - #raw_ws = WebSocketClientSWS(websocket.create_connection(self.uri, **self.connect_kwargs)) raw_ws = self._websocket_client_handler_cls() # Wrap socket in our serialization class self.ws = self._serializing_socket_cls(raw_ws) From a81a05ccf4aa7107081674212e800efbaeef8fb9 Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Fri, 1 Dec 2023 11:42:37 +0000 Subject: [PATCH 07/25] handle connection closed exception --- fastapi_websocket_rpc/simplewebsocket.py | 8 ++++ fastapi_websocket_rpc/websocket_rpc_client.py | 38 +++++++++++-------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/fastapi_websocket_rpc/simplewebsocket.py b/fastapi_websocket_rpc/simplewebsocket.py index 6ffdf86..3a9218c 100644 --- a/fastapi_websocket_rpc/simplewebsocket.py +++ b/fastapi_websocket_rpc/simplewebsocket.py @@ -29,6 +29,10 @@ def close(self, code: int = 1000): def handle_exception(self, exception: Exception): pass + @abstractmethod + def isConnectionClosedException(self, exception: Exception) -> bool: + pass + class JsonSerializingWebSocket(SimpleWebSocket): def __init__(self, websocket: SimpleWebSocket): @@ -57,4 +61,8 @@ async def close(self, code: int = 1000): async def handle_exception(self, exception: Exception): await self._websocket.handle_exception(exception) + async def isConnectionClosedException(self, exception: Exception) -> bool: + return await self._websocket.isConnectionClosedException(exception) + + diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index c4e8b59..026bc70 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -22,19 +22,20 @@ class WebSocketClientHandler(SimpleWebSocket): Handler that use https://websocket-client.readthedocs.io/en/latest module. This implementation supports HTTP proxy, though HTTP_PROXY and HTTPS_PROXY environment variable. This is not documented, but in code, see https://github.com/websocket-client/websocket-client/blob/master/websocket/_url.py#L163 - The module is not written in coroutine: https://websocket-client.readthedocs.io/en/latest/threading.html#asyncio-library-usage, so - as a workaround, the send/recv are called in "run_in_executor" method. This is not optimal in case these are blocking (low network I/O), - then this blocks the whole event loop. + The module is not written as coroutine: https://websocket-client.readthedocs.io/en/latest/threading.html#asyncio-library-usage, so + as a workaround, the send/recv are called in "run_in_executor" method. TODO: remove this implementation after https://github.com/python-websockets/websockets/issues/364 is fixed and use WebSocketsClientHandler instead. + + Note: the connect timeout, if not specified, is the default socket connect timeout, which could be around 2min, so a bit longer than WebSocketsClientHandler. """ def __init__(self): self._websocket = None """ Args: - **kwargs: Additional args passed to connect - https://websocket-client.readthedocs.io/en/latest/examples.html#connection-options - https://websocket-client.readthedocs.io/en/latest/core.html#websocket._core.WebSocket.connect + **kwargs: Additional args passed to connect + https://websocket-client.readthedocs.io/en/latest/examples.html#connection-options + https://websocket-client.readthedocs.io/en/latest/core.html#websocket._core.WebSocket.connect """ async def connect(self, uri: str, **connect_kwargs): self._websocket = await asyncio.get_event_loop().run_in_executor(None, websocket.create_connection, uri, **connect_kwargs) @@ -87,6 +88,10 @@ async def handle_exception(self, exception: Exception): logger.exception("RPC Error") raise + async def isConnectionClosedException(self, exception: Exception) -> bool: + # websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened + return isinstance(exception, websocket.WebSocketConnectionClosedException) + class WebSocketsClientHandler(SimpleWebSocket): """ @@ -98,8 +103,8 @@ def __init__(self): """ Args: - **kwargs: Additional args passed to connect - https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#opening-a-connection + **kwargs: Additional args passed to connect + https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#opening-a-connection """ async def connect(self, uri: str, **connect_kwargs): self._websocket = await websockets.connect(uri, **connect_kwargs) @@ -148,6 +153,9 @@ async def handle_exception(self, exception: Exception): logger.exception("RPC Error") raise + async def isConnectionClosedException(self, exception: Exception) -> bool: + return isinstance(exception, websockets.exceptions.ConnectionClosed) + def isNotInvalidStatusCode(value): return not isinstance(value, InvalidStatusCode) @@ -325,13 +333,13 @@ async def reader(self): # task was canceled except asyncio.CancelledError: pass - #except websockets.exceptions.ConnectionClosed: - except websocket.WebSocketConnectionClosedException: - logger.info("Connection was terminated.") - await self.close() - except: - logger.exception("RPC Reader task failed") - raise + except Exception as err: + if self.ws.isConnectionClosedException(err): + logger.info("Connection was terminated.") + await self.close() + else: + logger.exception("RPC Reader task failed") + raise async def _keep_alive(self): try: From 7b82905bb206076d8fb393a37f6ddce1d9cbd2e4 Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Mon, 4 Dec 2023 08:46:25 +0000 Subject: [PATCH 08/25] Fix UnitTest support python 3.7: see https://github.com/websocket-client/websocket-client/blob/master/ChangeLog 1.1.0 supports HTTP_PROXY --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index fb7a4b8..a6b142f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,5 @@ packaging>=20.4 pydantic>=1.9.1 uvicorn>=0.17.6,<1 websockets>=10.3,<11 -websocket-client>=1.6.4 +websocket-client>=1.1.0 tenacity>=8.0.1,<9 From 080144be3f61392e2ef793d8d15c1a134abb2b21 Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Mon, 4 Dec 2023 10:08:33 +0000 Subject: [PATCH 09/25] Fix await websocket._exceptions.WebSocketConnectionClosedException --- fastapi_websocket_rpc/websocket_rpc_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 026bc70..6ec463d 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -334,7 +334,7 @@ async def reader(self): except asyncio.CancelledError: pass except Exception as err: - if self.ws.isConnectionClosedException(err): + if await self.ws.isConnectionClosedException(err): logger.info("Connection was terminated.") await self.close() else: From 161ebda0aa095f107abc3a2e65c880398d51ea0c Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Mon, 4 Dec 2023 11:13:20 +0000 Subject: [PATCH 10/25] fix close but scheduler shutdowned --- fastapi_websocket_rpc/websocket_rpc_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 6ec463d..a654dd1 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -56,7 +56,8 @@ async def recv(self): async def close(self, code: int = 1000): if self._websocket is not None: # Case opened, we have something to close. - await self._websocket.close(code) + #await asyncio.get_event_loop().run_in_executor(None, self._websocket.close,code) + self._websocket.close(code) async def handle_exception(self, exception: Exception): try: From bd4eb71f69d9b8bfde76659810da69d45b8afe69 Mon Sep 17 00:00:00 2001 From: Antoine TRAN Date: Mon, 4 Dec 2023 11:45:51 +0000 Subject: [PATCH 11/25] fix close but scheduler shutdowned --- fastapi_websocket_rpc/websocket_rpc_client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index a654dd1..5da3bb4 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -56,7 +56,6 @@ async def recv(self): async def close(self, code: int = 1000): if self._websocket is not None: # Case opened, we have something to close. - #await asyncio.get_event_loop().run_in_executor(None, self._websocket.close,code) self._websocket.close(code) async def handle_exception(self, exception: Exception): @@ -126,7 +125,7 @@ async def recv(self): async def close(self, code: int = 1000): if self._websocket is not None: # Case opened, we have something to close. - await self._websocket.close(code) + self._websocket.close(code) async def handle_exception(self, exception: Exception): try: From c9572492bf0726b3441341f2d8ea55212599ebd2 Mon Sep 17 00:00:00 2001 From: antoinetran Date: Tue, 5 Dec 2023 16:59:55 +0100 Subject: [PATCH 12/25] Update README.md Co-authored-by: Or Weis --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4df3141..205bb70 100644 --- a/README.md +++ b/README.md @@ -154,7 +154,7 @@ logging_config.set_mode(LoggingModes.UVICORN) ``` ## HTTP(S) Proxy -By default, fastapi-websocket-rpc uses websockets module as websocket client handler. This does not support HTTP(S) Proxy, see https://github.com/python-websockets/websockets/issues/364 . If proxy is important, others websocket client implementation can be used, such as websocket-client module (https://websocket-client.readthedocs.io). Here is how to use it, by defining websocket_client_handler_cls parameter: +By default, fastapi-websocket-rpc uses websockets module as websocket client handler. This does not support HTTP(S) Proxy, see https://github.com/python-websockets/websockets/issues/364 . If the ability to use a proxy is important to, another websocket client implementation can be used, e.g. websocket-client (https://websocket-client.readthedocs.io). Here is how to use it, by defining websocket_client_handler_cls parameter: ```python import asyncio From f02a8d2b490cd28965e80e160f17bee5932ebf9e Mon Sep 17 00:00:00 2001 From: trana Date: Tue, 5 Dec 2023 17:13:24 +0100 Subject: [PATCH 13/25] websocket-client is optional --- README.md | 8 +++++++- fastapi_websocket_rpc/websocket_rpc_client.py | 6 +++++- requirements.txt | 1 - setup.py | 7 +++++-- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 205bb70..4f14154 100644 --- a/README.md +++ b/README.md @@ -154,7 +154,13 @@ logging_config.set_mode(LoggingModes.UVICORN) ``` ## HTTP(S) Proxy -By default, fastapi-websocket-rpc uses websockets module as websocket client handler. This does not support HTTP(S) Proxy, see https://github.com/python-websockets/websockets/issues/364 . If the ability to use a proxy is important to, another websocket client implementation can be used, e.g. websocket-client (https://websocket-client.readthedocs.io). Here is how to use it, by defining websocket_client_handler_cls parameter: +By default, fastapi-websocket-rpc uses websockets module as websocket client handler. This does not support HTTP(S) Proxy, see https://github.com/python-websockets/websockets/issues/364 . If the ability to use a proxy is important to, another websocket client implementation can be used, e.g. websocket-client (https://websocket-client.readthedocs.io). Here is how to use it. Installation: + +``` +pip install websocket-client +``` + +Then use websocket_client_handler_cls parameter: ```python import asyncio diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 5da3bb4..42000b0 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -5,7 +5,6 @@ import tenacity from tenacity.retry import retry_if_exception -import websocket import websockets from websockets.exceptions import InvalidStatusCode, WebSocketException, ConnectionClosedError, ConnectionClosedOK @@ -16,6 +15,11 @@ logger = get_logger("RPC_CLIENT") +try: + import websocket +except ImportError: + # Websocket-client optional module not installed. + pass class WebSocketClientHandler(SimpleWebSocket): """ diff --git a/requirements.txt b/requirements.txt index a6b142f..35074d0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,4 @@ packaging>=20.4 pydantic>=1.9.1 uvicorn>=0.17.6,<1 websockets>=10.3,<11 -websocket-client>=1.1.0 tenacity>=8.0.1,<9 diff --git a/setup.py b/setup.py index 30cd797..5137d81 100644 --- a/setup.py +++ b/setup.py @@ -5,8 +5,11 @@ def get_requirements(env=""): if env: env = "-{}".format(env) with open("requirements{}.txt".format(env)) as fp: - return [x.strip() for x in fp.read().split("\n") if not x.startswith("#")] - + requirements = [x.strip() for x in fp.read().split("\n") if not x.startswith("#")] + withWebsocketClient = os.environ.get("WITH_WEBSOCKET_CLIENT", "False") + if bool(withWebsocketClient): + requirements.append("websocket-client>=1.1.0") + return requirements with open("README.md", "r", encoding="utf-8") as fh: long_description = fh.read() From 3624a612bcfe67ff2548cd84199f7aed4b2fb163 Mon Sep 17 00:00:00 2001 From: trana Date: Tue, 5 Dec 2023 17:35:30 +0100 Subject: [PATCH 14/25] Using BaseConnectionClosedException --- fastapi_websocket_rpc/simplewebsocket.py | 15 +++++----- fastapi_websocket_rpc/websocket_rpc_client.py | 30 +++++++++---------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/fastapi_websocket_rpc/simplewebsocket.py b/fastapi_websocket_rpc/simplewebsocket.py index 3a9218c..4406490 100644 --- a/fastapi_websocket_rpc/simplewebsocket.py +++ b/fastapi_websocket_rpc/simplewebsocket.py @@ -4,6 +4,12 @@ from .utils import pydantic_serialize +class BaseConnectionClosedException(Exception): + """ + Base exception related to closed TCP connection. + """ + pass + class SimpleWebSocket(ABC): """ Abstract base class for all websocket related wrappers. @@ -17,6 +23,7 @@ def connect(self, uri: str, **connect_kwargs): def send(self, msg): pass + # This is where BaseConnectionClosedException can be thrown. @abstractmethod def recv(self): pass @@ -29,10 +36,6 @@ def close(self, code: int = 1000): def handle_exception(self, exception: Exception): pass - @abstractmethod - def isConnectionClosedException(self, exception: Exception) -> bool: - pass - class JsonSerializingWebSocket(SimpleWebSocket): def __init__(self, websocket: SimpleWebSocket): @@ -61,8 +64,4 @@ async def close(self, code: int = 1000): async def handle_exception(self, exception: Exception): await self._websocket.handle_exception(exception) - async def isConnectionClosedException(self, exception: Exception) -> bool: - return await self._websocket.isConnectionClosedException(exception) - - diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 42000b0..479cc25 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -54,7 +54,11 @@ async def recv(self): if self._websocket is None: # connect must be called before. logging.error("Websocket connect() must be called before.") - msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv) + try: + msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv) + except websocket.WebSocketConnectionClosedException as err: + # websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened + raise BaseConnectionClosedException from err return msg async def close(self, code: int = 1000): @@ -92,11 +96,6 @@ async def handle_exception(self, exception: Exception): logger.exception("RPC Error") raise - async def isConnectionClosedException(self, exception: Exception) -> bool: - # websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened - return isinstance(exception, websocket.WebSocketConnectionClosedException) - - class WebSocketsClientHandler(SimpleWebSocket): """ Handler that use https://websockets.readthedocs.io/en/stable module. @@ -123,7 +122,10 @@ async def recv(self): if self._websocket is None: # connect must be called before. logging.error("Websocket connect() must be called before.") - msg = await self._websocket.recv() + try: + msg = await self._websocket.recv() + except websockets.exceptions.ConnectionClosed as err: + raise BaseConnectionClosedException from err return msg async def close(self, code: int = 1000): @@ -157,9 +159,6 @@ async def handle_exception(self, exception: Exception): logger.exception("RPC Error") raise - async def isConnectionClosedException(self, exception: Exception) -> bool: - return isinstance(exception, websockets.exceptions.ConnectionClosed) - def isNotInvalidStatusCode(value): return not isinstance(value, InvalidStatusCode) @@ -337,13 +336,12 @@ async def reader(self): # task was canceled except asyncio.CancelledError: pass + except BaseConnectionClosedException: + logger.info("Connection was terminated.") + await self.close() except Exception as err: - if await self.ws.isConnectionClosedException(err): - logger.info("Connection was terminated.") - await self.close() - else: - logger.exception("RPC Reader task failed") - raise + logger.exception("RPC Reader task failed") + raise async def _keep_alive(self): try: From 7d899469f23299b7e808729acd0bbfba7a9d0143 Mon Sep 17 00:00:00 2001 From: trana Date: Thu, 21 Dec 2023 16:15:54 +0100 Subject: [PATCH 15/25] Fix remark BaseConnectionClosedException ugly --- fastapi_websocket_rpc/simplewebsocket.py | 11 +++------- fastapi_websocket_rpc/websocket_rpc_client.py | 20 ++++++++++++------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/fastapi_websocket_rpc/simplewebsocket.py b/fastapi_websocket_rpc/simplewebsocket.py index 4406490..54a1330 100644 --- a/fastapi_websocket_rpc/simplewebsocket.py +++ b/fastapi_websocket_rpc/simplewebsocket.py @@ -4,12 +4,6 @@ from .utils import pydantic_serialize -class BaseConnectionClosedException(Exception): - """ - Base exception related to closed TCP connection. - """ - pass - class SimpleWebSocket(ABC): """ Abstract base class for all websocket related wrappers. @@ -23,7 +17,7 @@ def connect(self, uri: str, **connect_kwargs): def send(self, msg): pass - # This is where BaseConnectionClosedException can be thrown. + # If return None, then it means Connection is closed, and we stop receiving and close. @abstractmethod def recv(self): pass @@ -55,7 +49,8 @@ async def send(self, msg): async def recv(self): msg = await self._websocket.recv() - + if msg is None: + return None return self._deserialize(msg) async def close(self, code: int = 1000): diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 479cc25..b10edf4 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -57,8 +57,10 @@ async def recv(self): try: msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv) except websocket.WebSocketConnectionClosedException as err: + logger.debug("Connection closed.", exc_info=True) # websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened - raise BaseConnectionClosedException from err + # Returning None to ensure we get out of the loop, with no Exception. + return None return msg async def close(self, code: int = 1000): @@ -124,8 +126,9 @@ async def recv(self): logging.error("Websocket connect() must be called before.") try: msg = await self._websocket.recv() - except websockets.exceptions.ConnectionClosed as err: - raise BaseConnectionClosedException from err + except websockets.exceptions.ConnectionClosed: + logger.debug("Connection closed.", exc_info=True) + return None return msg async def close(self, code: int = 1000): @@ -331,14 +334,17 @@ async def reader(self): try: while True: raw_message = await self.ws.recv() - await self.channel.on_message(raw_message) + if raw_message is None: + # None is a special case where connection is closed. + logger.info("Connection was terminated.") + await self.close() + break + else: + await self.channel.on_message(raw_message) # Graceful external termination options # task was canceled except asyncio.CancelledError: pass - except BaseConnectionClosedException: - logger.info("Connection was terminated.") - await self.close() except Exception as err: logger.exception("RPC Reader task failed") raise From 54e12efd4353ffa1a0f94bbc97e041385f9771c4 Mon Sep 17 00:00:00 2001 From: trana Date: Thu, 21 Dec 2023 16:26:57 +0100 Subject: [PATCH 16/25] Removed handle_exception and moved it to connect() to avoid reraise --- fastapi_websocket_rpc/websocket_rpc_client.py | 100 ++++++++---------- 1 file changed, 45 insertions(+), 55 deletions(-) diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index b10edf4..e7e742c 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -42,35 +42,8 @@ def __init__(self): https://websocket-client.readthedocs.io/en/latest/core.html#websocket._core.WebSocket.connect """ async def connect(self, uri: str, **connect_kwargs): - self._websocket = await asyncio.get_event_loop().run_in_executor(None, websocket.create_connection, uri, **connect_kwargs) - - async def send(self, msg): - if self._websocket is None: - # connect must be called before. - logging.error("Websocket connect() must be called before.") - await asyncio.get_event_loop().run_in_executor(None, self._websocket.send, msg) - - async def recv(self): - if self._websocket is None: - # connect must be called before. - logging.error("Websocket connect() must be called before.") - try: - msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv) - except websocket.WebSocketConnectionClosedException as err: - logger.debug("Connection closed.", exc_info=True) - # websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened - # Returning None to ensure we get out of the loop, with no Exception. - return None - return msg - - async def close(self, code: int = 1000): - if self._websocket is not None: - # Case opened, we have something to close. - self._websocket.close(code) - - async def handle_exception(self, exception: Exception): try: - raise exception + self._websocket = await asyncio.get_event_loop().run_in_executor(None, websocket.create_connection, uri, **connect_kwargs) # See https://websocket-client.readthedocs.io/en/latest/exceptions.html except websocket._exceptions.WebSocketAddressException: logger.info("websocket address info cannot be found") @@ -98,36 +71,22 @@ async def handle_exception(self, exception: Exception): logger.exception("RPC Error") raise -class WebSocketsClientHandler(SimpleWebSocket): - """ - Handler that use https://websockets.readthedocs.io/en/stable module. - This implementation does not support HTTP proxy (see https://github.com/python-websockets/websockets/issues/364). - """ - def __init__(self): - self._websocket = None - - """ - Args: - **kwargs: Additional args passed to connect - https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#opening-a-connection - """ - async def connect(self, uri: str, **connect_kwargs): - self._websocket = await websockets.connect(uri, **connect_kwargs) - async def send(self, msg): if self._websocket is None: # connect must be called before. logging.error("Websocket connect() must be called before.") - await self._websocket.send(msg) + await asyncio.get_event_loop().run_in_executor(None, self._websocket.send, msg) async def recv(self): if self._websocket is None: # connect must be called before. logging.error("Websocket connect() must be called before.") try: - msg = await self._websocket.recv() - except websockets.exceptions.ConnectionClosed: + msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv) + except websocket.WebSocketConnectionClosedException as err: logger.debug("Connection closed.", exc_info=True) + # websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened + # Returning None to ensure we get out of the loop, with no Exception. return None return msg @@ -136,9 +95,22 @@ async def close(self, code: int = 1000): # Case opened, we have something to close. self._websocket.close(code) - async def handle_exception(self, exception: Exception): +class WebSocketsClientHandler(SimpleWebSocket): + """ + Handler that use https://websockets.readthedocs.io/en/stable module. + This implementation does not support HTTP proxy (see https://github.com/python-websockets/websockets/issues/364). + """ + def __init__(self): + self._websocket = None + + """ + Args: + **kwargs: Additional args passed to connect + https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#opening-a-connection + """ + async def connect(self, uri: str, **connect_kwargs): try: - raise exception + self._websocket = await websockets.connect(uri, **connect_kwargs) except ConnectionRefusedError: logger.info("RPC connection was refused by server") raise @@ -162,6 +134,28 @@ async def handle_exception(self, exception: Exception): logger.exception("RPC Error") raise + async def send(self, msg): + if self._websocket is None: + # connect must be called before. + logging.error("Websocket connect() must be called before.") + await self._websocket.send(msg) + + async def recv(self): + if self._websocket is None: + # connect must be called before. + logging.error("Websocket connect() must be called before.") + try: + msg = await self._websocket.recv() + except websockets.exceptions.ConnectionClosed: + logger.debug("Connection closed.", exc_info=True) + return None + return msg + + async def close(self, code: int = 1000): + if self._websocket is not None: + # Case opened, we have something to close. + self._websocket.close(code) + def isNotInvalidStatusCode(value): return not isinstance(value, InvalidStatusCode) @@ -287,12 +281,8 @@ async def __connect__(self): self.cancel_tasks() raise except Exception as err: - if self.ws is not None: - # Exception could be websocket client specific. - await self.ws.handle_exception(err) - else: - logger.exception("RPC Error") - raise + logger.exception("RPC Error") + raise async def __aenter__(self): if self.retry_config is False: From 385df4392aca04a49ff934ed349f8f3eb1b30495 Mon Sep 17 00:00:00 2001 From: trana Date: Thu, 21 Dec 2023 16:30:37 +0100 Subject: [PATCH 17/25] Renamed WebSocketClientHandler to ProxyEnabledWebSocketClientHandler --- README.md | 4 ++-- fastapi_websocket_rpc/__init__.py | 2 +- fastapi_websocket_rpc/websocket_rpc_client.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 4f14154..1fb33fd 100644 --- a/README.md +++ b/README.md @@ -164,10 +164,10 @@ Then use websocket_client_handler_cls parameter: ```python import asyncio -from fastapi_websocket_rpc import RpcMethodsBase, WebSocketRpcClient, WebSocketClientHandler +from fastapi_websocket_rpc import RpcMethodsBase, WebSocketRpcClient, ProxyEnabledWebSocketClientHandler async def run_client(uri): - async with WebSocketRpcClient(uri, RpcMethodsBase(), websocket_client_handler_cls = WebSocketClientHandler) as client: + async with WebSocketRpcClient(uri, RpcMethodsBase(), websocket_client_handler_cls = ProxyEnabledWebSocketClientHandler) as client: ``` Just set standard environment variables (lowercase and uppercase works): http_proxy, https_proxy, and no_proxy before running python script. diff --git a/fastapi_websocket_rpc/__init__.py b/fastapi_websocket_rpc/__init__.py index 74f5a7f..662fe50 100644 --- a/fastapi_websocket_rpc/__init__.py +++ b/fastapi_websocket_rpc/__init__.py @@ -1,6 +1,6 @@ from .rpc_methods import RpcMethodsBase, RpcUtilityMethods from .websocket_rpc_client import WebSocketRpcClient -from .websocket_rpc_client import WebSocketClientHandler +from .websocket_rpc_client import ProxyEnabledWebSocketClientHandler from .websocket_rpc_client import WebSocketsClientHandler from .websocket_rpc_endpoint import WebsocketRPCEndpoint from .rpc_channel import RpcChannel diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index e7e742c..312d992 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -21,7 +21,7 @@ # Websocket-client optional module not installed. pass -class WebSocketClientHandler(SimpleWebSocket): +class ProxyEnabledWebSocketClientHandler (SimpleWebSocket): """ Handler that use https://websocket-client.readthedocs.io/en/latest module. This implementation supports HTTP proxy, though HTTP_PROXY and HTTPS_PROXY environment variable. From 3e985f6f0e43746daff64860ee3f2473f3fd382a Mon Sep 17 00:00:00 2001 From: trana Date: Wed, 3 Jan 2024 10:49:06 +0100 Subject: [PATCH 18/25] Fix double logging on connect error --- fastapi_websocket_rpc/websocket_rpc_client.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index 312d992..d545973 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -249,14 +249,20 @@ def __init__(self, uri: str, methods: RpcMethodsBase = None, self._websocket_client_handler_cls = websocket_client_handler_cls async def __connect__(self): + logger.info(f"Trying server - {self.uri}") + try: + raw_ws = self._websocket_client_handler_cls() + # Wrap socket in our serialization class + self.ws = self._serializing_socket_cls(raw_ws) + except: + logger.exception("Class instantiation error.") + raise + # No try/catch for connect() to avoid double error logging. Any exception from the method must be handled by + # itself for logging, then raised and caught outside of connect() (e.g.: for retry purpose). + # Start connection + await self.ws.connect(self.uri, **self.connect_kwargs) try: try: - logger.info(f"Trying server - {self.uri}") - # Start connection - raw_ws = self._websocket_client_handler_cls() - # Wrap socket in our serialization class - self.ws = self._serializing_socket_cls(raw_ws) - await self.ws.connect(self.uri, **self.connect_kwargs) # Init an RPC channel to work on-top of the connection self.channel = RpcChannel( self.methods, self.ws, default_response_timeout=self.default_response_timeout) From f9916f9c1a7cbdca6616dcf2de1292f3bb190b62 Mon Sep 17 00:00:00 2001 From: trana Date: Wed, 3 Jan 2024 14:52:00 +0100 Subject: [PATCH 19/25] Fix Can't instantiate abstract class ProxyEnabledWebSocketClientHandler with abstract methods handle_exception --- fastapi_websocket_rpc/simplewebsocket.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/fastapi_websocket_rpc/simplewebsocket.py b/fastapi_websocket_rpc/simplewebsocket.py index 54a1330..f66b3d9 100644 --- a/fastapi_websocket_rpc/simplewebsocket.py +++ b/fastapi_websocket_rpc/simplewebsocket.py @@ -26,10 +26,6 @@ def recv(self): def close(self, code: int = 1000): pass - @abstractmethod - def handle_exception(self, exception: Exception): - pass - class JsonSerializingWebSocket(SimpleWebSocket): def __init__(self, websocket: SimpleWebSocket): @@ -56,7 +52,3 @@ async def recv(self): async def close(self, code: int = 1000): await self._websocket.close(code) - async def handle_exception(self, exception: Exception): - await self._websocket.handle_exception(exception) - - From 21ef43d655d5c29784b95e4c645a3a1a30700af7 Mon Sep 17 00:00:00 2001 From: trana Date: Wed, 3 Jan 2024 16:24:59 +0100 Subject: [PATCH 20/25] Fix Can't instantiate abstract class WebSocketSimplifier with abstract methods connect --- fastapi_websocket_rpc/websocket_rpc_endpoint.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fastapi_websocket_rpc/websocket_rpc_endpoint.py b/fastapi_websocket_rpc/websocket_rpc_endpoint.py index 5a887e6..dfdb8a5 100644 --- a/fastapi_websocket_rpc/websocket_rpc_endpoint.py +++ b/fastapi_websocket_rpc/websocket_rpc_endpoint.py @@ -21,6 +21,10 @@ def __init__(self, websocket: WebSocket, frame_type: WebSocketFrameType = WebSoc self.websocket = websocket self.frame_type = frame_type + # This method is only useful on websocket_rpc_client. Here on endpoint file, it has nothing to connect to. + def connect(self, uri: str, **connect_kwargs): + pass + @property def send(self): if self.frame_type == WebSocketFrameType.Binary: From 7917f3135c333626b76e3b6d378d30df6a07726e Mon Sep 17 00:00:00 2001 From: trana Date: Wed, 3 Jan 2024 16:45:26 +0100 Subject: [PATCH 21/25] Fix wrong class name WebSocketConnectionClosedException --- fastapi_websocket_rpc/websocket_rpc_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastapi_websocket_rpc/websocket_rpc_client.py b/fastapi_websocket_rpc/websocket_rpc_client.py index d545973..7b3079b 100644 --- a/fastapi_websocket_rpc/websocket_rpc_client.py +++ b/fastapi_websocket_rpc/websocket_rpc_client.py @@ -83,7 +83,7 @@ async def recv(self): logging.error("Websocket connect() must be called before.") try: msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv) - except websocket.WebSocketConnectionClosedException as err: + except websocket._exceptions.WebSocketConnectionClosedException as err: logger.debug("Connection closed.", exc_info=True) # websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened # Returning None to ensure we get out of the loop, with no Exception. From 80ff102fecf311e2d742b8de6b15d0d79d1532c5 Mon Sep 17 00:00:00 2001 From: trana Date: Wed, 3 Jan 2024 18:21:32 +0100 Subject: [PATCH 22/25] setup.py: missing import os --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 5137d81..9af6d4c 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,5 @@ from setuptools import setup, find_packages - +import os def get_requirements(env=""): if env: From 91d8d80c68fb972859e1e478d297b24a7de9fcb9 Mon Sep 17 00:00:00 2001 From: trana Date: Wed, 3 Jan 2024 20:46:26 +0100 Subject: [PATCH 23/25] Fix "pytest cannot find module": rm __initL__.py, manual sys.path, add pythonpath --- __init__.py | 0 pytest.ini | 1 + tests/advanced_rpc_test.py | 4 ---- tests/basic_rpc_test.py | 4 ---- tests/binary_rpc_test.py | 5 ----- tests/custom_methods_test.py | 4 ---- tests/fast_api_depends_test.py | 4 ---- tests/trigger_flow_test.py | 4 ---- 8 files changed, 1 insertion(+), 25 deletions(-) delete mode 100644 __init__.py diff --git a/__init__.py b/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pytest.ini b/pytest.ini index 16c88ba..e4a4dcb 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,4 @@ # Handling DeprecationWarning 'asyncio_mode' default value [pytest] asyncio_mode = strict +pythonpath = . diff --git a/tests/advanced_rpc_test.py b/tests/advanced_rpc_test.py index 654e970..d82a324 100644 --- a/tests/advanced_rpc_test.py +++ b/tests/advanced_rpc_test.py @@ -1,10 +1,6 @@ import os import sys -# Add parent path to use local src as package for tests -sys.path.append(os.path.abspath(os.path.join( - os.path.dirname(__file__), os.path.pardir))) - import time import asyncio from multiprocessing import Process diff --git a/tests/basic_rpc_test.py b/tests/basic_rpc_test.py index 68b1a87..b82bf19 100644 --- a/tests/basic_rpc_test.py +++ b/tests/basic_rpc_test.py @@ -2,10 +2,6 @@ import os import sys -# Add parent path to use local src as package for tests -sys.path.append(os.path.abspath(os.path.join( - os.path.dirname(__file__), os.path.pardir))) - import asyncio from multiprocessing import Process diff --git a/tests/binary_rpc_test.py b/tests/binary_rpc_test.py index e34c7f9..656716c 100644 --- a/tests/binary_rpc_test.py +++ b/tests/binary_rpc_test.py @@ -2,11 +2,6 @@ import os import sys -# Add parent path to use local src as package for tests -sys.path.append( - os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)) -) - import json from multiprocessing import Process diff --git a/tests/custom_methods_test.py b/tests/custom_methods_test.py index c8937b9..013cb05 100644 --- a/tests/custom_methods_test.py +++ b/tests/custom_methods_test.py @@ -12,10 +12,6 @@ import os import sys -# Add parent path to use local src as package for tests -sys.path.append(os.path.abspath(os.path.join( - os.path.dirname(__file__), os.path.pardir))) - # Configurable PORT = int(os.environ.get("PORT") or "9000") diff --git a/tests/fast_api_depends_test.py b/tests/fast_api_depends_test.py index 07baf0a..8b6868d 100644 --- a/tests/fast_api_depends_test.py +++ b/tests/fast_api_depends_test.py @@ -3,10 +3,6 @@ from websockets.exceptions import InvalidStatusCode -# Add parent path to use local src as package for tests -sys.path.append(os.path.abspath(os.path.join( - os.path.dirname(__file__), os.path.pardir))) - from multiprocessing import Process import pytest diff --git a/tests/trigger_flow_test.py b/tests/trigger_flow_test.py index 4eb1fe7..6c0cfd7 100644 --- a/tests/trigger_flow_test.py +++ b/tests/trigger_flow_test.py @@ -5,10 +5,6 @@ import os import sys -# Add parent path to use local src as package for tests -sys.path.append(os.path.abspath(os.path.join( - os.path.dirname(__file__), os.path.pardir))) - from fastapi_websocket_rpc.utils import gen_uid from fastapi_websocket_rpc.websocket_rpc_endpoint import WebsocketRPCEndpoint from fastapi_websocket_rpc.websocket_rpc_client import WebSocketRpcClient From c64a0c9c7eb1e138a953804f96fcca48b6557e35 Mon Sep 17 00:00:00 2001 From: trana Date: Thu, 4 Jan 2024 10:51:23 +0100 Subject: [PATCH 24/25] pytest verbose --- .github/workflows/tests.yml | 2 +- pytest.ini | 5 +++++ tests/basic_rpc_test.py | 2 +- tests/binary_rpc_test.py | 8 +++++++- 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index f6be2ad..8781869 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -37,4 +37,4 @@ jobs: flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - pytest + pytest -v --capture=tee-sys diff --git a/pytest.ini b/pytest.ini index e4a4dcb..359a939 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,3 +2,8 @@ [pytest] asyncio_mode = strict pythonpath = . +log_cli = 1 +log_cli_level = DEBUG +log_cli_format = %(asctime)s [%(levelname)s] (%(filename)s:%(lineno)s) %(message)s +log_date_format = %Y-%m-%d %H:%M:%S + diff --git a/tests/basic_rpc_test.py b/tests/basic_rpc_test.py index b82bf19..f637547 100644 --- a/tests/basic_rpc_test.py +++ b/tests/basic_rpc_test.py @@ -10,7 +10,7 @@ from fastapi import FastAPI from fastapi_websocket_rpc.rpc_methods import RpcUtilityMethods -from fastapi_websocket_rpc.logger import logging_config, LoggingModes +from fastapi_websocket_rpc.logger import logging_config, LoggingModes, get_logger from fastapi_websocket_rpc.websocket_rpc_client import WebSocketRpcClient from fastapi_websocket_rpc.websocket_rpc_endpoint import WebsocketRPCEndpoint from fastapi_websocket_rpc.utils import gen_uid diff --git a/tests/binary_rpc_test.py b/tests/binary_rpc_test.py index 656716c..b068549 100644 --- a/tests/binary_rpc_test.py +++ b/tests/binary_rpc_test.py @@ -10,7 +10,7 @@ from fastapi import FastAPI from fastapi_websocket_rpc import WebSocketFrameType -from fastapi_websocket_rpc.logger import LoggingModes, logging_config +from fastapi_websocket_rpc.logger import LoggingModes, logging_config, get_logger from fastapi_websocket_rpc.rpc_methods import RpcUtilityMethods from fastapi_websocket_rpc.simplewebsocket import SimpleWebSocket from fastapi_websocket_rpc.utils import pydantic_serialize @@ -20,6 +20,8 @@ # Set debug logs (and direct all logs to UVICORN format) logging_config.set_mode(LoggingModes.UVICORN, logging.DEBUG) +logger = get_logger(__name__) + # Configurable PORT = int(os.environ.get("PORT") or "9000") uri = f"ws://localhost:{PORT}/ws" @@ -72,14 +74,18 @@ async def test_echo(server): """ Test basic RPC with a simple echo """ + logger.debug("before test_echo") async with WebSocketRpcClient( uri, RpcUtilityMethods(), default_response_timeout=4, serializing_socket_cls=BinarySerializingWebSocket, ) as client: + logger.debug("Initialized WebSocketRpcClient") text = "Hello World!" + logger.debug("Waiting for response...") response = await client.other.echo(text=text) + logger.debug("Response: %s", str(response)) assert response.result == text From ed2ce1259164539b06c43695ec845ed58322b7e2 Mon Sep 17 00:00:00 2001 From: trana Date: Thu, 4 Jan 2024 11:24:27 +0100 Subject: [PATCH 25/25] Fix Can't instantiate abstract class BinarySerializingWebSocket with abstract method connect --- fastapi_websocket_rpc/websocket_rpc_endpoint.py | 2 +- tests/binary_rpc_test.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/fastapi_websocket_rpc/websocket_rpc_endpoint.py b/fastapi_websocket_rpc/websocket_rpc_endpoint.py index dfdb8a5..4197728 100644 --- a/fastapi_websocket_rpc/websocket_rpc_endpoint.py +++ b/fastapi_websocket_rpc/websocket_rpc_endpoint.py @@ -22,7 +22,7 @@ def __init__(self, websocket: WebSocket, frame_type: WebSocketFrameType = WebSoc self.frame_type = frame_type # This method is only useful on websocket_rpc_client. Here on endpoint file, it has nothing to connect to. - def connect(self, uri: str, **connect_kwargs): + async def connect(self, uri: str, **connect_kwargs): pass @property diff --git a/tests/binary_rpc_test.py b/tests/binary_rpc_test.py index b068549..b9858ed 100644 --- a/tests/binary_rpc_test.py +++ b/tests/binary_rpc_test.py @@ -31,6 +31,9 @@ class BinarySerializingWebSocket(SimpleWebSocket): def __init__(self, websocket: SimpleWebSocket): self._websocket = websocket + async def connect(self, uri: str, **connect_kwargs): + await self._websocket.connect(uri, **connect_kwargs) + def _serialize(self, msg): return pydantic_serialize(msg).encode()