diff --git a/src/sumo/wrapper/_blob_client.py b/src/sumo/wrapper/_blob_client.py index 35ac785..ea7da86 100644 --- a/src/sumo/wrapper/_blob_client.py +++ b/src/sumo/wrapper/_blob_client.py @@ -1,17 +1,16 @@ -import httpx - from ._decorators import ( raise_for_status, raise_for_status_async, ) -from ._retry_strategy import RetryStrategy - class BlobClient: """Upload blobs to blob store using pre-authorized URLs""" - def __init__(self, retry_strategy=RetryStrategy()): + def __init__(self, client, async_client, timeout, retry_strategy): + self._client = client + self._async_client = client + self._timeout = timeout self._retry_strategy = retry_strategy return @@ -30,7 +29,9 @@ def upload_blob(self, blob: bytes, url: str): } def _put(): - return httpx.put(url, content=blob, headers=headers) + return self._client.put( + url, content=blob, headers=headers, timeout=self._timeout + ) retryer = self._retry_strategy.make_retryer() @@ -51,8 +52,9 @@ async def upload_blob_async(self, blob: bytes, url: str): } async def _put(): - async with httpx.AsyncClient() as client: - return await client.put(url=url, content=blob, headers=headers) + return await self._async_client.put( + url=url, content=blob, headers=headers, timeout=self._timeout + ) retryer = self._retry_strategy.make_retryer_async() diff --git a/src/sumo/wrapper/sumo_client.py b/src/sumo/wrapper/sumo_client.py index af8fa29..d82df39 100644 --- a/src/sumo/wrapper/sumo_client.py +++ b/src/sumo/wrapper/sumo_client.py @@ -1,7 +1,6 @@ import logging - +import asyncio import httpx - import jwt from ._blob_client import BlobClient @@ -18,7 +17,7 @@ logger = logging.getLogger("sumo.wrapper") -DEFAULT_TIMEOUT = httpx.Timeout(20.0) +DEFAULT_TIMEOUT = httpx.Timeout(30.0) class SumoClient: @@ -32,6 +31,7 @@ def __init__( devicecode: bool = False, verbosity: str = "CRITICAL", retry_strategy=RetryStrategy(), + timeout=DEFAULT_TIMEOUT, ): """Initialize a new Sumo object @@ -49,8 +49,10 @@ def __init__( raise ValueError(f"Invalid environment: {env}") self._retry_strategy = retry_strategy - self._client = httpx - self._blob_client = BlobClient(retry_strategy) + self._client = httpx.Client(follow_redirects=True) + self._async_client = httpx.AsyncClient(follow_redirects=True) + + self._timeout = timeout access_token = None refresh_token = None @@ -94,12 +96,39 @@ def __init__( return def __enter__(self): - self._client = httpx.Client() return self def __exit__(self, exc_type, exc_value, traceback): self._client.close() - return + self._client = None + return False + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self._async_client.aclose() + self._async_client = None + return False + + def __del__(self): + if self._client is not None: + self._client.close() + self._client = None + pass + if self._async_client is not None: + + async def closeit(client): + await client.aclose() + return + + try: + loop = asyncio.get_running_loop() + loop.create_task(closeit(self._async_client)) + except RuntimeError: + pass + self._async_client = None + pass def authenticate(self): if self.auth is None: @@ -126,7 +155,12 @@ def blob_client(self) -> BlobClient: await sumo.blob_client.upload_blob_async(blob, blob_url) """ - return self._blob_client + return BlobClient( + self._client, + self._async_client, + self._timeout, + self._retry_strategy, + ) @raise_for_status def get(self, path: str, params: dict = None) -> dict: @@ -169,7 +203,7 @@ def _get(): params=params, headers=headers, follow_redirects=True, - timeout=DEFAULT_TIMEOUT, + timeout=self._timeout, ) retryer = self._retry_strategy.make_retryer() @@ -244,7 +278,7 @@ def _post(): json=json, headers=headers, params=params, - timeout=DEFAULT_TIMEOUT, + timeout=self._timeout, ) retryer = self._retry_strategy.make_retryer() @@ -290,7 +324,7 @@ def _put(): content=blob, json=json, headers=headers, - timeout=DEFAULT_TIMEOUT, + timeout=self._timeout, ) retryer = self._retry_strategy.make_retryer() @@ -328,7 +362,7 @@ def _delete(): f"{self.base_url}{path}", headers=headers, params=params, - timeout=DEFAULT_TIMEOUT, + timeout=self._timeout, ) retryer = self._retry_strategy.make_retryer() @@ -389,13 +423,12 @@ async def get_async(self, path: str, params: dict = None): headers.update(self.auth.get_authorization()) async def _get(): - async with httpx.AsyncClient(follow_redirects=True) as client: - return await client.get( - f"{self.base_url}{path}", - params=params, - headers=headers, - timeout=DEFAULT_TIMEOUT, - ) + return await self._async_client.get( + f"{self.base_url}{path}", + params=params, + headers=headers, + timeout=self._timeout, + ) retryer = self._retry_strategy.make_retryer_async() @@ -464,15 +497,14 @@ async def post_async( headers.update(self.auth.get_authorization()) async def _post(): - async with httpx.AsyncClient() as client: - return await client.post( - url=f"{self.base_url}{path}", - content=blob, - json=json, - headers=headers, - params=params, - timeout=DEFAULT_TIMEOUT, - ) + return await self._async_client.post( + url=f"{self.base_url}{path}", + content=blob, + json=json, + headers=headers, + params=params, + timeout=self._timeout, + ) retryer = self._retry_strategy.make_retryer_async() @@ -512,14 +544,13 @@ async def put_async( headers.update(self.auth.get_authorization()) async def _put(): - async with httpx.AsyncClient() as client: - return await client.put( - url=f"{self.base_url}{path}", - content=blob, - json=json, - headers=headers, - timeout=DEFAULT_TIMEOUT, - ) + return await self._async_client.put( + url=f"{self.base_url}{path}", + content=blob, + json=json, + headers=headers, + timeout=self._timeout, + ) retryer = self._retry_strategy.make_retryer_async() @@ -552,13 +583,12 @@ async def delete_async(self, path: str, params: dict = None) -> dict: headers.update(self.auth.get_authorization()) async def _delete(): - async with httpx.AsyncClient() as client: - return await client.delete( - url=f"{self.base_url}{path}", - headers=headers, - params=params, - timeout=DEFAULT_TIMEOUT, - ) + return await self._async_client.delete( + url=f"{self.base_url}{path}", + headers=headers, + params=params, + timeout=self._timeout, + ) retryer = self._retry_strategy.make_retryer_async()