From a22c0d154fa4903da4f0ceb09c9b1c1db45742e6 Mon Sep 17 00:00:00 2001 From: Raymond Wiker Date: Thu, 20 Jun 2024 12:54:24 +0200 Subject: [PATCH 1/5] Make async methods and blob_client use httpx client objects more efficiently. --- src/sumo/wrapper/_blob_client.py | 15 +++-- src/sumo/wrapper/sumo_client.py | 102 ++++++++++++++++++------------- 2 files changed, 68 insertions(+), 49 deletions(-) diff --git a/src/sumo/wrapper/_blob_client.py b/src/sumo/wrapper/_blob_client.py index 35ac785..c4076bd 100644 --- a/src/sumo/wrapper/_blob_client.py +++ b/src/sumo/wrapper/_blob_client.py @@ -1,17 +1,16 @@ -import httpx - from ._decorators import ( raise_for_status, raise_for_status_async, ) -from ._retry_strategy import RetryStrategy - class BlobClient: """Upload blobs to blob store using pre-authorized URLs""" - def __init__(self, retry_strategy=RetryStrategy()): + def __init__(self, client, async_client, timeout, retry_strategy): + self._client = client + self._async_client = client + self._timeout = timeout self._retry_strategy = retry_strategy return @@ -30,12 +29,13 @@ def upload_blob(self, blob: bytes, url: str): } def _put(): - return httpx.put(url, content=blob, headers=headers) + return self._client.put(url, content=blob, headers=headers, timeout = self._timeout) retryer = self._retry_strategy.make_retryer() return retryer(_put) + @raise_for_status_async async def upload_blob_async(self, blob: bytes, url: str): """Upload a blob async. @@ -51,8 +51,7 @@ async def upload_blob_async(self, blob: bytes, url: str): } async def _put(): - async with httpx.AsyncClient() as client: - return await client.put(url=url, content=blob, headers=headers) + return await self._async_client.put(url=url, content=blob, headers=headers, timeout=self._timeout) retryer = self._retry_strategy.make_retryer_async() diff --git a/src/sumo/wrapper/sumo_client.py b/src/sumo/wrapper/sumo_client.py index af8fa29..57579a3 100644 --- a/src/sumo/wrapper/sumo_client.py +++ b/src/sumo/wrapper/sumo_client.py @@ -18,7 +18,7 @@ logger = logging.getLogger("sumo.wrapper") -DEFAULT_TIMEOUT = httpx.Timeout(20.0) +DEFAULT_TIMEOUT = httpx.Timeout(30.0) class SumoClient: @@ -32,6 +32,7 @@ def __init__( devicecode: bool = False, verbosity: str = "CRITICAL", retry_strategy=RetryStrategy(), + timeout = DEFAULT_TIMEOUT, ): """Initialize a new Sumo object @@ -49,8 +50,10 @@ def __init__( raise ValueError(f"Invalid environment: {env}") self._retry_strategy = retry_strategy - self._client = httpx - self._blob_client = BlobClient(retry_strategy) + self._client = httpx.Client(follow_redirects=True) + self._async_client = httpx.AsyncClient(follow_redirects=True) + + self._timeout = timeout access_token = None refresh_token = None @@ -94,12 +97,33 @@ def __init__( return def __enter__(self): - self._client = httpx.Client() return self def __exit__(self, exc_type, exc_value, traceback): self._client.close() - return + self._client = None + self._async_client.close() + self._async_client = None + return False + + def __aenter__(self): + return self + + def __aexit__(self, exc_type, exc_value, traceback): + self._client.close() + self._client = None + self._async_client.close() + self._async_client = None + return False + + def __del__(self): + if self._client is not None: + self._client.close() + self._client = None + pass + if self._async_client is not None: + self._async_client.close() + self._async_client = None def authenticate(self): if self.auth is None: @@ -126,7 +150,7 @@ def blob_client(self) -> BlobClient: await sumo.blob_client.upload_blob_async(blob, blob_url) """ - return self._blob_client + return BlobClient(self._client, self._async_client, self._timeout, self._retry_strategy) @raise_for_status def get(self, path: str, params: dict = None) -> dict: @@ -169,7 +193,7 @@ def _get(): params=params, headers=headers, follow_redirects=True, - timeout=DEFAULT_TIMEOUT, + timeout=self._timeout, ) retryer = self._retry_strategy.make_retryer() @@ -244,7 +268,7 @@ def _post(): json=json, headers=headers, params=params, - timeout=DEFAULT_TIMEOUT, + timeout=self._timeout, ) retryer = self._retry_strategy.make_retryer() @@ -290,7 +314,7 @@ def _put(): content=blob, json=json, headers=headers, - timeout=DEFAULT_TIMEOUT, + timeout=self._timeout, ) retryer = self._retry_strategy.make_retryer() @@ -328,7 +352,7 @@ def _delete(): f"{self.base_url}{path}", headers=headers, params=params, - timeout=DEFAULT_TIMEOUT, + timeout=self._timeout, ) retryer = self._retry_strategy.make_retryer() @@ -389,13 +413,12 @@ async def get_async(self, path: str, params: dict = None): headers.update(self.auth.get_authorization()) async def _get(): - async with httpx.AsyncClient(follow_redirects=True) as client: - return await client.get( - f"{self.base_url}{path}", - params=params, - headers=headers, - timeout=DEFAULT_TIMEOUT, - ) + return await self._async_client.get( + f"{self.base_url}{path}", + params=params, + headers=headers, + timeout=self._timeout, + ) retryer = self._retry_strategy.make_retryer_async() @@ -464,15 +487,14 @@ async def post_async( headers.update(self.auth.get_authorization()) async def _post(): - async with httpx.AsyncClient() as client: - return await client.post( - url=f"{self.base_url}{path}", - content=blob, - json=json, - headers=headers, - params=params, - timeout=DEFAULT_TIMEOUT, - ) + return await self._async_client.post( + url=f"{self.base_url}{path}", + content=blob, + json=json, + headers=headers, + params=params, + timeout=self._timeout, + ) retryer = self._retry_strategy.make_retryer_async() @@ -512,14 +534,13 @@ async def put_async( headers.update(self.auth.get_authorization()) async def _put(): - async with httpx.AsyncClient() as client: - return await client.put( - url=f"{self.base_url}{path}", - content=blob, - json=json, - headers=headers, - timeout=DEFAULT_TIMEOUT, - ) + return await self._async_client.put( + url=f"{self.base_url}{path}", + content=blob, + json=json, + headers=headers, + timeout=self._timeout, + ) retryer = self._retry_strategy.make_retryer_async() @@ -552,13 +573,12 @@ async def delete_async(self, path: str, params: dict = None) -> dict: headers.update(self.auth.get_authorization()) async def _delete(): - async with httpx.AsyncClient() as client: - return await client.delete( - url=f"{self.base_url}{path}", - headers=headers, - params=params, - timeout=DEFAULT_TIMEOUT, - ) + return await self._async_client.delete( + url=f"{self.base_url}{path}", + headers=headers, + params=params, + timeout=self._timeout, + ) retryer = self._retry_strategy.make_retryer_async() From 8f96933482de64fdc95620de98616fad3ba47ec9 Mon Sep 17 00:00:00 2001 From: Raymond Wiker Date: Thu, 20 Jun 2024 12:59:58 +0200 Subject: [PATCH 2/5] Make black and flake8 happy. --- src/sumo/wrapper/_blob_client.py | 9 ++++++--- src/sumo/wrapper/sumo_client.py | 9 +++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/sumo/wrapper/_blob_client.py b/src/sumo/wrapper/_blob_client.py index c4076bd..ea7da86 100644 --- a/src/sumo/wrapper/_blob_client.py +++ b/src/sumo/wrapper/_blob_client.py @@ -29,13 +29,14 @@ def upload_blob(self, blob: bytes, url: str): } def _put(): - return self._client.put(url, content=blob, headers=headers, timeout = self._timeout) + return self._client.put( + url, content=blob, headers=headers, timeout=self._timeout + ) retryer = self._retry_strategy.make_retryer() return retryer(_put) - @raise_for_status_async async def upload_blob_async(self, blob: bytes, url: str): """Upload a blob async. @@ -51,7 +52,9 @@ async def upload_blob_async(self, blob: bytes, url: str): } async def _put(): - return await self._async_client.put(url=url, content=blob, headers=headers, timeout=self._timeout) + return await self._async_client.put( + url=url, content=blob, headers=headers, timeout=self._timeout + ) retryer = self._retry_strategy.make_retryer_async() diff --git a/src/sumo/wrapper/sumo_client.py b/src/sumo/wrapper/sumo_client.py index 57579a3..8f41111 100644 --- a/src/sumo/wrapper/sumo_client.py +++ b/src/sumo/wrapper/sumo_client.py @@ -32,7 +32,7 @@ def __init__( devicecode: bool = False, verbosity: str = "CRITICAL", retry_strategy=RetryStrategy(), - timeout = DEFAULT_TIMEOUT, + timeout=DEFAULT_TIMEOUT, ): """Initialize a new Sumo object @@ -150,7 +150,12 @@ def blob_client(self) -> BlobClient: await sumo.blob_client.upload_blob_async(blob, blob_url) """ - return BlobClient(self._client, self._async_client, self._timeout, self._retry_strategy) + return BlobClient( + self._client, + self._async_client, + self._timeout, + self._retry_strategy, + ) @raise_for_status def get(self, path: str, params: dict = None) -> dict: From c7ad443f0cdcdc3a53ce4ddb7037ca907bcce0fe Mon Sep 17 00:00:00 2001 From: Raymond Wiker Date: Thu, 20 Jun 2024 15:09:50 +0200 Subject: [PATCH 3/5] __aenter__ and __aexit__ are async functions, obviously. --- src/sumo/wrapper/sumo_client.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/sumo/wrapper/sumo_client.py b/src/sumo/wrapper/sumo_client.py index 8f41111..8fdd30f 100644 --- a/src/sumo/wrapper/sumo_client.py +++ b/src/sumo/wrapper/sumo_client.py @@ -1,7 +1,6 @@ import logging - +import asyncio import httpx - import jwt from ._blob_client import BlobClient @@ -102,17 +101,13 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self._client.close() self._client = None - self._async_client.close() - self._async_client = None return False - def __aenter__(self): + async def __aenter__(self): return self - def __aexit__(self, exc_type, exc_value, traceback): - self._client.close() - self._client = None - self._async_client.close() + async def __aexit__(self, exc_type, exc_value, traceback): + await self._async_client.aclose() self._async_client = None return False @@ -122,7 +117,10 @@ def __del__(self): self._client = None pass if self._async_client is not None: - self._async_client.close() + # async def closeit(): + # await self._async_client.aclose() + # return + # asyncio.run(closeit()) self._async_client = None def authenticate(self): From 3a53f0161959e60341ca01cc674354b48292b015 Mon Sep 17 00:00:00 2001 From: Raymond Wiker Date: Fri, 21 Jun 2024 11:52:22 +0200 Subject: [PATCH 4/5] Working(?) code to close async client in SumoClient.__del__ --- src/sumo/wrapper/sumo_client.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/sumo/wrapper/sumo_client.py b/src/sumo/wrapper/sumo_client.py index 8fdd30f..9f43e45 100644 --- a/src/sumo/wrapper/sumo_client.py +++ b/src/sumo/wrapper/sumo_client.py @@ -1,3 +1,4 @@ +import sys import logging import asyncio import httpx @@ -117,11 +118,18 @@ def __del__(self): self._client = None pass if self._async_client is not None: - # async def closeit(): - # await self._async_client.aclose() - # return - # asyncio.run(closeit()) + + async def closeit(client): + await client.aclose() + return + + try: + loop = asyncio.get_running_loop() + loop.create_task(closeit(self._async_client)) + except RuntimeError as ex: + pass self._async_client = None + pass def authenticate(self): if self.auth is None: From 56bc95b18f9828942076403e9cafaed4e697540f Mon Sep 17 00:00:00 2001 From: Raymond Wiker Date: Fri, 21 Jun 2024 11:54:24 +0200 Subject: [PATCH 5/5] Make flake8 happy, again. --- src/sumo/wrapper/sumo_client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sumo/wrapper/sumo_client.py b/src/sumo/wrapper/sumo_client.py index 9f43e45..d82df39 100644 --- a/src/sumo/wrapper/sumo_client.py +++ b/src/sumo/wrapper/sumo_client.py @@ -1,4 +1,3 @@ -import sys import logging import asyncio import httpx @@ -126,7 +125,7 @@ async def closeit(client): try: loop = asyncio.get_running_loop() loop.create_task(closeit(self._async_client)) - except RuntimeError as ex: + except RuntimeError: pass self._async_client = None pass