Skip to content

Commit

Permalink
App Configuration Python Provider - Load Balancing (Azure#37692)
Browse files Browse the repository at this point in the history
* load balancing

* fix rotation bug + added tests

* code cleanup/docstrings

* Added Changelog, fixed bug where refreshing the clients resulted in a reshuffle even if nothing changed.

* adding missing telemetry

* Code Review comments

* formatting

* PR comments

* fixed logger, added client fail log

* fixing extra _

* fixing failover?

* Apply suggestions from code review

Co-authored-by: Avani Gupta <[email protected]>

* _load_balancing_enabled

* fixing other instances

* fixing _

* review comments

* Update _client_manager.py

* Update _async_client_manager.py

* reworked logic

* fixed tests and formatting

* fixing checking for clients

* Update _azureappconfigurationprovider.py

* Update _azureappconfigurationproviderasync.py

* Update _azureappconfigurationprovider.py

* Update _azureappconfigurationprovider.py

* Async changes to match sync

* fixing shuffle

* formatting

* Update sdk/appconfiguration/azure-appconfiguration-provider/azure/appconfiguration/provider/_client_manager.py

Co-authored-by: Avani Gupta <[email protected]>

* Update sdk/appconfiguration/azure-appconfiguration-provider/azure/appconfiguration/provider/_client_manager.py

Co-authored-by: Avani Gupta <[email protected]>

* get_next_active_client

* updating async with review changes

* move outside if

* fixing refresh

* Update sdk/appconfiguration/azure-appconfiguration-provider/azure/appconfiguration/provider/_client_manager.py

Co-authored-by: Albert Ofori <[email protected]>

* always reshuffling

* review comments

* format fix

---------

Co-authored-by: Avani Gupta <[email protected]>
Co-authored-by: Albert Ofori <[email protected]>
  • Loading branch information
3 people authored Dec 10, 2024
1 parent da9b5ff commit d90e4e4
Show file tree
Hide file tree
Showing 10 changed files with 787 additions and 341 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

### Breaking Changes

### Features Added

* Added support for load balancing between replicas.

### Bugs Fixed

### Other Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def load( # pylint: disable=docstring-keyword-should-match-keyword-only
feature_flag_enabled: bool = False,
feature_flag_selectors: Optional[List[SettingSelector]] = None,
feature_flag_refresh_enabled: bool = False,
**kwargs
**kwargs,
) -> "AzureAppConfigurationProvider":
"""
Loads configuration settings from Azure App Configuration into a Python application.
Expand Down Expand Up @@ -119,6 +119,9 @@ def load( # pylint: disable=docstring-keyword-should-match-keyword-only
:keyword replica_discovery_enabled: Optional flag to enable or disable the discovery of replica endpoints. Default
is True.
:paramtype replica_discovery_enabled: bool
:keyword load_balancing_enabled: Optional flag to enable or disable the load balancing of replica endpoints. Default
is False.
:paramtype load_balancing_enabled: bool
"""


Expand All @@ -139,7 +142,7 @@ def load( # pylint: disable=docstring-keyword-should-match-keyword-only
feature_flag_enabled: bool = False,
feature_flag_selectors: Optional[List[SettingSelector]] = None,
feature_flag_refresh_enabled: bool = False,
**kwargs
**kwargs,
) -> "AzureAppConfigurationProvider":
"""
Loads configuration settings from Azure App Configuration into a Python application.
Expand Down Expand Up @@ -179,6 +182,9 @@ def load( # pylint: disable=docstring-keyword-should-match-keyword-only
:keyword replica_discovery_enabled: Optional flag to enable or disable the discovery of replica endpoints. Default
is True.
:paramtype replica_discovery_enabled: bool
:keyword load_balancing_enabled: Optional flag to enable or disable the load balancing of replica endpoints. Default
is False.
:paramtype load_balancing_enabled: bool
"""


Expand Down Expand Up @@ -227,17 +233,9 @@ def load(*args, **kwargs) -> "AzureAppConfigurationProvider":
)

