Skip to content

Commit

Permalink
Support websocket with asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
perklet committed Nov 17, 2023
1 parent 43bed85 commit 2c98a72
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 30 deletions.
4 changes: 4 additions & 0 deletions curl_cffi/curl.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ def _set_error_buffer(self):
self.setopt(CurlOpt.VERBOSE, 1)
lib._curl_easy_setopt(self._curl, CurlOpt.DEBUGFUNCTION, lib.debug_function)

def debug(self):
self.setopt(CurlOpt.VERBOSE, 1)
lib._curl_easy_setopt(self._curl, CurlOpt.DEBUGFUNCTION, lib.debug_function)

def __del__(self):
self.close()

Expand Down
24 changes: 14 additions & 10 deletions curl_cffi/requests/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from .errors import RequestsError
from .headers import Headers, HeaderTypes
from .models import Request, Response
from .websockets import WebSocket, AsyncWebSocket
from .websockets import WebSocket

try:
import gevent
Expand Down Expand Up @@ -560,7 +560,7 @@ def connect(self, url, *args, **kwargs):
# https://curl.se/docs/websocket.html
self.curl.setopt(CurlOpt.CONNECT_ONLY, 2)
self.curl.perform()
return WebSocket(self)
return WebSocket(self, self.curl)

def request(
self,
Expand Down Expand Up @@ -731,7 +731,7 @@ def __init__(
```
"""
super().__init__(**kwargs)
self.loop = loop
self._loop = loop
self._acurl = async_curl
self.max_clients = max_clients
self._closed = False
Expand All @@ -742,10 +742,14 @@ def __init__(
):
warnings.warn(WINDOWS_WARN)

@property
def loop(self):
if self._loop is None:
self._loop = asyncio.get_running_loop()
return self._loop

@property
def acurl(self):
if self.loop is None:
self.loop = asyncio.get_running_loop()
if self._acurl is None:
self._acurl = AsyncCurl(loop=self.loop)
return self._acurl
Expand Down Expand Up @@ -806,13 +810,13 @@ async def stream(self, *args, **kwargs):
finally:
await rsp.aclose()

async def connect(self, *args, **kwargs):
async def connect(self, url, *args, **kwargs):
curl = await self.pop_curl()
self._set_curl_options(*args, **kwargs)
# curl.debug()
self._set_curl_options(curl, "GET", url, *args, **kwargs)
curl.setopt(CurlOpt.CONNECT_ONLY, 2) # https://curl.se/docs/websocket.html
task = self.acurl.add_handle(curl)
await task
return AsyncWebSocket(self, curl)
await self.loop.run_in_executor(None, curl.perform)
return WebSocket(self, curl)

async def request(
self,
Expand Down
42 changes: 22 additions & 20 deletions curl_cffi/requests/websockets.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
import asyncio
from curl_cffi.const import CurlECode, CurlWsFlag
from curl_cffi.curl import CurlError


class WebSocket:
def __init__(self, session):
def __init__(self, session, curl):
self.session = session

@property
def c(self):
return self.session.curl
self.curl = curl
self._loop = None

def recv_fragment(self):
return self.c.ws_recv()
return self.curl.ws_recv()

def recv(self):
chunks = []
# TODO use select here
while True:
try:
chunk, frame = self.c.ws_recv()
chunk, frame = self.curl.ws_recv()
chunks.append(chunk)
if frame.bytesleft == 0:
break
Expand All @@ -31,22 +30,25 @@ def recv(self):
return b"".join(chunks)

def send(self, payload: bytes, flags: CurlWsFlag = CurlWsFlag.BINARY):
return self.c.ws_send(payload, flags)
return self.curl.ws_send(payload, flags)

def close(self):
return self.c.close()

# FIXME how to reset. or can a curl handle connect to two websockets?
self.curl.close()

class AsyncWebSocket:
def __init__(self, session, curl):
self.session = session
self.curl = curl
@property
def loop(self):
if self._loop is None:
self._loop = asyncio.get_running_loop()
return self._loop

async def recv(self):
return await self.curl.ws_recv()
async def arecv(self):
return await self.loop.run_in_executor(None, self.recv)

async def send(self):
return await self.curl.ws_send()
async def asend(self, payload: bytes, flags: CurlWsFlag = CurlWsFlag.BINARY):
return await self.loop.run_in_executor(None, self.send, payload, flags)

async def close(self):
return await self.curl.close()
async def aclose(self):
await self.loop.run_in_executor(None, self.close)
self.curl.reset()
self.session.push_curl(curl)
14 changes: 14 additions & 0 deletions examples/websocket.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
import asyncio
from curl_cffi import requests

with requests.Session() as s:
w = s.connect("ws://localhost:8765")
w.send(b"Foo")
reply = w.recv()
print(reply)
assert reply == b"Hello Foo!"


async def async_examples():
async with requests.AsyncSession() as s:
w = await s.connect("ws://localhost:8765")
await w.asend(b"Bar")
reply = await w.arecv()
print(reply)
assert reply == b"Hello Bar!"


asyncio.run(async_examples())

0 comments on commit 2c98a72

Please sign in to comment.