Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Even more efficient httpx #206

Merged
merged 5 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/sumo/wrapper/_blob_client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import httpx

from ._decorators import (
raise_for_status,
raise_for_status_async,
)

from ._retry_strategy import RetryStrategy


class BlobClient:
"""Upload blobs to blob store using pre-authorized URLs"""

def __init__(self, retry_strategy=RetryStrategy()):
def __init__(self, client, async_client, timeout, retry_strategy):
self._client = client
self._async_client = client
self._timeout = timeout
self._retry_strategy = retry_strategy
return

Expand All @@ -30,7 +29,9 @@ def upload_blob(self, blob: bytes, url: str):
}

def _put():
return httpx.put(url, content=blob, headers=headers)
return self._client.put(
url, content=blob, headers=headers, timeout=self._timeout
)

retryer = self._retry_strategy.make_retryer()

Expand All @@ -51,8 +52,9 @@ async def upload_blob_async(self, blob: bytes, url: str):
}

async def _put():
async with httpx.AsyncClient() as client:
return await client.put(url=url, content=blob, headers=headers)
return await self._async_client.put(
url=url, content=blob, headers=headers, timeout=self._timeout
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down
116 changes: 73 additions & 43 deletions src/sumo/wrapper/sumo_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging

import asyncio
import httpx

import jwt

from ._blob_client import BlobClient
Expand All @@ -18,7 +17,7 @@

logger = logging.getLogger("sumo.wrapper")

DEFAULT_TIMEOUT = httpx.Timeout(20.0)
DEFAULT_TIMEOUT = httpx.Timeout(30.0)


class SumoClient:
Expand All @@ -32,6 +31,7 @@ def __init__(
devicecode: bool = False,
verbosity: str = "CRITICAL",
retry_strategy=RetryStrategy(),
timeout=DEFAULT_TIMEOUT,
):
"""Initialize a new Sumo object

Expand All @@ -49,8 +49,10 @@ def __init__(
raise ValueError(f"Invalid environment: {env}")

self._retry_strategy = retry_strategy
self._client = httpx
self._blob_client = BlobClient(retry_strategy)
self._client = httpx.Client(follow_redirects=True)
self._async_client = httpx.AsyncClient(follow_redirects=True)

self._timeout = timeout

access_token = None
refresh_token = None
Expand Down Expand Up @@ -94,12 +96,39 @@ def __init__(
return

def __enter__(self):
self._client = httpx.Client()
return self

def __exit__(self, exc_type, exc_value, traceback):
self._client.close()
return
self._client = None
return False

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self._async_client.aclose()
self._async_client = None
return False

def __del__(self):
if self._client is not None:
self._client.close()
self._client = None
pass
if self._async_client is not None:

async def closeit(client):
await client.aclose()
return

try:
loop = asyncio.get_running_loop()
loop.create_task(closeit(self._async_client))
except RuntimeError:
pass
self._async_client = None
pass

def authenticate(self):
if self.auth is None:
Expand All @@ -126,7 +155,12 @@ def blob_client(self) -> BlobClient:
await sumo.blob_client.upload_blob_async(blob, blob_url)
"""

return self._blob_client
return BlobClient(
self._client,
self._async_client,
self._timeout,
self._retry_strategy,
)

@raise_for_status
def get(self, path: str, params: dict = None) -> dict:
Expand Down Expand Up @@ -169,7 +203,7 @@ def _get():
params=params,
headers=headers,
follow_redirects=True,
timeout=DEFAULT_TIMEOUT,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer()
Expand Down Expand Up @@ -244,7 +278,7 @@ def _post():
json=json,
headers=headers,
params=params,
timeout=DEFAULT_TIMEOUT,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer()
Expand Down Expand Up @@ -290,7 +324,7 @@ def _put():
content=blob,
json=json,
headers=headers,
timeout=DEFAULT_TIMEOUT,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer()
Expand Down Expand Up @@ -328,7 +362,7 @@ def _delete():
f"{self.base_url}{path}",
headers=headers,
params=params,
timeout=DEFAULT_TIMEOUT,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer()
Expand Down Expand Up @@ -389,13 +423,12 @@ async def get_async(self, path: str, params: dict = None):
headers.update(self.auth.get_authorization())

async def _get():
async with httpx.AsyncClient(follow_redirects=True) as client:
return await client.get(
f"{self.base_url}{path}",
params=params,
headers=headers,
timeout=DEFAULT_TIMEOUT,
)
return await self._async_client.get(
f"{self.base_url}{path}",
params=params,
headers=headers,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down Expand Up @@ -464,15 +497,14 @@ async def post_async(
headers.update(self.auth.get_authorization())

async def _post():
async with httpx.AsyncClient() as client:
return await client.post(
url=f"{self.base_url}{path}",
content=blob,
json=json,
headers=headers,
params=params,
timeout=DEFAULT_TIMEOUT,
)
return await self._async_client.post(
url=f"{self.base_url}{path}",
content=blob,
json=json,
headers=headers,
params=params,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down Expand Up @@ -512,14 +544,13 @@ async def put_async(
headers.update(self.auth.get_authorization())

async def _put():
async with httpx.AsyncClient() as client:
return await client.put(
url=f"{self.base_url}{path}",
content=blob,
json=json,
headers=headers,
timeout=DEFAULT_TIMEOUT,
)
return await self._async_client.put(
url=f"{self.base_url}{path}",
content=blob,
json=json,
headers=headers,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down Expand Up @@ -552,13 +583,12 @@ async def delete_async(self, path: str, params: dict = None) -> dict:
headers.update(self.auth.get_authorization())

async def _delete():
async with httpx.AsyncClient() as client:
return await client.delete(
url=f"{self.base_url}{path}",
headers=headers,
params=params,
timeout=DEFAULT_TIMEOUT,
)
return await self._async_client.delete(
url=f"{self.base_url}{path}",
headers=headers,
params=params,
timeout=self._timeout,
)

retryer = self._retry_strategy.make_retryer_async()

Expand Down