From 953547fbfb4f20e6bc28ebd584a5cb6dd302b4d6 Mon Sep 17 00:00:00 2001 From: thilak reddy Date: Sun, 1 Oct 2023 15:01:09 +0530 Subject: [PATCH 1/6] introduce retry for fetching data source configurations --- .../docs/getting-started/configuration.mdx | 1 + packages/opal-client/opal_client/config.py | 22 +++++++--- .../opal-client/opal_client/data/fetcher.py | 43 +++++++++++++++++-- .../opal-client/opal_client/policy/options.py | 2 +- 4 files changed, 58 insertions(+), 10 deletions(-) diff --git a/documentation/docs/getting-started/configuration.mdx b/documentation/docs/getting-started/configuration.mdx index 9a775d106..e54eee311 100644 --- a/documentation/docs/getting-started/configuration.mdx +++ b/documentation/docs/getting-started/configuration.mdx @@ -127,6 +127,7 @@ Please use this table as a reference. | POLICY_STORE_AUTH_OAUTH_CLIENT_ID | The client id OPAL will use to authenticate against the OAuth server. | | | POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET | The client secret OPAL will use to authenticate against the OAuth server. | | | POLICY_STORE_CONN_RETRY | Retry options when connecting to the policy store (i.e. the agent that handles the policy, e.g. OPA). | | +| DATA_STORE_CONN_RETRY | Retry options when connecting to the base data source (e.g. an external API server which returns data snapshot). | | | POLICY_STORE_POLICY_PATHS_TO_IGNORE | Which policy paths pushed to the client should be ignored. List of glob style paths, or paths without wildcards but ending with "/\*\*" indicating a parent path (ignoring all under it). | | | POLICY_UPDATER_CONN_RETRY | Retry options when connecting to the policy source (e.g. the policy bundle server). | | | INLINE_OPA_ENABLED | Whether or not OPAL should run OPA by itself in the same container. | | diff --git a/packages/opal-client/opal_client/config.py b/packages/opal-client/opal_client/config.py index 2f5320b69..a8b7e2244 100644 --- a/packages/opal-client/opal_client/config.py +++ b/packages/opal-client/opal_client/config.py @@ -1,7 +1,7 @@ from enum import Enum from opal_client.engine.options import CedarServerOptions, OpaServerOptions -from opal_client.policy.options import PolicyConnRetryOptions +from opal_client.policy.options import ConnRetryOptions from opal_client.policy_store.schemas import PolicyStoreAuth, PolicyStoreTypes from opal_common.confi import Confi, confi from opal_common.config import opal_common_config @@ -49,16 +49,16 @@ class OpalClientConfig(Confi): description="the client secret OPAL will use to authenticate against the OAuth server.", ) - POLICY_STORE_CONN_RETRY: PolicyConnRetryOptions = confi.model( + POLICY_STORE_CONN_RETRY: ConnRetryOptions = confi.model( "POLICY_STORE_CONN_RETRY", - PolicyConnRetryOptions, + ConnRetryOptions, # defaults are being set according to PolicyStoreConnRetryOptions pydantic definitions (see class) {}, description="retry options when connecting to the policy store (i.e. the agent that handles the policy, e.g. OPA)", ) - POLICY_UPDATER_CONN_RETRY: PolicyConnRetryOptions = confi.model( + POLICY_UPDATER_CONN_RETRY: ConnRetryOptions = confi.model( "POLICY_UPDATER_CONN_RETRY", - PolicyConnRetryOptions, + ConnRetryOptions, { "wait_strategy": "random_exponential", "max_wait": 10, @@ -68,6 +68,18 @@ class OpalClientConfig(Confi): description="retry options when connecting to the policy source (e.g. the policy bundle server)", ) + DATA_STORE_CONN_RETRY: ConnRetryOptions = confi.model( + "DATA_STORE_CONN_RETRY", + ConnRetryOptions, + { + "wait_strategy": "random_exponential", + "max_wait": 10, + "attempts": 5, + "wait_time": 1, + }, + description="retry options when connecting to the base data source (e.g. an external API server which returns data snapshot)", + ) + POLICY_STORE_POLICY_PATHS_TO_IGNORE = confi.list( "POLICY_STORE_POLICY_PATHS_TO_IGNORE", [], diff --git a/packages/opal-client/opal_client/data/fetcher.py b/packages/opal-client/opal_client/data/fetcher.py index 0bb0a2385..5cd302583 100644 --- a/packages/opal-client/opal_client/data/fetcher.py +++ b/packages/opal-client/opal_client/data/fetcher.py @@ -1,6 +1,7 @@ import asyncio from typing import Any, Dict, List, Optional, Tuple +from fastapi import HTTPException from opal_client.config import opal_client_config from opal_client.policy_store.base_policy_store_client import JsonableValue from opal_common.config import opal_common_config @@ -9,12 +10,20 @@ from opal_common.fetcher.providers.http_fetch_provider import HttpFetcherConfig from opal_common.logger import logger from opal_common.utils import get_authorization_header, tuple_to_dict +from tenacity import retry class DataFetcher: """fetches policy data from backend.""" - def __init__(self, default_data_url: str = None, token: str = None): + # Use as default config the configuration provider by opal_client_config.DATA_STORE_CONN_RETRY + # Add reraise as true (an option not available for control from the higher-level config) + DEFAULT_RETRY_CONFIG = opal_client_config.DATA_STORE_CONN_RETRY.toTenacityConfig() + DEFAULT_RETRY_CONFIG["reraise"] = True + + def __init__( + self, default_data_url: str = None, token: str = None, retry_config=None + ): """ Args: @@ -36,6 +45,9 @@ def __init__(self, default_data_url: str = None, token: str = None): self._default_fetcher_config = HttpFetcherConfig( headers=self._auth_headers, is_json=True ) + self._retry_config = ( + retry_config if retry_config is not None else self.DEFAULT_RETRY_CONFIG + ) async def __aenter__(self): await self.start() @@ -65,13 +77,36 @@ async def handle_url( return None logger.info("Fetching data from url: {url}", url=url) + response = None + + @retry(**self._retry_config) + async def handle_url_with_retry(): + nonlocal response + try: + # ask the engine to get our data + response = await self._engine.handle_url(url, config=config) + if response.status >= 400: + logger.error( + "error while fetching url: {url}, got response code: {code}", + url=url, + code=response.status, + ) + raise HTTPException( + status_code=response.status, + detail=f"bad status code(>=400) returned accessing url: {url}", + ) + return response + except asyncio.TimeoutError as e: + logger.exception("Timeout while fetching url: {url}", url=url) + raise + try: - # ask the engine to get our data - response = await self._engine.handle_url(url, config=config) + await handle_url_with_retry() + except HTTPException: return response except asyncio.TimeoutError as e: - logger.exception("Timeout while fetching url: {url}", url=url) raise + return response async def handle_urls( self, urls: List[Tuple[str, FetcherConfig, Optional[JsonableValue]]] = None diff --git a/packages/opal-client/opal_client/policy/options.py b/packages/opal-client/opal_client/policy/options.py index 276465ed9..7d0c3ac83 100644 --- a/packages/opal-client/opal_client/policy/options.py +++ b/packages/opal-client/opal_client/policy/options.py @@ -20,7 +20,7 @@ class WaitStrategy(str, Enum): random_exponential = "random_exponential" -class PolicyConnRetryOptions(BaseModel): +class ConnRetryOptions(BaseModel): wait_strategy: WaitStrategy = Field( WaitStrategy.fixed, description="waiting strategy (e.g. fixed for fixed-time waiting, exponential for exponential back-off) (default fixed)", From 2c63a27016068fe55222a236bc4a0f887d857dd9 Mon Sep 17 00:00:00 2001 From: thilak reddy Date: Wed, 4 Oct 2023 14:21:56 +0530 Subject: [PATCH 2/6] check for response type of returned value by engine.handle_url --- packages/opal-client/opal_client/data/fetcher.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/opal-client/opal_client/data/fetcher.py b/packages/opal-client/opal_client/data/fetcher.py index 5cd302583..70643be9c 100644 --- a/packages/opal-client/opal_client/data/fetcher.py +++ b/packages/opal-client/opal_client/data/fetcher.py @@ -1,6 +1,7 @@ import asyncio from typing import Any, Dict, List, Optional, Tuple +import aiohttp from fastapi import HTTPException from opal_client.config import opal_client_config from opal_client.policy_store.base_policy_store_client import JsonableValue @@ -85,7 +86,10 @@ async def handle_url_with_retry(): try: # ask the engine to get our data response = await self._engine.handle_url(url, config=config) - if response.status >= 400: + if ( + isinstance(response, aiohttp.ClientResponse) + and response.status >= 400 + ): logger.error( "error while fetching url: {url}, got response code: {code}", url=url, From 58c87fb3d65394493457d5be59c7626499d46dd0 Mon Sep 17 00:00:00 2001 From: thilak reddy Date: Sun, 10 Dec 2023 12:28:16 +0530 Subject: [PATCH 3/6] set raise_for_status to true for HttpFetchProvider to raise exception for >400 status code for retry --- .../opal-client/opal_client/data/fetcher.py | 35 +++---------------- .../fetcher/engine/fetching_engine.py | 6 +++- .../fetcher/providers/http_fetch_provider.py | 4 ++- 3 files changed, 13 insertions(+), 32 deletions(-) diff --git a/packages/opal-client/opal_client/data/fetcher.py b/packages/opal-client/opal_client/data/fetcher.py index 70643be9c..f41afdfe9 100644 --- a/packages/opal-client/opal_client/data/fetcher.py +++ b/packages/opal-client/opal_client/data/fetcher.py @@ -34,11 +34,15 @@ def __init__( # defaults default_data_url: str = default_data_url or opal_client_config.DEFAULT_DATA_URL token: str = token or opal_client_config.CLIENT_TOKEN + self._retry_config = ( + retry_config if retry_config is not None else self.DEFAULT_RETRY_CONFIG + ) # The underlying fetching engine self._engine = FetchingEngine( worker_count=opal_common_config.FETCHING_WORKER_COUNT, callback_timeout=opal_common_config.FETCHING_CALLBACK_TIMEOUT, enqueue_timeout=opal_common_config.FETCHING_ENQUEUE_TIMEOUT, + retry_config=self._retry_config, ) self._data_url = default_data_url self._token = token @@ -46,9 +50,6 @@ def __init__( self._default_fetcher_config = HttpFetcherConfig( headers=self._auth_headers, is_json=True ) - self._retry_config = ( - retry_config if retry_config is not None else self.DEFAULT_RETRY_CONFIG - ) async def __aenter__(self): await self.start() @@ -80,37 +81,11 @@ async def handle_url( logger.info("Fetching data from url: {url}", url=url) response = None - @retry(**self._retry_config) - async def handle_url_with_retry(): - nonlocal response - try: - # ask the engine to get our data - response = await self._engine.handle_url(url, config=config) - if ( - isinstance(response, aiohttp.ClientResponse) - and response.status >= 400 - ): - logger.error( - "error while fetching url: {url}, got response code: {code}", - url=url, - code=response.status, - ) - raise HTTPException( - status_code=response.status, - detail=f"bad status code(>=400) returned accessing url: {url}", - ) - return response - except asyncio.TimeoutError as e: - logger.exception("Timeout while fetching url: {url}", url=url) - raise - try: - await handle_url_with_retry() - except HTTPException: + response = await self._engine.handle_url(url, config=config) return response except asyncio.TimeoutError as e: raise - return response async def handle_urls( self, urls: List[Tuple[str, FetcherConfig, Optional[JsonableValue]]] = None diff --git a/packages/opal-common/opal_common/fetcher/engine/fetching_engine.py b/packages/opal-common/opal_common/fetcher/engine/fetching_engine.py index 0e203a6f7..d0696844c 100644 --- a/packages/opal-common/opal_common/fetcher/engine/fetching_engine.py +++ b/packages/opal-common/opal_common/fetcher/engine/fetching_engine.py @@ -36,6 +36,7 @@ def __init__( worker_count: int = DEFAULT_WORKER_COUNT, callback_timeout: int = DEFAULT_CALLBACK_TIMEOUT, enqueue_timeout: int = DEFAULT_ENQUEUE_TIMEOUT, + retry_config=None, ) -> None: # The internal task queue (created at start_workers) self._queue: asyncio.Queue = None @@ -51,6 +52,7 @@ def __init__( self._callback_timeout = callback_timeout # time in seconds before time out on adding a task to queue (when full) self._enqueue_timeout = enqueue_timeout + self._retry_config = retry_config def start_workers(self): if self._queue is None: @@ -145,7 +147,9 @@ async def queue_url( fetcher = config.fetcher # init a URL event - event = FetchEvent(url=url, fetcher=fetcher, config=config) + event = FetchEvent( + url=url, fetcher=fetcher, config=config, retry=self._retry_config + ) return await self.queue_fetch_event(event, callback) async def queue_fetch_event( diff --git a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py index 88d27668f..05189876f 100644 --- a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py +++ b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py @@ -71,7 +71,9 @@ async def __aenter__(self): headers = {} if self._event.config.headers is not None: headers = self._event.config.headers - self._session = await ClientSession(headers=headers).__aenter__() + self._session = await ClientSession( + headers=headers, raise_for_status=True + ).__aenter__() return self async def __aexit__(self, exc_type=None, exc_val=None, tb=None): From e3134c9b2bf57292d419f35ce1a75abd08362cc5 Mon Sep 17 00:00:00 2001 From: thilak reddy Date: Sun, 10 Dec 2023 12:33:19 +0530 Subject: [PATCH 4/6] undo changes from previous implementation where in retry was explici --- packages/opal-client/opal_client/data/fetcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/opal-client/opal_client/data/fetcher.py b/packages/opal-client/opal_client/data/fetcher.py index f41afdfe9..84d5d150f 100644 --- a/packages/opal-client/opal_client/data/fetcher.py +++ b/packages/opal-client/opal_client/data/fetcher.py @@ -79,12 +79,12 @@ async def handle_url( return None logger.info("Fetching data from url: {url}", url=url) - response = None - try: + # ask the engine to get our data response = await self._engine.handle_url(url, config=config) return response except asyncio.TimeoutError as e: + logger.exception("Timeout while fetching url: {url}", url=url) raise async def handle_urls( From 49c117330c3a5f167118c679c38d6c5fc83ec325 Mon Sep 17 00:00:00 2001 From: thilak reddy Date: Sun, 10 Dec 2023 12:35:06 +0530 Subject: [PATCH 5/6] remvoed unused imports --- packages/opal-client/opal_client/data/fetcher.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/opal-client/opal_client/data/fetcher.py b/packages/opal-client/opal_client/data/fetcher.py index 84d5d150f..419ceed7e 100644 --- a/packages/opal-client/opal_client/data/fetcher.py +++ b/packages/opal-client/opal_client/data/fetcher.py @@ -1,8 +1,6 @@ import asyncio from typing import Any, Dict, List, Optional, Tuple -import aiohttp -from fastapi import HTTPException from opal_client.config import opal_client_config from opal_client.policy_store.base_policy_store_client import JsonableValue from opal_common.config import opal_common_config @@ -11,7 +9,6 @@ from opal_common.fetcher.providers.http_fetch_provider import HttpFetcherConfig from opal_common.logger import logger from opal_common.utils import get_authorization_header, tuple_to_dict -from tenacity import retry class DataFetcher: From 36008f8c0b44fbef6749aef2f425220dc78e2024 Mon Sep 17 00:00:00 2001 From: thilak reddy Date: Tue, 12 Dec 2023 21:21:14 +0530 Subject: [PATCH 6/6] update docs for moving policy and data retry config variables under respective headings --- documentation/docs/getting-started/configuration.mdx | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/documentation/docs/getting-started/configuration.mdx b/documentation/docs/getting-started/configuration.mdx index e54eee311..add256438 100644 --- a/documentation/docs/getting-started/configuration.mdx +++ b/documentation/docs/getting-started/configuration.mdx @@ -127,9 +127,7 @@ Please use this table as a reference. | POLICY_STORE_AUTH_OAUTH_CLIENT_ID | The client id OPAL will use to authenticate against the OAuth server. | | | POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET | The client secret OPAL will use to authenticate against the OAuth server. | | | POLICY_STORE_CONN_RETRY | Retry options when connecting to the policy store (i.e. the agent that handles the policy, e.g. OPA). | | -| DATA_STORE_CONN_RETRY | Retry options when connecting to the base data source (e.g. an external API server which returns data snapshot). | | | POLICY_STORE_POLICY_PATHS_TO_IGNORE | Which policy paths pushed to the client should be ignored. List of glob style paths, or paths without wildcards but ending with "/\*\*" indicating a parent path (ignoring all under it). | | -| POLICY_UPDATER_CONN_RETRY | Retry options when connecting to the policy source (e.g. the policy bundle server). | | | INLINE_OPA_ENABLED | Whether or not OPAL should run OPA by itself in the same container. | | | INLINE_OPA_CONFIG | If inline OPA is indeed enabled, the user can set the [server configuration options](https://docs.opal.ac/getting-started/running-opal/run-opal-client/opa-runner-parameters) that affects how OPA will start when running `opa run --server` inline. Watch escaping quotes. | {"config_file":"/mnt/opa/config"} | | INLINE_OPA_LOG_FORMAT | | | @@ -141,9 +139,10 @@ Please use this table as a reference. ## Policy Updater Configuration Variables -| Variables | Description | Example | -| ------------------------ | --------------------------------------------------------------------------------------- | ------- | -| POLICY_SUBSCRIPTION_DIRS | The directories in a policy repo we should subscribe to for policy code (rego) modules. | | +| Variables | Description | Example | +| ------------------------- | --------------------------------------------------------------------------------------- | ------- | +| POLICY_SUBSCRIPTION_DIRS | The directories in a policy repo we should subscribe to for policy code (rego) modules. | | +| POLICY_UPDATER_CONN_RETRY | Retry options when connecting to the policy source (e.g. the policy bundle server | | ## Data Updater Configuration Variables @@ -156,6 +155,7 @@ Please use this table as a reference. | SHOULD_REPORT_ON_DATA_UPDATES | Should the client report on updates to callbacks defined in DEFAULT_UPDATE_CALLBACKS or within the given updates. | | | DEFAULT_UPDATE_CALLBACK_CONFIG | | | | DEFAULT_UPDATE_CALLBACKS | Where/How the client should report on the completion of data updates. | | +| DATA_STORE_CONN_RETRY | Retry options when connecting to the base data source (e.g. an external API server which returns data snapshot). | | ## OPA Transaction Log / Healthcheck Configuration Variables