Skip to content

Commit

Permalink
Add new asyncio implementation support for websockets 13.0; handle Co…
Browse files Browse the repository at this point in the history
…nnectionError exception; remove close_timeout (#376)
  • Loading branch information
MarshalX authored Sep 1, 2024
1 parent 6149b14 commit 3d7e593
Show file tree
Hide file tree
Showing 3 changed files with 507 additions and 439 deletions.
38 changes: 29 additions & 9 deletions packages/atproto_firehose/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
from atproto_client.models.base import ParamsModelBase
from atproto_client.models.common import XrpcError
from atproto_core.exceptions import DAGCBORDecodingError
from websockets.client import connect as aconnect

try:
# new asyncio implementation is available since websockets 13.0
from websockets.asyncio.client import connect as aconnect
except ImportError:
# fallback to the legacy implementation for older versions
from websockets.client import connect as aconnect
from websockets.exceptions import (
ConnectionClosedError,
ConnectionClosedOK,
Expand All @@ -33,10 +39,26 @@
OnCallbackErrorCallback = t.Callable[[BaseException], None]
AsyncOnCallbackErrorCallback = t.Callable[[BaseException], t.Coroutine[t.Any, t.Any, None]]

_OK_ERRORS = (ConnectionClosedOK,)
_ERR_ERRORS = (
ConnectionError,
ConnectionClosedError,
InvalidHandshake,
PayloadTooBig,
ProtocolError,
socket.gaierror,
)


if t.TYPE_CHECKING:
from websockets.client import ClientConnection as SyncWebSocketClient
from websockets.legacy.client import Connect as AsyncConnect

try:
# new asyncio implementation is available since websockets 13.0
from websockets.asyncio.client import ClientConnection as AsyncConnect
except ImportError:
# fallback to the legacy implementation for older versions
from websockets.legacy.client import Connect as AsyncConnect


def _build_websocket_uri(method: str, base_uri: str, params: t.Optional[t.Dict[str, t.Any]] = None) -> str:
Expand All @@ -59,10 +81,11 @@ def _handle_frame_decoding_error(exception: Exception) -> None:

def _handle_websocket_error_or_stop(exception: Exception) -> bool:
"""Return if the connection should be properly being closed or reraise exception."""
if isinstance(exception, (ConnectionClosedOK,)):
if isinstance(exception, _OK_ERRORS):
return True
if isinstance(exception, (ConnectionClosedError, InvalidHandshake, PayloadTooBig, ProtocolError, socket.gaierror)):
if isinstance(exception, _ERR_ERRORS):
return False

if isinstance(exception, FirehoseError):
raise exception

Expand Down Expand Up @@ -110,13 +133,10 @@ def _websocket_uri(self) -> str:
return _build_websocket_uri(self._method, self._base_uri, self._params)

def _get_client(self) -> 'SyncWebSocketClient':
return connect(self._websocket_uri, max_size=_MAX_MESSAGE_SIZE_BYTES)
return connect(self._websocket_uri, max_size=_MAX_MESSAGE_SIZE_BYTES, close_timeout=None)

def _get_async_client(self) -> 'AsyncConnect':
# FIXME(DXsmiley): I've noticed that the close operation often takes the entire timeout for some reason
# By default, this is 10 seconds, which is pretty long.
# Maybe shorten it?
return aconnect(self._websocket_uri, max_size=_MAX_MESSAGE_SIZE_BYTES)
return aconnect(self._websocket_uri, max_size=_MAX_MESSAGE_SIZE_BYTES, close_timeout=None)

def _get_reconnection_delay(self) -> int:
base_sec = 2**self._reconnect_no
Expand Down
Loading

0 comments on commit 3d7e593

Please sign in to comment.