Skip to content

Commit

Permalink
Fix firehose client stop (#384)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarshalX authored Sep 10, 2024
1 parent 2b6b85e commit e5c6a3b
Showing 1 changed file with 4 additions and 16 deletions.
20 changes: 4 additions & 16 deletions packages/atproto_firehose/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,7 @@
from atproto_client.models.base import ParamsModelBase
from atproto_client.models.common import XrpcError
from atproto_core.exceptions import DAGCBORDecodingError

try:
# new asyncio implementation is available since websockets 13.0
from websockets.asyncio.client import connect as aconnect
except ImportError:
# fallback to the legacy implementation for older versions
from websockets.client import connect as aconnect
from websockets.client import connect as aconnect
from websockets.exceptions import (
ConnectionClosedError,
ConnectionClosedOK,
Expand Down Expand Up @@ -52,13 +46,7 @@

if t.TYPE_CHECKING:
from websockets.client import ClientConnection as SyncWebSocketClient

try:
# new asyncio implementation is available since websockets 13.0
from websockets.asyncio.client import ClientConnection as AsyncConnect
except ImportError:
# fallback to the legacy implementation for older versions
from websockets.legacy.client import Connect as AsyncConnect
from websockets.legacy.client import Connect as AsyncConnect


def _build_websocket_uri(method: str, base_uri: str, params: t.Optional[t.Dict[str, t.Any]] = None) -> str:
Expand Down Expand Up @@ -133,10 +121,10 @@ def _websocket_uri(self) -> str:
return _build_websocket_uri(self._method, self._base_uri, self._params)

def _get_client(self) -> 'SyncWebSocketClient':
return connect(self._websocket_uri, max_size=_MAX_MESSAGE_SIZE_BYTES, close_timeout=None)
return connect(self._websocket_uri, max_size=_MAX_MESSAGE_SIZE_BYTES, close_timeout=0.1)

def _get_async_client(self) -> 'AsyncConnect':
return aconnect(self._websocket_uri, max_size=_MAX_MESSAGE_SIZE_BYTES, close_timeout=None)
return aconnect(self._websocket_uri, max_size=_MAX_MESSAGE_SIZE_BYTES, close_timeout=0.1)

def _get_reconnection_delay(self) -> int:
base_sec = 2**self._reconnect_no
Expand Down

0 comments on commit e5c6a3b

Please sign in to comment.