Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added websocket client handler, and websocket-client module #32

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
256c709
Added websocket client handler, and websocket-client module
Nov 29, 2023
a293a3e
small fix in close
Nov 29, 2023
6126bf6
handled exceptions
Dec 1, 2023
55987a4
added doc
Dec 1, 2023
f582aa6
requirements.txt
Dec 1, 2023
ce683c2
clean
Dec 1, 2023
a81a05c
handle connection closed exception
Dec 1, 2023
7b82905
Fix UnitTest support python 3.7: see https://github.com/websocket-cli…
Dec 4, 2023
080144b
Fix await websocket._exceptions.WebSocketConnectionClosedException
Dec 4, 2023
161ebda
fix close but scheduler shutdowned
Dec 4, 2023
bd4eb71
fix close but scheduler shutdowned
Dec 4, 2023
c957249
Update README.md
antoinetran Dec 5, 2023
f02a8d2
websocket-client is optional
Dec 5, 2023
3624a61
Using BaseConnectionClosedException
Dec 5, 2023
7d89946
Fix remark BaseConnectionClosedException ugly
Dec 21, 2023
54e12ef
Removed handle_exception and moved it to connect() to avoid reraise
Dec 21, 2023
385df43
Renamed WebSocketClientHandler to ProxyEnabledWebSocketClientHandler
Dec 21, 2023
3e985f6
Fix double logging on connect error
Jan 3, 2024
f9916f9
Fix Can't instantiate abstract class ProxyEnabledWebSocketClientHandler
Jan 3, 2024
21ef43d
Fix Can't instantiate abstract class WebSocketSimplifier with abstract
Jan 3, 2024
7917f31
Fix wrong class name WebSocketConnectionClosedException
Jan 3, 2024
80ff102
setup.py: missing import os
Jan 3, 2024
91d8d80
Fix "pytest cannot find module": rm __initL__.py, manual sys.path, add
Jan 3, 2024
c64a0c9
pytest verbose
Jan 4, 2024
ed2ce12
Fix Can't instantiate abstract class BinarySerializingWebSocket with
Jan 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,20 @@ from fastapi_websocket_rpc.logger import logging_config, LoggingModes
logging_config.set_mode(LoggingModes.UVICORN)
```

## HTTP(S) Proxy
By default, fastapi-websocket-rpc uses websockets module as websocket client handler. This does not support HTTP(S) Proxy, see https://github.com/python-websockets/websockets/issues/364 . If proxy is important, others websocket client implementation can be used, such as websocket-client module (https://websocket-client.readthedocs.io). Here is how to use it, by defining websocket_client_handler_cls parameter:
antoinetran marked this conversation as resolved.
Show resolved Hide resolved

```python
import asyncio
from fastapi_websocket_rpc import RpcMethodsBase, WebSocketRpcClient, WebSocketClientHandler