provider = _buildprovider(connection_string, endpoint, credential, uses_key_vault=uses_key_vault, **kwargs)
headers = _get_headers(
kwargs.pop("headers", {}),
"Startup",
provider._replica_client_manager.get_client_count() - 1, # pylint:disable=protected-access
provider._feature_flag_enabled, # pylint:disable=protected-access
provider._feature_filter_usage, # pylint:disable=protected-access
provider._uses_key_vault, # pylint:disable=protected-access
)

try:
provider._load_all(headers=headers) # pylint:disable=protected-access
provider._load_all() # pylint:disable=protected-access
except Exception as e:
_delay_failure(start_time)
raise e
Expand All @@ -253,7 +251,16 @@ def _delay_failure(start_time: datetime.datetime) -> None:
time.sleep((min_time - (current_time - start_time)).total_seconds())


def _get_headers(headers, request_type, replica_count, uses_feature_flags, feature_filters_used, uses_key_vault) -> str:
def _update_correlation_context_header(
headers,
request_type,
replica_count,
uses_feature_flags,
feature_filters_used,
uses_key_vault,
uses_load_balancing,
is_failover_request,
) -> Dict[str, str]:
if os.environ.get(REQUEST_TRACING_DISABLED_ENVIRONMENT_VARIABLE, default="").lower() == "true":
return headers
correlation_context = "RequestType=" + request_type
Expand Down Expand Up @@ -291,6 +298,12 @@ def _get_headers(headers, request_type, replica_count, uses_feature_flags, featu
if replica_count > 0:
correlation_context += ",ReplicaCount=" + str(replica_count)

if is_failover_request:
correlation_context += ",Failover"

if uses_load_balancing:
correlation_context += ",Features=LB"

headers["Correlation-Context"] = correlation_context
return headers

Expand Down Expand Up @@ -467,6 +480,8 @@ def __init__(self, **kwargs: Any) -> None:
min_backoff: int = min(kwargs.pop("min_backoff", 30), interval)
max_backoff: int = min(kwargs.pop("max_backoff", 600), interval)

self._uses_load_balancing = kwargs.pop("load_balancing_enabled", False)

self._replica_client_manager = ConfigurationClientManager(
connection_string=kwargs.pop("connection_string", None),
endpoint=endpoint,
Expand All @@ -477,7 +492,8 @@ def __init__(self, **kwargs: Any) -> None:
replica_discovery_enabled=kwargs.pop("replica_discovery_enabled", True),
min_backoff_sec=min_backoff,
max_backoff_sec=max_backoff,
**kwargs
load_balancing_enabled=self._uses_load_balancing,
**kwargs,
)
self._dict: Dict[str, Any] = {}
self._secret_clients: Dict[str, SecretClient] = {}
Expand Down Expand Up @@ -510,39 +526,44 @@ def __init__(self, **kwargs: Any) -> None:
self._update_lock = Lock()
self._refresh_lock = Lock()

def refresh(self, **kwargs) -> None:
def refresh(self, **kwargs) -> None: # pylint: disable=too-many-statements
if not self._refresh_on and not self._feature_flag_refresh_enabled:
logging.debug("Refresh called but no refresh enabled.")
logger.debug("Refresh called but no refresh enabled.")
return
if not self._refresh_timer.needs_refresh():
logging.debug("Refresh called but refresh interval not elapsed.")
logger.debug("Refresh called but refresh interval not elapsed.")
return
if not self._refresh_lock.acquire(blocking=False): # pylint: disable= consider-using-with
logging.debug("Refresh called but refresh already in progress.")
logger.debug("Refresh called but refresh already in progress.")
return
success = False
need_refresh = False
error_message = """
Failed to refresh configuration settings from Azure App Configuration.
"""
exception: Exception = RuntimeError(error_message)
is_failover_request = False
try:
self._replica_client_manager.refresh_clients()
active_clients = self._replica_client_manager.get_active_clients()
self._replica_client_manager.find_active_clients()
replica_count = self._replica_client_manager.get_client_count() - 1

while client := self._replica_client_manager.get_next_active_client():
headers = _update_correlation_context_header(
kwargs.pop("headers", {}),
"Watch",
replica_count,
self._feature_flag_enabled,
self._feature_filter_usage,
self._uses_key_vault,
self._uses_load_balancing,
is_failover_request,
)

headers = _get_headers(
kwargs.pop("headers", {}),
"Watch",
self._replica_client_manager.get_client_count() - 1,
self._feature_flag_enabled,
self._feature_filter_usage,
self._uses_key_vault,
)
for client in active_clients:
try:
if self._refresh_on:
need_refresh, self._refresh_on, configuration_settings = client.refresh_configuration_settings(
self._selects, self._refresh_on, headers, **kwargs
self._selects, self._refresh_on, headers=headers, **kwargs
)
configuration_settings_processed = {}
for config in configuration_settings:
Expand All @@ -556,11 +577,13 @@ def refresh(self, **kwargs) -> None:
if need_refresh:
self._dict = configuration_settings_processed
if self._feature_flag_refresh_enabled:
need_ff_refresh, self._refresh_on_feature_flags, feature_flags, filters_used = (
need_ff_refresh, refresh_on_feature_flags, feature_flags, filters_used = (
client.refresh_feature_flags(
self._refresh_on_feature_flags, self._feature_flag_selectors, headers, **kwargs
self._refresh_on_feature_flags, self._feature_flag_selectors, headers=headers, **kwargs
)
)
if refresh_on_feature_flags:
self._refresh_on_feature_flags = refresh_on_feature_flags
self._feature_filter_usage = filters_used

if need_refresh or need_ff_refresh:
Expand All @@ -572,8 +595,9 @@ def refresh(self, **kwargs) -> None:
break
except AzureError as e:
exception = e
logger.debug("Failed to refresh configurations from endpoint %s", client.endpoint)
self._replica_client_manager.backoff(client)

is_failover_request = True
if not success:
self._refresh_timer.backoff()
if self._on_refresh_error:
Expand All @@ -586,21 +610,35 @@ def refresh(self, **kwargs) -> None:
self._refresh_lock.release()

def _load_all(self, **kwargs):
active_clients = self._replica_client_manager.get_active_clients()
self._replica_client_manager.refresh_clients()
self._replica_client_manager.find_active_clients()
is_failover_request = False
replica_count = self._replica_client_manager.get_client_count() - 1

for client in active_clients:
while client := self._replica_client_manager.get_next_active_client():
headers = _update_correlation_context_header(
kwargs.pop("headers", {}),
"Startup",
replica_count,
self._feature_flag_enabled,
self._feature_filter_usage,
self._uses_key_vault,
self._uses_load_balancing,
is_failover_request,
)
try:
configuration_settings, sentinel_keys = client.load_configuration_settings(
self._selects, self._refresh_on, **kwargs
self._selects, self._refresh_on, headers=headers, **kwargs
)
configuration_settings_processed = {}
for config in configuration_settings:
key = self._process_key_name(config)
value = self._process_key_value(config)
configuration_settings_processed[key] = value

if self._feature_flag_enabled:
feature_flags, feature_flag_sentinel_keys, used_filters = client.load_feature_flags(
self._feature_flag_selectors, self._feature_flag_refresh_enabled, **kwargs
self._feature_flag_selectors, self._feature_flag_refresh_enabled, headers=headers, **kwargs
)
self._feature_filter_usage = used_filters
configuration_settings_processed[FEATURE_MANAGEMENT_KEY] = {}
Expand All @@ -609,13 +647,12 @@ def _load_all(self, **kwargs):
for (key, label), etag in self._refresh_on.items():
if not etag:
try:
headers = kwargs.get("headers", {})
sentinel = client.get_configuration_setting(key, label, headers=headers) # type:ignore
self._refresh_on[(key, label)] = sentinel.etag # type:ignore
except HttpResponseError as e:
if e.status_code == 404:
# If the sentinel is not found a refresh should be triggered when it is created.
logging.debug(
logger.debug(
"""
WatchKey key: %s label %s was configured but not found. Refresh will be triggered
if created.
Expand All @@ -631,7 +668,9 @@ def _load_all(self, **kwargs):
self._dict = configuration_settings_processed
return
except AzureError:
logger.debug("Failed to refresh configurations from endpoint %s", client.endpoint)
self._replica_client_manager.backoff(client)
is_failover_request = True
raise RuntimeError(
"Failed to load configuration settings. No Azure App Configuration stores successfully loaded from."
)
Expand Down
Loading

0 comments on commit d90e4e4

Please sign in to comment.