Skip to content

Commit

Permalink
Even more efficient httpx (#206)
Browse files Browse the repository at this point in the history
* Make async methods and blob_client use httpx client objects more efficiently.

---------

Co-authored-by: Raymond Wiker <[email protected]>
  • Loading branch information
rwiker and rwiker authored Jun 21, 2024
1 parent 5dde6f6 commit e399121
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 51 deletions.
18 changes: 10 additions & 8 deletions src/sumo/wrapper/_blob_client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import httpx

from ._decorators import (
raise_for_status,
raise_for_status_async,
)

from ._retry_strategy import RetryStrategy


class BlobClient:
"""Upload blobs to blob store using pre-authorized URLs"""

def __init__(self, retry_strategy=RetryStrategy()):
def __init__(self, client, async_client, timeout, retry_strategy):
self._client = client
self._async_client = client
self._timeout = timeout
self._retry_strategy = retry_strategy
return

Expand All @@ -30,7 +29,9 @@ def upload_blob(self, blob: bytes, url: str):
}

def _put():
return httpx.put(url, content=blob, headers=headers)
return self._client.put(
url, content=blob, headers=headers, timeout=self._timeout
)

retryer = self._retry_strategy.make_retryer()

Expand All @@ -51,8 +52,9 @@ async def upload_blob_async(self, blob: bytes, url: str):
}

async def _put():
async with httpx.AsyncClient() as client:
return await client.put(url=url, content=blob, headers=headers)
return await self._async_client.put(
url=url, content=blob, headers=headers, timeout=self._timeout
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down
116 changes: 73 additions & 43 deletions src/sumo/wrapper/sumo_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging

import asyncio
import httpx

import jwt

from ._blob_client import BlobClient
Expand All @@ -18,7 +17,7 @@

logger = logging.getLogger("sumo.wrapper")

DEFAULT_TIMEOUT = httpx.Timeout(20.0)
DEFAULT_TIMEOUT = httpx.Timeout(30.0)


class SumoClient:
Expand All @@ -32,6 +31,7 @@ def __init__(
devicecode: bool = False,
verbosity: str = "CRITICAL",
retry_strategy=RetryStrategy(),
timeout=DEFAULT_TIMEOUT,
):
"""Initialize a new Sumo object
Expand All @@ -49,8 +49,10 @@ def __init__(
raise ValueError(f"Invalid environment: {env}")

self._retry_strategy = retry_strategy
self._client = httpx
self._blob_client = BlobClient(retry_strategy)
self._client = httpx.Client(follow_redirects=True)
self._async_client = httpx.AsyncClient(follow_redirects=True)

self._timeout = timeout

access_token = None
refresh_token = None
Expand Down Expand Up @@ -94,12 +96,39 @@ def __init__(
return

def __enter__(self):
self._client = httpx.Client()
return self

def __exit__(self, exc_type, exc_value, traceback):
self._client.close()
return
self._client = None
return False

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self._async_client.aclose()
self._async_client = None
return False

def __del__(self):
if self._client is not None:
self._client.close()
self._client = None
pass
if self._async_client is not None:

async def closeit(client):
await client.aclose()
return

try:
loop = asyncio.get_running_loop()
loop.create_task(closeit(self._async_client))
except RuntimeError:
pass
self._async_client = None
pass

def authenticate(self):
if self.auth is None:
Expand All @@ -126,7 +155,12 @@ def blob_client(self) -> BlobClient:
await sumo.blob_client.upload_blob_async(blob, blob_url)
"""

return self._blob_client
return BlobClient(
self._client,
self._async_client,
self._timeout,
self._retry_strategy,
)

@raise_for_status
def get(self, path: str, params: dict = None) -> dict:
Expand Down Expand Up @@ -169,7 +203,7 @@ def _get():
params=params,
headers=headers,
follow_redirects=True,
timeout=DEFAULT_TIMEOUT,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer()
Expand Down Expand Up @@ -244,7 +278,7 @@ def _post():
json=json,
headers=headers,
params=params,
timeout=DEFAULT_TIMEOUT,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer()
Expand Down Expand Up @@ -290,7 +324,7 @@ def _put():
content=blob,
json=json,
headers=headers,
timeout=DEFAULT_TIMEOUT,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer()
Expand Down Expand Up @@ -328,7 +362,7 @@ def _delete():
f"{self.base_url}{path}",
headers=headers,
params=params,
timeout=DEFAULT_TIMEOUT,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer()
Expand Down Expand Up @@ -389,13 +423,12 @@ async def get_async(self, path: str, params: dict = None):
headers.update(self.auth.get_authorization())

async def _get():
async with httpx.AsyncClient(follow_redirects=True) as client:
return await client.get(
f"{self.base_url}{path}",
params=params,
headers=headers,
timeout=DEFAULT_TIMEOUT,
)
return await self._async_client.get(
f"{self.base_url}{path}",
params=params,
headers=headers,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down Expand Up @@ -464,15 +497,14 @@ async def post_async(
headers.update(self.auth.get_authorization())

async def _post():
async with httpx.AsyncClient() as client:
return await client.post(
url=f"{self.base_url}{path}",
content=blob,
json=json,
headers=headers,
params=params,
timeout=DEFAULT_TIMEOUT,
)
return await self._async_client.post(
url=f"{self.base_url}{path}",
content=blob,
json=json,
headers=headers,
params=params,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down Expand Up @@ -512,14 +544,13 @@ async def put_async(
headers.update(self.auth.get_authorization())

async def _put():
async with httpx.AsyncClient() as client:
return await client.put(
url=f"{self.base_url}{path}",
content=blob,
json=json,
headers=headers,
timeout=DEFAULT_TIMEOUT,
)
return await self._async_client.put(
url=f"{self.base_url}{path}",
content=blob,
json=json,
headers=headers,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down Expand Up @@ -552,13 +583,12 @@ async def delete_async(self, path: str, params: dict = None) -> dict:
headers.update(self.auth.get_authorization())

async def _delete():
async with httpx.AsyncClient() as client:
return await client.delete(
url=f"{self.base_url}{path}",
headers=headers,
params=params,
timeout=DEFAULT_TIMEOUT,
)
return await self._async_client.delete(
url=f"{self.base_url}{path}",
headers=headers,
params=params,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down

0 comments on commit e399121

Please sign in to comment.