async def run_client(uri):
async with WebSocketRpcClient(uri, RpcMethodsBase(), websocket_client_handler_cls = WebSocketClientHandler) as client:
```

Just set standard environment variables (lowercase and uppercase works): http_proxy, https_proxy, and no_proxy before running python script.


## Pull requests - welcome!
- Please include tests for new features

Expand Down
2 changes: 2 additions & 0 deletions fastapi_websocket_rpc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .rpc_methods import RpcMethodsBase, RpcUtilityMethods
from .websocket_rpc_client import WebSocketRpcClient
from .websocket_rpc_client import WebSocketClientHandler
from .websocket_rpc_client import WebSocketsClientHandler
from .websocket_rpc_endpoint import WebsocketRPCEndpoint
from .rpc_channel import RpcChannel
from .schemas import WebSocketFrameType
24 changes: 24 additions & 0 deletions fastapi_websocket_rpc/simplewebsocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ class SimpleWebSocket(ABC):
Abstract base class for all websocket related wrappers.
"""

@abstractmethod
def connect(self, uri: str, **connect_kwargs):
pass

@abstractmethod
def send(self, msg):
pass
Expand All @@ -21,11 +25,22 @@ def recv(self):
def close(self, code: int = 1000):
pass

@abstractmethod
def handle_exception(self, exception: Exception):
pass

@abstractmethod
def isConnectionClosedException(self, exception: Exception) -> bool:
pass


class JsonSerializingWebSocket(SimpleWebSocket):
def __init__(self, websocket: SimpleWebSocket):
self._websocket = websocket

async def connect(self, uri: str, **connect_kwargs):
await self._websocket.connect(uri, **connect_kwargs)

def _serialize(self, msg):
return pydantic_serialize(msg)

Expand All @@ -42,3 +57,12 @@ async def recv(self):

async def close(self, code: int = 1000):
await self._websocket.close(code)

async def handle_exception(self, exception: Exception):
await self._websocket.handle_exception(exception)

async def isConnectionClosedException(self, exception: Exception) -> bool:
return await self._websocket.isConnectionClosedException(exception)



189 changes: 159 additions & 30 deletions fastapi_websocket_rpc/websocket_rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import tenacity
from tenacity.retry import retry_if_exception

import websocket
orweis marked this conversation as resolved.
Show resolved Hide resolved
import websockets
from websockets.exceptions import InvalidStatusCode, WebSocketException, ConnectionClosedError, ConnectionClosedOK

Expand All @@ -16,6 +17,145 @@
logger = get_logger("RPC_CLIENT")


class WebSocketClientHandler(SimpleWebSocket):
"""
Handler that use https://websocket-client.readthedocs.io/en/latest module.
This implementation supports HTTP proxy, though HTTP_PROXY and HTTPS_PROXY environment variable.
This is not documented, but in code, see https://github.com/websocket-client/websocket-client/blob/master/websocket/_url.py#L163
The module is not written as coroutine: https://websocket-client.readthedocs.io/en/latest/threading.html#asyncio-library-usage, so
as a workaround, the send/recv are called in "run_in_executor" method.
TODO: remove this implementation after https://github.com/python-websockets/websockets/issues/364 is fixed and use WebSocketsClientHandler instead.

Note: the connect timeout, if not specified, is the default socket connect timeout, which could be around 2min, so a bit longer than WebSocketsClientHandler.
"""
def __init__(self):
self._websocket = None

"""
Args:
**kwargs: Additional args passed to connect
https://websocket-client.readthedocs.io/en/latest/examples.html#connection-options
https://websocket-client.readthedocs.io/en/latest/core.html#websocket._core.WebSocket.connect
"""
async def connect(self, uri: str, **connect_kwargs):
self._websocket = await asyncio.get_event_loop().run_in_executor(None, websocket.create_connection, uri, **connect_kwargs)

async def send(self, msg):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
await asyncio.get_event_loop().run_in_executor(None, self._websocket.send, msg)

async def recv(self):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv)
return msg

async def close(self, code: int = 1000):
if self._websocket is not None:
# Case opened, we have something to close.
await self._websocket.close(code)

async def handle_exception(self, exception: Exception):
try:
raise exception
# See https://websocket-client.readthedocs.io/en/latest/exceptions.html
except websocket._exceptions.WebSocketAddressException:
logger.info("websocket address info cannot be found")
raise
except websocket._exceptions.WebSocketBadStatusException:
logger.info("bad handshake status code")
raise
except websocket._exceptions.WebSocketConnectionClosedException:
logger.info("remote host closed the connection or some network error happened")
raise
except websocket._exceptions.WebSocketPayloadException:
logger.info(
f"WebSocket payload is invalid")
raise
except websocket._exceptions.WebSocketProtocolException:
logger.info(f"WebSocket protocol is invalid")
raise
except websocket._exceptions.WebSocketProxyException:
logger.info(f"proxy error occurred")
raise
except OSError as err:
logger.info("RPC Connection failed - %s", err)
raise
except Exception as err:
logger.exception("RPC Error")
raise

async def isConnectionClosedException(self, exception: Exception) -> bool:
# websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened
return isinstance(exception, websocket.WebSocketConnectionClosedException)


class WebSocketsClientHandler(SimpleWebSocket):
orweis marked this conversation as resolved.
Show resolved Hide resolved
"""
Handler that use https://websockets.readthedocs.io/en/stable module.
This implementation does not support HTTP proxy (see https://github.com/python-websockets/websockets/issues/364).
"""
def __init__(self):
self._websocket = None

"""
Args:
**kwargs: Additional args passed to connect
https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#opening-a-connection
"""
async def connect(self, uri: str, **connect_kwargs):
self._websocket = await websockets.connect(uri, **connect_kwargs)

async def send(self, msg):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
await self._websocket.send(msg)

async def recv(self):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
msg = await self._websocket.recv()
return msg

async def close(self, code: int = 1000):
if self._websocket is not None:
# Case opened, we have something to close.
await self._websocket.close(code)

async def handle_exception(self, exception: Exception):
try:
raise exception
orweis marked this conversation as resolved.
Show resolved Hide resolved
except ConnectionRefusedError:
logger.info("RPC connection was refused by server")
raise
except ConnectionClosedError:
logger.info("RPC connection lost")
raise
except ConnectionClosedOK:
logger.info("RPC connection closed")
raise
except InvalidStatusCode as err:
logger.info(
f"RPC Websocket failed - with invalid status code {err.status_code}")
raise
except WebSocketException as err:
logger.info(f"RPC Websocket failed - with {err}")
raise
except OSError as err:
logger.info("RPC Connection failed - %s", err)
raise
except Exception as err:
logger.exception("RPC Error")
raise

async def isConnectionClosedException(self, exception: Exception) -> bool:
return isinstance(exception, websockets.exceptions.ConnectionClosed)

def isNotInvalidStatusCode(value):
return not isinstance(value, InvalidStatusCode)

Expand Down Expand Up @@ -59,6 +199,7 @@ def __init__(self, uri: str, methods: RpcMethodsBase = None,
on_disconnect: List[OnDisconnectCallback] = None,
keep_alive: float = 0,
serializing_socket_cls: Type[SimpleWebSocket] = JsonSerializingWebSocket,
websocket_client_handler_cls: Type[SimpleWebSocket] = WebSocketsClientHandler,
**kwargs):
"""
Args:
Expand All @@ -71,8 +212,7 @@ def __init__(self, uri: str, methods: RpcMethodsBase = None,
on_disconnect (List[Coroutine]): callbacks on connection termination (each callback is called with the channel)
keep_alive(float): interval in seconds to send a keep-alive ping, Defaults to 0, which means keep alive is disabled.

**kwargs: Additional args passed to connect (@see class Connect at websockets/client.py)
https://websockets.readthedocs.io/en/stable/api.html#websockets.client.connect
**kwargs: Additional args passed to connect, depends on websocket_client_handler_cls


usage:
Expand Down Expand Up @@ -105,15 +245,18 @@ def __init__(self, uri: str, methods: RpcMethodsBase = None,
self._on_connect = on_connect
# serialization
self._serializing_socket_cls = serializing_socket_cls
# websocket client implementation
self._websocket_client_handler_cls = websocket_client_handler_cls

async def __connect__(self):
try:
try:
logger.info(f"Trying server - {self.uri}")
# Start connection
raw_ws = await websockets.connect(self.uri, **self.connect_kwargs)
raw_ws = self._websocket_client_handler_cls()
# Wrap socket in our serialization class
self.ws = self._serializing_socket_cls(raw_ws)
await self.ws.connect(self.uri, **self.connect_kwargs)
# Init an RPC channel to work on-top of the connection
self.channel = RpcChannel(
self.methods, self.ws, default_response_timeout=self.default_response_timeout)
Expand All @@ -137,28 +280,13 @@ async def __connect__(self):
await self.channel.close()
self.cancel_tasks()
raise
except ConnectionRefusedError:
logger.info("RPC connection was refused by server")
raise
except ConnectionClosedError:
logger.info("RPC connection lost")
raise
except ConnectionClosedOK:
logger.info("RPC connection closed")
raise
except InvalidStatusCode as err:
logger.info(
f"RPC Websocket failed - with invalid status code {err.status_code}")
raise
except WebSocketException as err:
logger.info(f"RPC Websocket failed - with {err}")
raise
except OSError as err:
logger.info("RPC Connection failed - %s", err)
raise
except Exception as err:
logger.exception("RPC Error")
raise
if self.ws is not None:
# Exception could be websocket client specific.
await self.ws.handle_exception(err)
else:
logger.exception("RPC Error")
raise

async def __aenter__(self):
if self.retry_config is False:
Expand Down Expand Up @@ -205,12 +333,13 @@ async def reader(self):
# task was canceled
except asyncio.CancelledError:
pass
except websockets.exceptions.ConnectionClosed:
logger.info("Connection was terminated.")
await self.close()
except:
logger.exception("RPC Reader task failed")
raise
except Exception as err:
if self.ws.isConnectionClosedException(err):
antoinetran marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Connection was terminated.")
await self.close()
else:
logger.exception("RPC Reader task failed")
raise

async def _keep_alive(self):
try:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ packaging>=20.4
pydantic>=1.9.1
uvicorn>=0.17.6,<1
websockets>=10.3,<11
websocket-client>=1.6.4
orweis marked this conversation as resolved.
Show resolved Hide resolved
tenacity>=8.0.1,<9
Loading