diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index b136e3e..d182b82 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.7", "3.8", "3.9"] + python-version: ["3.8", "3.9"] permissions: contents: read id-token: write diff --git a/requirements/requirements.txt b/requirements/requirements.txt index f96e654..be1b9bb 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -3,4 +3,5 @@ msal-extensions>=1.0.0 PyYAML>=5.4 setuptools>=49.2.1 pyjwt>=2.4.0 -httpx>=0.24.1 +httpx>=0.25.0 +tenacity>=8.2.3 diff --git a/setup.py b/setup.py index 300ab1e..ac45090 100644 --- a/setup.py +++ b/setup.py @@ -42,13 +42,15 @@ def parse_requirements(fname): "Development Status :: 3 - Alpha", "Intended Audience :: Developers", "Natural Language :: English", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", ], use_scm_version={"write_to": "src/sumo/wrapper/version.py"}, author="Equinor ASA", install_requires=REQUIREMENTS, - python_requires=">=3.6", + python_requires=">=3.8", packages=find_packages("src"), package_dir={"": "src"}, include_package_data=True, diff --git a/src/sumo/wrapper/__init__.py b/src/sumo/wrapper/__init__.py index 62ebcb2..b9f0c40 100644 --- a/src/sumo/wrapper/__init__.py +++ b/src/sumo/wrapper/__init__.py @@ -1,4 +1,3 @@ -from ._call_sumo_api import CallSumoApi from .sumo_client import SumoClient -__all__ = ["CallSumoApi", "SumoClient"] +__all__ = ["SumoClient"] diff --git a/src/sumo/wrapper/_auth.py b/src/sumo/wrapper/_auth.py deleted file mode 100644 index 0aa1a96..0000000 --- a/src/sumo/wrapper/_auth.py +++ /dev/null @@ -1,224 +0,0 @@ -import datetime -import msal -import json -import stat -import sys -import os -import logging - - -logger = logging.getLogger("sumo.wrapper") - -TENANT = "3aa4a235-b6e2-48d5-9195-7fcf05b459b0" - -AUTHORITY_HOST_URI = "https://login.microsoftonline.com" -AUTHORITY_URI = AUTHORITY_HOST_URI + "/" + TENANT -HOME_DIR = os.path.expanduser("~") - - -class Auth: - def __init__( - self, - client_id, - resource_id, - authority=AUTHORITY_URI, - client_credentials=None, - writeback=False, - verbosity="CRITICAL", - ): - logger.setLevel(verbosity) - logger.debug("Initialize Auth") - self.client_id = client_id - logger.debug("client_id is %s", self.client_id) - self.resource_id = resource_id - logger.debug("client_id is %s", self.client_id) - self.scope = self.resource_id + "/.default" - self.authority = authority - self.client_credentials = client_credentials - self.writeback = writeback - logger.debug("self.writeback is %s", self.writeback) - self.token_path = os.path.join( - HOME_DIR, ".sumo", str(self.resource_id) + ".token" - ) - self._get_cache() - self.app = msal.PublicClientApplication( - self.client_id, - authority=AUTHORITY_URI, - client_credential=self.client_credentials, - token_cache=self.cache, - ) - - logger.debug("self.app has been initialized") - logger.debug("Getting accounts") - self.accounts = self.app.get_accounts() - - logger.debug("self.accounts is %s", self.accounts) - - if self._cache_available(): - if not self.accounts: - logger.debug("Token cache found but have no accounts") - if self.writeback: - logger.debug("Writeback is True, running device_code") - self._oauth_device_code() - else: - raise RuntimeError( - "The locally stored token has no accounts. " - "Please check your access or run 'sumo_login' to re-create your token." - ) - else: - logger.debug( - "There are accounts. Calling _oauth_get_token_silent()" - ) - if not self._oauth_get_token_silent(): - logger.debug("self._oauth_get_token_silent returned False") - if self.writeback: - logger.debug( - "self.writeback is True, calling device_code" - ) - self._oauth_device_code() - - else: - logger.debug("No token cache found, reauthenticate") - self._oauth_device_code() - - def get_token(self): - logger.debug("Starting get_token") - - is_expired = self.is_token_expired() - - logger.debug("self.is_token_expired is %s", str(is_expired)) - - if is_expired: - self._oauth_get_token_silent() - - logger.debug( - "Returning access_token. Length of access token is %s", - len(self.result["access_token"]), - ) - return self.result["access_token"] - - def is_token_expired(self): - """ - Check if token is expired or about to expire. - """ - logger.debug("is_token_expired() is starting") - is_expired = datetime.datetime.now() > self.expiring_date - logger.debug("is_expired: %s", str(is_expired)) - return is_expired - - def _oauth_get_token_silent(self): - logger.debug("_oauth_get_token_silent starting") - logger.info("Getting access token") - if not self.accounts: - logger.debug("Get accounts") - self.accounts = self.app.get_accounts() - - if not self._check_token_security(): - raise SystemError("The token is not stored safely.") - - self.result = self.app.acquire_token_silent_with_error( - [self.scope], account=self.accounts[0] - ) - - if "access_token" in self.result: - logger.info("Access token found") - elif "error" in self.result: - logger.info("Error getting access token") - logger.debug(self.result["error"]) - return False - else: - logger.info("Failed getting access token") - return False - - self._set_expiring_date(int(self.result["expires_in"])) - - if self.writeback: - self._write_cache() - - logger.debug("_oauth_get_token_silent() has finished") - - return True - - def _set_expiring_date(self, time_left, threshold=60): - """ - Defines the access token expiring date. Sets a threshold to update the token before it expires - - Parameter - time_left: time, in seconds, until the token expires. - threshold: how many seconds before expiration the token is allowed to be updated. - """ - self.expiring_date = datetime.datetime.now() + datetime.timedelta( - seconds=time_left - threshold - ) - logger.debug("self.expiring_date set to %s", self.expiring_date) - - def _cache_available(self): - if os.path.isfile(self.token_path): - logger.debug("cache is available") - return True - logger.debug("cache is not available") - return False - - def _check_token_security(self): - if sys.platform.lower().startswith("win"): - return True - - access_stats = os.stat(self.token_path) - - return not bool(access_stats.st_mode & (stat.S_IRWXG | stat.S_IRWXO)) - - def _oauth_device_code(self): - flow = self.app.initiate_device_flow(scopes=[self.scope]) - - if "user_code" not in flow: - raise ValueError( - "Fail to create device flow. Err: %s" - % json.dumps(flow, indent=4) - ) - else: - print(flow["message"]) - - self.result = self.app.acquire_token_by_device_flow(flow) - try: - self._set_expiring_date(int(self.result["expires_in"])) - except KeyError: - logger.debug(self.result) - self._write_cache() - - def _write_cache(self): - logger.debug("Writing cache") - old_mask = os.umask(0o077) - - dir_path = os.path.dirname(self.token_path) - os.makedirs(dir_path, exist_ok=True) - - with open(self.token_path, "w") as file: - logger.debug("Writing to %s", self.token_path) - file.write(self.cache.serialize()) - - if not sys.platform.lower().startswith("win"): - os.chmod(self.token_path, 0o600) - os.chmod(dir_path, 0o700) - - os.umask(old_mask) - - def _read_cache(self): - with open(self.token_path, "r") as file: - logger.debug("Reading from %s", self.token_path) - self.cache.deserialize(file.read()) - - def _get_cache(self): - logger.debug("_get_cache") - self.cache = msal.SerializableTokenCache() - - if self._cache_available(): - logger.debug("cache is available, reading it") - self._read_cache() - - -if __name__ == "__main__": - auth = Auth( - "1826bd7c-582f-4838-880d-5b4da5c3eea2", - "88d2b022-3539-4dda-9e66-853801334a86", - ) - logger.debug(auth.get_token()) diff --git a/src/sumo/wrapper/_blob_client.py b/src/sumo/wrapper/_blob_client.py index 62d0f40..9e5c59f 100644 --- a/src/sumo/wrapper/_blob_client.py +++ b/src/sumo/wrapper/_blob_client.py @@ -1,11 +1,13 @@ import httpx -from ._request_error import raise_request_error_exception +from .decorators import raise_for_status, http_retry class BlobClient: """Upload blobs to blob store using pre-authorized URLs""" + @raise_for_status + @http_retry def upload_blob(self, blob: bytes, url: str): """Upload a blob. @@ -16,20 +18,15 @@ def upload_blob(self, blob: bytes, url: str): headers = { "Content-Type": "application/octet-stream", - "Content-Length": str(len(blob)), "x-ms-blob-type": "BlockBlob", } - try: - response = httpx.put(url, data=blob, headers=headers) - except httpx.ProxyError as err: - raise_request_error_exception(503, err) - - if response.is_error: - raise_request_error_exception(response.status_code, response.text) + response = httpx.put(url, content=blob, headers=headers) return response + @raise_for_status + @http_retry async def upload_blob_async(self, blob: bytes, url: str): """Upload a blob async. @@ -40,19 +37,10 @@ async def upload_blob_async(self, blob: bytes, url: str): headers = { "Content-Type": "application/octet-stream", - "Content-Length": str(len(blob)), "x-ms-blob-type": "BlockBlob", } - try: - async with httpx.AsyncClient() as client: - response = await client.put( - url=url, data=blob, headers=headers - ) - except httpx.ProxyError as err: - raise_request_error_exception(503, err) - - if response.is_error: - raise_request_error_exception(response.status_code, response.text) + async with httpx.AsyncClient() as client: + response = await client.put(url=url, content=blob, headers=headers) return response diff --git a/src/sumo/wrapper/_call_azure_api.py b/src/sumo/wrapper/_call_azure_api.py deleted file mode 100644 index 3efc24d..0000000 --- a/src/sumo/wrapper/_call_azure_api.py +++ /dev/null @@ -1,329 +0,0 @@ -import httpx -import logging - -from ._auth import Auth -from ._request_error import AuthenticationError, TransientError, PermanentError - -logger = logging.getLogger("sumo.wrapper") - - -def _raise_request_error_exception(code, message): - """ - Raise the proper authentication error according to the code received from sumo. - """ - - logger.debug("code: %s", code) - logger.debug("message: %s", message) - - if 502 <= code <= 504 or code == 404 or code == 500: - raise TransientError(code, message) - elif 401 <= code <= 403: - raise AuthenticationError(code, message) - else: - raise PermanentError(code, message) - - -class CallAzureApi: - """ - This class can be used for generating an Azure OAuth2 bear token and send a request to Azure JSON rest endpoint. - The Azure clientId "1826bd7c-582f-4838-880d-5b4da5c3eea2" needs to have permissions to the resourceId sent in. - - Parameters - resourceId: - Need to be an Azure resourceId - """ - - def __init__( - self, - resource_id, - client_id, - outside_token=False, - writeback=False, - verbosity="CRITICAL", - ): - logger.setLevel(level=verbosity) - - self.resource_id = resource_id - self.client_id = client_id - self.writeback = writeback - logger.debug("self.writeback is %s", self.writeback) - - logger.debug("CallAzureApi is initializing") - - logger.debug("resource_id is %s", resource_id) - logger.debug("client_id is %s", client_id) - logger.debug("outside_token is %s", outside_token) - - if outside_token: - self.auth = None - self.bearer = None - else: - logger.debug("outside_token is false, calling self._authenticate") - self._authenticate() - - def __str__(self): - str_repr = [ - "{key}='{value}'".format(key=k, value=v) - for k, v in self.__dict__.items() - ] - return ", ".join(str_repr) - - def __repr__(self): - return self.__str__() - - def get_bearer_token(self): - """ - Get an Azure OAuth2 bear token. - You need to open this URL in a web browser https://microsoft.com/devicelogin, and enter the displayed code. - - Return - accessToken: - The Bearer Authorization string - """ - logger.debug("Getting bearer token") - return self.bearer - - def _authenticate(self): - """ - Authenticate the user, generating a bearer token that is valid for one hour. - """ - logger.debug("Running _authenticate") - self.auth = Auth(self.client_id, self.resource_id, self.writeback) - self._generate_bearer_token() - - def _generate_bearer_token(self): - """ - Generate the access token through the authentication object. - """ - logger.debug("Running _generate_bearer_token()") - - self.bearer = "Bearer " + self.auth.get_token() - - logger.debug( - "Setting self.bearer. Length of self.bearer is %s", - str(len(self.bearer)), - ) - logger.debug("_generate_bearer_token is finished.") - - def _is_token_expired(self): - """ - Checks if one hour (with five secs tolerance) has passed since last authentication - """ - logger.debug("Checking if token has expired") - - is_expired = self.auth.is_token_expired() - - logger.debug( - "Answer from self.auth.is_token_expired() was %s", str(is_expired) - ) - - return is_expired - - def _process_token(self, bearer): - if bearer: - logger.debug("Bearer exist, returning bearer") - return "Bearer " + bearer - - if self._is_token_expired(): - logger.debug("Token is expired, calling for generating it.") - self._generate_bearer_token() - - logger.debug("self.bearer is being returned from _process_token()") - logger.debug("Length of self.bearer is %s", str(len(self.bearer))) - - return self.bearer - - def get_json(self, url, bearer=None): - """ - Send an request to the url. - - Parameters - url - Need to be a Azure rest url that returns a JSON. - bearer - Optional, if not entered it will generate one by calling the get_bearer_token method - - Return - json: - The json respond from the entered URL - """ - - logger.debug("get_json() is starting") - - bearer = self._process_token(bearer) - - headers = {"Content-Type": "application/json", "Authorization": bearer} - - response = httpx.get(url, headers=headers) - - if response.is_error: - _raise_request_error_exception(response.status_code, response.text) - - return response.json() - - def get_image(self, url, bearer=None): - """ - Send an request to the url for the image. - - Parameters - url - Need to be a Azure rest url that returns a JSON. - bearer - Optional, if not entered it will generate one by calling the get_bearer_token method - - Return - image: - raw image - """ - - logger.debug("get_image() is starting") - - bearer = self._process_token(bearer) - - headers = {"Content-Type": "html/text", "Authorization": bearer} - - response = httpx.get(url, headers=headers) - - if response.is_error: - _raise_request_error_exception(response.status_code, response.text) - - return None - - def get_content(self, url, bearer=None): - """ - Send an request to the url. - - Parameters - url - Need to be a Azure rest url that returns a JSON. - bearer - Optional, if not entered it will generate one by calling the get_bearer_token method - - Return - content: - The content respond from the entered URL. - """ - - logger.debug("get_content() is starting") - - bearer = self._process_token(bearer) - - headers = {"Content-Type": "application/json", "Authorization": bearer} - - response = httpx.get(url, headers=headers) - - if response.is_error: - _raise_request_error_exception(response.status_code, response.text) - - return response.content - - def post(self, url, blob=None, json=None, bearer=None): - """ - Post binary or json to the url and return the response as json. - - Parameters - url: Need to be a Azure rest url that returns a JSON. - blob: Optional, the binary to save - json: Optional, the json to save - bearer: Optional, if not entered it will generate one by calling the get_bearer_token method - - Return - string: The string respond from the entered URL - """ - - logger.debug("post() is starting") - - bearer = self._process_token(bearer) - - if blob and json: - raise ValueError( - "Both blob and json given to post - can only have one at the time." - ) - - headers = { - "Content-Type": "application/json" - if json is not None - else "application/octet-stream", - "Authorization": bearer, - "Content-Length": str(len(json) if json else len(blob)), - } - - try: - response = httpx.post(url, data=blob, json=json, headers=headers) - except httpx.ProxyError as err: - _raise_request_error_exception(503, err) - - if response.is_error: - _raise_request_error_exception(response.status_code, response.text) - - return response - - def put(self, url, blob=None, json=None, bearer=None): - """ - Put binary to the url and return the response as json. - - Parameters - url: Need to be a Azure rest url that returns a JSON. - blob: Optional, the binary to save - json: Optional, the json to save - bearer: Optional, if not entered it will generate one by calling the get_bearer_token method - - Return - string: The string respond from the entered URL - """ - - logger.debug("put() is starting") - - bearer = self._process_token(bearer) - - if blob and json: - raise ValueError( - "Both blob and json given to put - can only have one at the time." - ) - - headers = { - "Content-Type": "application/json" - if json is not None - else "application/octet-stream", - "Content-Length": str(len(json) if json else len(blob)), - "x-ms-blob-type": "BlockBlob", - } - - if url.find("sig=") < 0: - headers["Authorization"] = bearer - - try: - response = httpx.put(url, data=blob, json=json, headers=headers) - except httpx.ProxyError as err: - _raise_request_error_exception(503, err) - - if response.is_error: - _raise_request_error_exception(response.status_code, response.text) - - return response - - def delete_object(self, url, bearer=None): - """ - Send delete to the url and return the response as json. - - Parameters - url: Need to be a Azure rest url that returns a JSON. - bearer: Optional, if not entered it will generate one by calling the get_bearer_token method - - Return - json: The json respond from the entered URL - """ - logger.debug("delete_object is starting") - bearer = self._process_token(bearer) - - headers = { - "Content-Type": "application/json", - "Authorization": bearer, - } - - response = httpx.delete(url, headers=headers) - - if response.is_error: - _raise_request_error_exception(response.status_code, response.text) - - return response.json() diff --git a/src/sumo/wrapper/_call_sumo_api.py b/src/sumo/wrapper/_call_sumo_api.py deleted file mode 100644 index bd21eea..0000000 --- a/src/sumo/wrapper/_call_sumo_api.py +++ /dev/null @@ -1,422 +0,0 @@ -import logging - -from .config import APP_REGISTRATION -from ._call_azure_api import CallAzureApi - -logger = logging.getLogger("sumo.wrapper") - - -class CallSumoApi: - """ - This class can be used for calling the Sumo APi. - """ - - def __init__( - self, - env="dev", - resource_id=None, - client_id=None, - outside_token=False, - writeback=False, - verbosity="CRITICAL", - ): - """Initialize the wrapper. Chooses among multiple environments.""" - - logger.setLevel(level=verbosity) - - if env == "localhost": - self.base_url = "http://localhost:8084/api/v1" - else: - self.base_url = f"https://main-sumo-{env}.radix.equinor.com/api/v1" - - resource_id = ( - resource_id - if resource_id - else APP_REGISTRATION[env]["RESOURCE_ID"] - ) - client_id = ( - client_id if client_id else APP_REGISTRATION[env]["CLIENT_ID"] - ) - - self.callAzureApi = CallAzureApi( - resource_id, client_id, outside_token, writeback=writeback - ) - - def __str__(self): - str_repr = [ - "{key}='{value}'".format(key=k, value=v) - for k, v in self.__dict__.items() - ] - return ", ".join(str_repr) - - def __repr__(self): - return self.__str__() - - def userdata(self, bearer=None): - """Get user data from Sumo endpoint /userdata""" - url = f"{self.base_url}/userdata" - return self.callAzureApi.get_json(url, bearer) - - def userphoto(self, bearer=None): - """Get user photo from Sumo endpoint /userphoto""" - url = f"{self.base_url}/userphoto" - return self.callAzureApi.get_image(url, bearer) - - def userprofile(self, bearer=None): - """Get user profile from Sumo endpoint /userprofile""" - url = f"{self.base_url}/userprofile" - return self.callAzureApi.get_json(url, bearer) - - # For discussion: Do we need to print the code and expect the user to manually - # type it on the browser or is there a better way to do it - def get_bearer_token(self): - """ - Generating an Azure OAuth2 bear token. - You need to open this URL in a web browser https://microsoft.com/devicelogin, - and enter the code that is printed. - - Return - accessToken: - The Bearer Authorization string - """ - return self.callAzureApi.get_bearer_token() - - def search( - self, - query, - select=None, - buckets=None, - search_from=0, - search_size="100", - search_after=None, - bearer=None, - ): - """ - Search for specific objects. - - Parameters - query string, in Lucene search syntax. - select string, comma-separated list of fields to return. Default: all fields. - buckets string, comma-separated list of fields to build buckets from. Default: none. - search_from string, start index for search result (for paging through lare result sets. Default: 0. - search_size, int, max number of hits to return. Default: 10. - bearer string, Azure OAuth2 bear token Default: will create one. - - Return - json: - Search results. - """ - url = f"{self.base_url}/search?$query={query}" - - if search_from is not None: - url = f"{url}&$from={search_from}" - if search_size is not None: - url = f"{url}&$size={search_size}" - if search_after is not None: - url = f"{url}&$search_after={search_after}" - if select: - url = f"{url}&$select={select}" - if buckets: - url = f"{url}&$buckets={buckets}" - - return self.callAzureApi.get_json(url, bearer) - - def searchroot( - self, - query, - select=None, - buckets=None, - search_from=0, - search_size="100", - bearer=None, - ): - """ - Search for parent objects (object without parent) - """ - url = f"{self.base_url}/searchroot?$query={query}" - - if search_from is None: - search_from = 0 - url = f"{url}&$from={search_from}" - - if search_size is None: - search_size = 100 - url = f"{url}&$size={search_size}" - - if select: - url = f"{url}&$select={select}" - if buckets: - url = f"{url}&$buckets={buckets}" - - return self.callAzureApi.get_json(url, bearer) - - def get_objects(self, bearer=None): - """ - Returns a list with all stored JSON objects. - - Parameters - bearer string, Azure OAuth2 bear token Default: will create one. - - Return - list: - list of objects. - """ - url = f"{self.base_url}/Objects" - return self.callAzureApi.get_json(url, bearer) - - def get_json(self, object_id, bearer=None): - """ - Returns the stored json-document for the given objectid. - - Parameters - object_id string, the id for the json document to return. - bearer string, Azure OAuth2 bear token Default: will create one. - - Return - json: - Json document for the given objectId. - """ - url = f"{self.base_url}/objects('{object_id}')" - return self.callAzureApi.get_json(url, bearer) - - def save_top_level_json(self, json, bearer=None): - """ - Adds a new top-level json object to SUMO. - - Parameters - json json, Json document to save. - bearer string, Azure OAuth2 bear token Default: will create one. - - Return - string: - The object_id of the newly created object. - """ - return self._post_objects(json=json, bearer=bearer) - - def update_top_level_json( - self, json, object_id=None, url=None, bearer=None - ): - """ - Updates a top-level json object in SUMO. - - Parameters - json: json, JSON documents to save. - object_id: string, the ID of the object to be modified. - url: string, the url where the JSON is stored. When not None, it overrides object ID - bearer: string, Azure OAuth2 bear token Default: will create one. - """ - if not object_id and not url: - raise ValueError("Error: object ID and url cannot be both null.") - - return self._put_objects( - json=json, object_id=object_id, url=url, bearer=bearer - ) - - def save_child_level_json(self, parent_id, json, bearer=None): - """ - Creates a new child object (json document) for the object given by objectId. - Fails if objectId does not exist. - Also sets the _sumo.parent_object attribute for the new object to point at the parent object. - - Parameters - parent_id: string, the id of the json object that this json document will be attached to. - json: json, JSON document to save. - bearer: string, Azure OAuth2 bear token Default: will create one. - - Return - string: The object id of the newly created object, or error message. - """ - return self._post_objects( - object_id=parent_id, json=json, bearer=bearer - ) - - def update_child_level_json( - self, json, object_id=None, url=None, bearer=None - ): - """ - Updates a child-level json object in SUMO. - - Parameters - json: json, JSON documents to save. - object_id: string, the ID of the object to be modified. - url: string, the url where the JSON is stored. When not None, it overrides object ID - bearer: string, Azure OAuth2 bear token Default: will create one. - """ - if not object_id and not url: - raise ValueError("Error: object ID and url cannot be both null.") - - return self._put_objects( - json=json, object_id=object_id, url=url, bearer=bearer - ) - - def delete_object(self, object_id, bearer=None): - """ - Deletes the stored json-document for the given objectid. - - Parameters - object_id string, the id of the json object that will be deleted. - bearer string, Azure OAuth2 bear token Default: will create one. - - Return - json: - A json object that includes the id of the deleted object. - """ - url = f"{self.base_url}/objects('{object_id}')" - return self.callAzureApi.delete_object(url, bearer) - - def save_blob(self, blob, object_id=None, bearer=None, url=None): - """ - Save a binary file to blob storage. - - Parameters - object_id: string, the id of the json object that this blob document will be attached to. - blob: binary, the binary to save - bearer: string, Azure OAuth2 bear token Default: will create one. - - """ - return self._put_objects( - object_id=object_id, json=None, blob=blob, bearer=bearer, url=url - ) - - def get_blob(self, object_id, bearer=None): - """ - Get a binary file from blob-storage as a binary-stream for the objectId. - - Parameters - object_id string, the id for the blob document to return. - bearer string, Azure OAuth2 bear token Default: will create one. - - Return - binary: Binary-stream for the objectId. - """ - url = f"{self.base_url}/objects('{object_id}')/blob" - return self.callAzureApi.get_content(url, bearer) - - def delete_blob(self, object_id, bearer=None): - """ - Deletes the stored blob for the given objectid. - - Parameters - object_id string, the id of the blob object that will be deleted. - bearer string, Azure OAuth2 bear token Default: will create one. - - Return - json: - A json object that includes the id of the deleted object. - """ - url = f"{self.base_url}/objects('{object_id}')/blob" - return self.callAzureApi.delete_object(url, bearer) - - def get_blob_uri(self, object_id, bearer=None): - """ - Get the redirect uri to blob storage for uploading a blob - Parameters - object_id string, the id of the json object that will be deleted. - bearer string, Azure OAuth2 bear token Default: will create one. - - Return - string: - """ - url = f"{self.base_url}/objects('{object_id}')/blob/authuri" - return self.callAzureApi.get_content(url, bearer) - - def save_blob_and_json(self, parent_id, metadata_json, blob, bearer=None): - """ - Uploads a regular surface metadata and its blob object. - - Parameters - parent_id string, the id of the parent ensemble. - metadata_json json, the regular surface metadata. - blob binary, binary data to be uploaded - bearer string, Azure OAuth2 bear token Default: will create one. - - Return - response object from metadata upload. - """ - response_json = self.save_child_level_json( - parent_id, metadata_json, bearer - ) - blob_url = response_json.json().get("blob_url") - _ = self.save_blob(blob, url=blob_url, bearer=bearer) - return response_json - - def aggregate_surfaces( - self, operation, object_ids, nan_as_zero="false", bearer=None - ): - """ - Perform an aggregation on surfaces described by the operation parameter for - the objects in the object_ids list. A new surface object is returned. - - Parameters - operation: list of strings, the operations to perfome: MEAN, MEDIAN, MIN, MAX, STD - and PXX for a specific percentile. - object_ids: list, the object-ids to - bearer: string, Azure OAuth2 bear token Default: will create one. - nan_as_zero: value that decides if NaN values in surfaces should be - zero (nan_as_zero = "true") or stay as NaN (default). - Return - surface: The aggregated surface - """ - - json = {} - json["operation"] = operation - json["object_ids"] = object_ids - json["nan_as_zero"] = nan_as_zero - url = f"{self.base_url}/aggregate" - return self._post_objects(url=url, json=json, bearer=bearer) - - def _post_objects( - self, json, blob=None, object_id=None, bearer=None, url=None - ): - """ - Post a new object into sumo. - - Parameters - json: JSON dictionary, containing the object's metadata - blob: binary, the binary data linked to the object - object_id: string, the id of the json object this object will be attached to. - bearer: string, Azure OAuth2 bear token Default: will create one. - - Return - The object_id of the newly uploaded object, or error message. - """ - - if not url: - url = f"{self.base_url}/objects" - - if object_id: - url = f"{url}('{object_id}')" - - if blob: - url = f"{url}/blob" - - return self.callAzureApi.post( - url=url, blob=blob, json=json, bearer=bearer - ) - - def _put_objects( - self, object_id=None, blob=None, json=None, bearer=None, url=None - ): - """ - Post a new object into sumo. - Parameters - object_id: string, the id of object being updated. - blob: binary, the binary data linked to the object - json: JSON dictionary, containing the object's metadata - bearer: string, Azure OAuth2 bear token Default: will create one. - - Return - The parent_id of the newly uploaded object, or error message. - """ - if url is None: - url = f"{self.base_url}/objects" - - if object_id: - url = f"{url}('{object_id}')" - - if blob: - url = f"{url}/blob" - - return self.callAzureApi.put( - url=url, blob=blob, json=json, bearer=bearer - ) diff --git a/src/sumo/wrapper/decorators.py b/src/sumo/wrapper/decorators.py new file mode 100644 index 0000000..e3a21b7 --- /dev/null +++ b/src/sumo/wrapper/decorators.py @@ -0,0 +1,45 @@ +import tenacity as tn +import httpx + + +def raise_for_status(func): + def wrapper(*args, **kwargs): + return func(*args, **kwargs).raise_for_status() + + return wrapper + + +def http_unpack(func): + def wrapper(*args, **kwargs): + response = func(*args, **kwargs) + ct = response.headers["Content-Type"] + if ct.startswith("application/octet-stream"): + return response.content + if ct.startswith("application/json"): + return response.json() + # ELSE: + return response.text + + return wrapper + + +# Define the conditions for retrying based on exception types +def is_retryable_exception(exception): + return isinstance(exception, (httpx.TimeoutException, httpx.ConnectError)) + + +# Define the conditions for retrying based on HTTP status codes +def is_retryable_status_code(response): + return response.status_code in [502, 503, 504] + + +def http_retry(func): + return tn.retry( + func, + stop=tn.stop_after_attempt(6), + retry=( + tn.retry_if_exception(is_retryable_exception) + | tn.retry_if_result(is_retryable_status_code) + ), + wait=tn.wait_exponential_jitter(), + ) diff --git a/src/sumo/wrapper/sumo_client.py b/src/sumo/wrapper/sumo_client.py index 51cf4b8..369cf12 100644 --- a/src/sumo/wrapper/sumo_client.py +++ b/src/sumo/wrapper/sumo_client.py @@ -2,14 +2,16 @@ import time import httpx + import jwt from ._blob_client import BlobClient from ._logging import LogHandlerSumo from ._new_auth import NewAuth -from ._request_error import raise_request_error_exception from .config import APP_REGISTRATION, TENANT_ID +from .decorators import http_unpack, raise_for_status, http_retry + logger = logging.getLogger("sumo.wrapper") DEFAULT_TIMEOUT = httpx.Timeout(20.0) @@ -162,6 +164,9 @@ def _process_params(self, params_dict: dict) -> dict: return None if prefixed_params == {} else prefixed_params + @http_unpack + @raise_for_status + @http_retry def get(self, path: str, **params) -> dict: """Performs a GET-request to the Sumo API. @@ -205,14 +210,10 @@ def get(self, path: str, **params) -> dict: timeout=DEFAULT_TIMEOUT, ) - if response.is_error: - raise_request_error_exception(response.status_code, response.text) - - if "/blob" in path: - return response.content - - return response.json() + return response + @raise_for_status + @http_retry def post( self, path: str, @@ -259,7 +260,6 @@ def post( json=object_metadata ) """ - token = self._retrieve_token() if blob and json: @@ -274,23 +274,18 @@ def post( "authorization": f"Bearer {token}", } - try: - response = httpx.post( - f"{self.base_url}{path}", - data=blob, - json=json, - headers=headers, - params=params, - timeout=DEFAULT_TIMEOUT, - ) - except httpx.ProxyError as err: - raise_request_error_exception(503, err) - - if response.is_error: - raise_request_error_exception(response.status_code, response.text) - + response = httpx.post( + f"{self.base_url}{path}", + content=blob, + json=json, + headers=headers, + params=params, + timeout=DEFAULT_TIMEOUT, + ) return response + @raise_for_status + @http_retry def put( self, path: str, blob: bytes = None, json: dict = None ) -> httpx.Response: @@ -324,22 +319,19 @@ def put( "authorization": f"Bearer {token}", } - try: - response = httpx.put( - f"{self.base_url}{path}", - data=blob, - json=json, - headers=headers, - timeout=DEFAULT_TIMEOUT, - ) - except httpx.ProxyError as err: - raise_request_error_exception(503, err) - - if response.is_error: - raise_request_error_exception(response.status_code, response.text) + response = httpx.put( + f"{self.base_url}{path}", + content=blob, + json=json, + headers=headers, + timeout=DEFAULT_TIMEOUT, + ) return response + @http_unpack + @raise_for_status + @http_retry def delete(self, path: str) -> dict: """Performs a DELETE-request to the Sumo API. @@ -371,10 +363,7 @@ def delete(self, path: str) -> dict: timeout=DEFAULT_TIMEOUT, ) - if response.is_error: - raise_request_error_exception(response.status_code, response.text) - - return response.json() + return response def getLogger(self, name): """Gets a logger object that sends log objects into the message_log @@ -394,6 +383,9 @@ def getLogger(self, name): logger.addHandler(handler) return logger + @http_unpack + @raise_for_status + @http_retry async def get_async(self, path: str, **params): """Performs an async GET-request to the Sumo API. @@ -436,14 +428,10 @@ async def get_async(self, path: str, **params): timeout=DEFAULT_TIMEOUT, ) - if response.is_error: - raise_request_error_exception(response.status_code, response.text) - - if "/blob" in path: - return response.content - - return response.json() + return response + @raise_for_status + @http_retry async def post_async( self, path: str, @@ -505,24 +493,20 @@ async def post_async( "authorization": f"Bearer {token}", } - try: - async with httpx.AsyncClient() as client: - response = await client.post( - url=f"{self.base_url}{path}", - data=blob, - json=json, - headers=headers, - params=params, - timeout=DEFAULT_TIMEOUT, - ) - except httpx.ProxyError as err: - raise_request_error_exception(503, err) - - if response.is_error: - raise_request_error_exception(response.status_code, response.text) + async with httpx.AsyncClient() as client: + response = await client.post( + url=f"{self.base_url}{path}", + content=blob, + json=json, + headers=headers, + params=params, + timeout=DEFAULT_TIMEOUT, + ) return response + @raise_for_status + @http_retry async def put_async( self, path: str, blob: bytes = None, json: dict = None ) -> httpx.Response: @@ -556,23 +540,20 @@ async def put_async( "authorization": f"Bearer {token}", } - try: - async with httpx.AsyncClient() as client: - response = await client.put( - url=f"{self.base_url}{path}", - data=blob, - json=json, - headers=headers, - timeout=DEFAULT_TIMEOUT, - ) - except httpx.ProxyError as err: - raise_request_error_exception(503, err) - - if response.is_error: - raise_request_error_exception(response.status_code, response.text) + async with httpx.AsyncClient() as client: + response = await client.put( + url=f"{self.base_url}{path}", + content=blob, + json=json, + headers=headers, + timeout=DEFAULT_TIMEOUT, + ) return response + @http_unpack + @raise_for_status + @http_retry async def delete_async(self, path: str) -> dict: """Performs an async DELETE-request to the Sumo API. @@ -605,7 +586,4 @@ async def delete_async(self, path: str) -> dict: timeout=DEFAULT_TIMEOUT, ) - if response.is_error: - raise_request_error_exception(response.status_code, response.text) - - return response.json() + return response diff --git a/tests/test_call_sumo_api.py b/tests/test_call_sumo_api.py deleted file mode 100644 index 82f4707..0000000 --- a/tests/test_call_sumo_api.py +++ /dev/null @@ -1,439 +0,0 @@ -"""Example code for communicating with Sumo""" -import sys - -try: - sys.path.index("../src") # Or os.getcwd() for this directory -except ValueError: - sys.path.append("../src") # Or os.getcwd() for this directory - -import pytest -import yaml - -from time import sleep -from sumo.wrapper import CallSumoApi - - -class Connection: - def __init__(self): - self.api = CallSumoApi(env="dev") - - -def _upload_parent_object(C, json): - response = C.api.save_top_level_json(json=json) - if not 200 <= response.status_code < 202: - raise Exception(f"code: {response.status_code}, text: {response.text}") - return response - - -def _upload_blob(C, blob, url=None, object_id=None): - response = C.api.save_blob(object_id=object_id, blob=blob, url=url) - print("Blob save " + str(response.status_code), flush=True) - if not 200 <= response.status_code < 202: - raise Exception( - f"blob upload to object_id {object_id} returned {response.text} {response.status_code}" - ) - return response - - -def _get_blob_uri(C, object_id): - response = C.api.get_blob_uri(object_id=object_id) - print("Blob save " + str(response.status_code), flush=True) - if not 200 <= response.status_code < 202: - raise Exception( - f"get blob uri for {object_id} returned {response.text} {response.status_code}" - ) - return response - - -def _download_object(C, object_id): - json = C.api.get_json(object_id=object_id) - return json - - -def _upload_child_level_json(C, parent_id, json): - response = C.api.save_child_level_json(parent_id=parent_id, json=json) - if not 200 <= response.status_code < 202: - raise Exception(f"Response: {response.status_code}, Text: {response.text}") - return response - - -def _delete_object(C, object_id): - response = C.api.delete_object(object_id=object_id) - return response - - -class ValueKeeper: - """Class for keeping/passing values between tests""" - - pass - - -""" TESTS """ - - -def test_upload_search_delete_ensemble_child(): - """ - Testing the wrapper functionalities. - - We upload an ensemble object along with a child. After that, we search for - those objects to make sure they are available to the user. We then delete - them and repeat the search to check if they were properly removed from sumo. - """ - C = Connection() - B = b"123456789" - - # Upload Ensemble - with open("tests/testdata/case.yml", "r") as stream: - fmu_case_metadata = yaml.safe_load(stream) - - response_case = _upload_parent_object(C=C, json=fmu_case_metadata) - - assert 200 <= response_case.status_code <= 202 - assert isinstance(response_case.json(), dict) - - case_id = response_case.json().get("objectid") - fmu_case_id = fmu_case_metadata.get("fmu").get("case").get("uuid") - - # Upload Regular Surface - with open("tests/testdata/surface.yml", "r") as stream: - fmu_surface_metadata = yaml.safe_load(stream) - - fmu_surface_id = fmu_surface_metadata.get("fmu").get("realization").get("id") - response_surface = _upload_child_level_json( - C=C, parent_id=case_id, json=fmu_surface_metadata - ) - - assert 200 <= response_surface.status_code <= 202 - assert isinstance(response_surface.json(), dict) - - surface_id = response_surface.json().get("objectid") - - # Upload BLOB - url = response_surface.json().get("blob_url") - response_blob = _upload_blob(C=C, blob=B, object_id=surface_id, url=url) - assert 200 <= response_blob.status_code <= 202 - - sleep(4) - - # Search for ensemble - query = f"{fmu_case_id}" - search_results = C.api.searchroot(query, select="_source") - - hits = search_results.get("hits").get("hits") - assert len(hits) == 1 - assert hits[0].get("_id") == case_id - - # Search for child object - - search_results = C.api.search(query, select="_source") - - total = search_results.get("hits").get("total").get("value") - assert total == 2 - - get_result = _download_object(C, object_id=surface_id) - assert get_result["_id"] == surface_id - - # Search for blob - bin_obj = C.api.get_blob(object_id=surface_id) - assert bin_obj == B - - # Delete Ensemble - result = _delete_object(C=C, object_id=case_id) - assert result == "Accepted" - - sleep(4) - - # Search for ensemble - search_results = C.api.searchroot(query, select="_source") - - hits = search_results.get("hits").get("hits") - - assert len(hits) == 0 - - # Search for child object - search_results = C.api.search(query, select="source") - total = search_results.get("hits").get("total").get("value") - assert total == 0 - - -def test_direct_blob_store_upload(): - """ - Testing the wrapper functionalities. - - We upload an ensemble object along with a child. After that, we search for - those objects to make sure they are available to the user. We then delete - them and repeat the search to check if they were properly removed from sumo. - """ - C = Connection() - B = b"123456789" - - # Upload Ensemble - with open("tests/testdata/case.yml", "r") as stream: - fmu_case_metadata = yaml.safe_load(stream) - - response_case = _upload_parent_object(C=C, json=fmu_case_metadata) - - assert 200 <= response_case.status_code <= 202 - assert isinstance(response_case.json(), dict) - - case_id = response_case.json().get("objectid") - fmu_case_id = fmu_case_metadata.get("fmu").get("case").get("uuid") - - # Upload Regular Surface - with open("tests/testdata/surface.yml", "r") as stream: - fmu_surface_metadata = yaml.safe_load(stream) - - fmu_surface_id = fmu_surface_metadata.get("fmu").get("realization").get("id") - response_surface = _upload_child_level_json( - C=C, parent_id=case_id, json=fmu_surface_metadata - ) - - assert 200 <= response_surface.status_code <= 202 - assert isinstance(response_surface.json(), dict) - - surface_id = response_surface.json().get("objectid") - - # Upload BLOB - blob_url = response_surface.json().get("blob_url") - response_blob = _upload_blob(C=C, blob=B, url=blob_url) - assert 200 <= response_blob.status_code <= 202 - - sleep(4) - - # Search for ensemble - query = f"{fmu_case_id}" - search_results = C.api.searchroot(query, select="_source") - - hits = search_results.get("hits").get("hits") - assert len(hits) == 1 - assert hits[0].get("_id") == case_id - - # Search for child object - - search_results = C.api.search(query, select="_source") - - total = search_results.get("hits").get("total").get("value") - assert total == 2 - - get_result = _download_object(C, object_id=surface_id) - assert get_result["_id"] == surface_id - - # Search for blob - bin_obj = C.api.get_blob(object_id=surface_id) - assert bin_obj == B - - # Delete Ensemble - result = _delete_object(C=C, object_id=case_id) - assert result == "Accepted" - - sleep(4) - - # Search for ensemble - search_results = C.api.searchroot(query, select="_source") - - hits = search_results.get("hits").get("hits") - - assert len(hits) == 0 - - # Search for child object - search_results = C.api.search(query, select="source") - total = search_results.get("hits").get("total").get("value") - assert total == 0 - - -def test_direct_blob_store_upload_single_operation(): - """ - Testing the wrapper functionalities. - - We upload an ensemble object along with a child. After that, we search for - those objects to make sure they are available to the user. We then delete - them and repeat the search to check if they were properly removed from sumo. - """ - - C = Connection() - B = b"123456789" - - # Upload Ensemble - with open("tests/testdata/case.yml", "r") as stream: - fmu_case_metadata = yaml.safe_load(stream) - - response_case = _upload_parent_object(C=C, json=fmu_case_metadata) - - assert 200 <= response_case.status_code <= 202 - assert isinstance(response_case.json(), dict) - - case_id = response_case.json().get("objectid") - fmu_case_id = fmu_case_metadata.get("fmu").get("case").get("uuid") - - # Upload Regular Surface - with open("tests/testdata/surface.yml", "r") as stream: - fmu_surface_metadata = yaml.safe_load(stream) - - fmu_surface_id = fmu_surface_metadata.get("fmu").get("realization").get("id") - - response_surface = C.api.save_blob_and_json(case_id, fmu_surface_metadata, B) - - assert 200 <= response_surface.status_code <= 202 - assert isinstance(response_surface.json(), dict) - - surface_id = response_surface.json().get("objectid") - - sleep(4) - - # Search for ensemble - query = f"{fmu_case_id}" - search_results = C.api.searchroot(query, select="_source") - - hits = search_results.get("hits").get("hits") - assert len(hits) == 1 - assert hits[0].get("_id") == case_id - - # Search for child object - - search_results = C.api.search(query, select="_source") - - total = search_results.get("hits").get("total").get("value") - assert total == 2 - - get_result = _download_object(C, object_id=surface_id) - assert get_result["_id"] == surface_id - - # Search for blob - bin_obj = C.api.get_blob(object_id=surface_id) - assert bin_obj == B - - # Delete Ensemble - result = _delete_object(C=C, object_id=case_id) - assert result == "Accepted" - - sleep(4) - - # Search for ensemble - search_results = C.api.searchroot(query, select="_source") - - hits = search_results.get("hits").get("hits") - - assert len(hits) == 0 - - # Search for child object - search_results = C.api.search(query, select="source") - total = search_results.get("hits").get("total").get("value") - assert total == 0 - - -def test_search_after(): - """ - Testing the wrapper functionalities. - - We upload an ensemble object along with a child. After that, we search for - those objects to make sure they are available to the user. We then delete - them and repeat the search to check if they were properly removed from sumo. - """ - - C = Connection() - B = b"123456789" - - # Upload Ensemble - with open("tests/testdata/case.yml", "r") as stream: - fmu_case_metadata = yaml.safe_load(stream) - - response_case = _upload_parent_object(C=C, json=fmu_case_metadata) - - assert 200 <= response_case.status_code <= 202 - assert isinstance(response_case.json(), dict) - - case_id = response_case.json().get("objectid") - fmu_case_id = fmu_case_metadata.get("fmu").get("case").get("uuid") - - # Upload Regular Surface - with open("tests/testdata/surface.yml", "r") as stream: - fmu_surface_metadata = yaml.safe_load(stream) - - fmu_surface_id = fmu_surface_metadata.get("fmu").get("realization").get("id") - - response_surface = C.api.save_blob_and_json(case_id, fmu_surface_metadata, B) - - assert 200 <= response_surface.status_code <= 202 - assert isinstance(response_surface.json(), dict) - - surface_id = response_surface.json().get("objectid") - - sleep(2) - - # Search for child object - - query = f"{fmu_case_id}" - search_results = C.api.search(query, search_size="1", select="_source") - - num_results = len(search_results.get("hits").get("hits")) - search_after = search_results.get("hits").get("hits")[num_results - 1].get("sort") - - assert num_results == 1 - - search_results = C.api.search(query, search_after=search_after, select="_source") - num_results = len(search_results.get("hits").get("hits")) - - # assert num_results == 1 - - # Delete Ensemble - result = _delete_object(C=C, object_id=case_id) - assert result == "Accepted" - - sleep(3) - - # Search for ensemble - search_results = C.api.searchroot(query, select="_source") - - hits = search_results.get("hits").get("hits") - - assert len(hits) == 0 - - # Search for child object - search_results = C.api.search(query, select="source") - total = search_results.get("hits").get("total").get("value") - assert total == 0 - - -def test_fail_on_wrong_metadata(): - """ - Upload a parent object with erroneous metadata, confirm failure - """ - C = Connection() - with pytest.raises(Exception): - assert _upload_parent_object(C=C, json={"some field": "some value"}) - - -def test_upload_duplicate_ensemble(): - """ - Adding a duplicate ensemble, both tries must return same id. - """ - C = Connection() - - with open("tests/testdata/case.yml", "r") as stream: - fmu_metadata1 = yaml.safe_load(stream) - - with open("tests/testdata/case.yml", "r") as stream: - fmu_metadata2 = yaml.safe_load(stream) - - # upload case metadata, get object_id - response1 = _upload_parent_object(C=C, json=fmu_metadata1) - assert 200 <= response1.status_code <= 202 - - # upload duplicated case metadata, get object_id - response2 = _upload_parent_object(C=C, json=fmu_metadata2) - assert 200 <= response2.status_code <= 202 - - case_id1 = response1.json().get("objectid") - case_id2 = response2.json().get("objectid") - assert case_id1 == case_id2 - - get_result = _download_object(C, object_id=case_id1) - assert get_result["_id"] == case_id1 - - # Delete Ensemble - result = _delete_object(C=C, object_id=case_id1) - assert result == "Accepted" - - # Search for ensemble - with pytest.raises(Exception): - assert _download_object(C, object_id=case_id2)