From b38790b28a964beccd6395a65d1884fba0eff85f Mon Sep 17 00:00:00 2001 From: TheBurchLog <5104941+TheBurchLog@users.noreply.github.com> Date: Thu, 7 Dec 2023 19:02:58 +0000 Subject: [PATCH 1/6] New Group Client --- brewtils/rest/group_client.py | 570 ++++++++++++++++++++++++++++++++++ 1 file changed, 570 insertions(+) create mode 100644 brewtils/rest/group_client.py diff --git a/brewtils/rest/group_client.py b/brewtils/rest/group_client.py new file mode 100644 index 00000000..cd96a670 --- /dev/null +++ b/brewtils/rest/group_client.py @@ -0,0 +1,570 @@ +# -*- coding: utf-8 -*- +import logging +import time +from concurrent.futures import ThreadPoolExecutor +from functools import partial +from multiprocessing import cpu_count +from typing import Any, Dict, Iterable, Optional + +from packaging.version import InvalidVersion, parse + +import brewtils.plugin +from brewtils.errors import ( + FetchError, + RequestFailedError, + RequestProcessException, + TimeoutExceededError, + ValidationError, + _deprecate, +) +from brewtils.models import Request, System +from brewtils.resolvers.manager import ResolutionManager +from brewtils.rest.easy_client import EasyClient + + +class GroupClient(object): + """High-level client for generating requests for a Beer-garden System. + + GroupClient creation: + This class is intended to be the main way to create groups of Beer-garden requests. Create + an instance with Beer-garden connection information and a group:: + + client = GroupClient( + group=`GroupA`, + bg_host="host", + bg_port=2337, + ) + + Note: Passing an empty string as the system_namespace parameter will evalutate + to the local garden's default namespace. + + Pass additional keyword arguments for more granularity: + + version_constraint: + Allows specifying a particular system version. Can be a version literal + ('1.0.0') or the special value 'latest.' Using 'latest' will allow the + SystemClient to retry a request if it fails due to a missing system + (see Creating Requests). + + default_instance: + The instance name to use when creating a request if no other instance + name is specified. Since each request must be addressed to a specific + instance this is a convenience to prevent needing to specify the + instance for each request. + + system_name: + If + + system_version: + If + + system_namespaces: + If the targeted system is stateless and if a collection of systems could + handle the Request. This will allow the plugin to round robin the requests + to each namespace to help load balance the requests. It will rotate per + Request to the target system. + + Loading the System: + The System definition is lazily loaded, so nothing happens until the first + attempt to send a Request. At that point the GroupClient will query Beer-garden + to get the system definitions that matches the group. If no matching System can + be found a FetchError will be raised. + + Making a Request: + The standard way to create and send requests is by calling object attributes:: + + futures = client.example_command(param_1='example_param') + for future in as_completed(futures): + completed_request = future.result() + + If the request creation process fails (e.g. the command failed validation) and + version_constraint is 'latest' then the GroupClient will check to see if a + newer version is available, and if so it will attempt to make the request on + that version. This is so users of the GroupClient that don't necessarily care + about the target system version don't need to be restarted every time the target + system is updated. + + It's also possible to control what happens when a Request results in an ERROR. + If the ``raise_on_error`` parameter is set to False (the default) then Requests + that are not successful simply result in a Request with a status of ERROR, and + it is the plugin developer's responsibility to check for this case. However, if + ``raise_on_error`` is set to True then this will result in a + ``RequestFailedError`` being raised. This will happen regardless of the value + of the ``blocking`` flag. + + Tweaking Beer-garden Request Parameters: + There are several parameters that control how beer-garden routes / processes a + request. To denote these as intended for Beer-garden itself (rather than a + parameter to be passed to the Plugin) prepend a leading underscore to the + argument name. + + Sending to another instance:: + + request = client.example_command( + _instance_name="instance_2", param_1="example_param" + ) + + Request with a comment:: + + request = client.example_command( + _comment="I'm a beer-garden comment!", param_1="example_param" + ) + + Without the leading underscore the arguments would be treated the same as + "param_1" - another parameter to be passed to the plugin. + + Request that raises:: + + client = GroupClient( + group="Group" + bg_host="localhost", + bg_port=2337, + ) + + try: + client.command_that_errors(_raise_on_error=True) + except RequestFailedError: + print("I could have just ignored this") + + Args: + group (str): Name of the Group to make the Request on + system_name (str): Name of the System to make Requests on + system_namespace (str): Namespace of the System to make Requests on + system_namespaces (list): Namespaces of the System to round robin Requests to. + The target System should be stateless. + version_constraint (str): System version to make Requests on. Can be specific + ('1.0.0') or 'latest'. + default_instance (str): Name of the Instance to make Requests on + timeout (int): Seconds to wait for a request to complete. 'None' means wait + forever. + max_delay (int): Maximum number of seconds to wait between status checks for a + created request + blocking (bool): Flag indicating whether creation will block until the Request + is complete or return a Future that will complete when the Request does + max_concurrent (int): Maximum number of concurrent requests allowed. + Only has an effect when blocking=False. + raise_on_error (bool): Flag controlling whether created Requests that complete + with an ERROR state should raise an exception + + bg_host (str): Beer-garden hostname + bg_port (int): Beer-garden port + bg_url_prefix (str): URL path that will be used as a prefix when communicating + with Beer-garden. Useful if Beer-garden is running on a URL other than '/'. + ssl_enabled (bool): Whether to use SSL for Beer-garden communication + ca_cert (str): Path to certificate file containing the certificate of the + authority that issued the Beer-garden server certificate + ca_verify (bool): Whether to verify Beer-garden server certificate + client_cert (str): Path to client certificate to use when communicating with + Beer-garden + api_version (int): Beer-garden API version to use + client_timeout (int): Max time to wait for Beer-garden server response + username (str): Username for Beer-garden authentication + password (str): Password for Beer-garden authentication + access_token (str): Access token for Beer-garden authentication + refresh_token (str): Refresh token for Beer-garden authentication + """ + + def __init__(self, *args, **kwargs): + self._logger = logging.getLogger(__name__) + + self._systems = None + self._commands = {} + + # Now need to determine if the intended target is the current running plugin. + # Start by ensuring there's a valid Plugin context active + target_self = bool(brewtils.plugin.CONFIG) + + # If ANY of the target specification arguments don't match the current plugin + # then the target is different + config_map = { + "group": "group" + "system_name": "name", + "version_constraint": "version", + "default_instance": "instance_name", + "system_namespace": "namespace", + } + + self._system_name = kwargs.get("system_name") + self._version_constraint = kwargs.get("version_constraint", "latest") + self._default_instance = kwargs.get("default_instance", "default") + self._system_namespace = kwargs.get( + "system_namespace", brewtils.plugin.CONFIG.namespace or "" + ) + self._system_namespaces = kwargs.get("system_namespaces", []) + + # if both system namespaces are defined, combine the inputs + if len(self._system_namespaces) > 0: + if kwargs.get("system_namespace", None): + if self._system_namespace not in self._system_namespaces: + self._system_namespaces.append(self._system_namespace) + else: + self._system_namespaces = [self._system_namespace] + + + self._always_update = kwargs.get("always_update", False) + self._timeout = kwargs.get("timeout", None) + self._max_delay = kwargs.get("max_delay", 30) + self._blocking = kwargs.get("blocking", True) + self._raise_on_error = kwargs.get("raise_on_error", False) + + # This is for Python 3.4 compatibility - max_workers MUST be non-None + # in that version. This logic is what was added in Python 3.5 + max_concurrent = kwargs.get("max_concurrent", (cpu_count() or 1) * 5) + self._thread_pool = ThreadPoolExecutor(max_workers=max_concurrent) + + # This points DeprecationWarnings at the right line + kwargs.setdefault("stacklevel", 5) + + self._easy_client = EasyClient(*args, **kwargs) + self._resolver = ResolutionManager(easy_client=self._easy_client) + + def __getattr__(self, item): + # type: (str) -> partial + """Standard way to create and send beer-garden requests""" + return self.create_bg_request(item) + + def __str__(self): + return "%s[%s]" % (self.bg_system, self.bg_default_instance) + + @property + def bg_system(self): + return self._system + + @property + def bg_default_instance(self): + return self._default_instance + + + def create_bg_request(self, command_name, **kwargs): + # type: (str, **Any) -> partial + """Create a callable that will execute a Beer-garden request when called. + + Normally you interact with the SystemClient by accessing attributes, but there + could be certain cases where you want to create a request without sending it. + + Example:: + + client = SystemClient(host, port, 'system', blocking=False) + + # Create two callables - one with a parameter and one without + uncreated_requests = [ + client.create_bg_request('command_1', arg_1='Hi!'), + client.create_bg_request('command_2'), + ] + + # Calling creates and sends the request + # The result of each is a future because blocking=False on the SystemClient + futures = [req() for req in uncreated_requests] + + # Wait for all the futures to complete + concurrent.futures.wait(futures) + + Args: + command_name (str): Name of the Command to send + kwargs (dict): Will be passed as parameters when creating the Request + + Returns: + Partial that will create and execute a Beer-garden request when called + + Raises: + AttributeError: System does not have a Command with the given command_name + """ + + self._rotate_namespace() + + if not self._loaded or self._always_update: + self.load_bg_system() + + future_requests = {} + + for system in self.load_bg_systems(): + if command_name in system.commands: + future_requests.update( + { + self._thread_pool.submit( + self.send_bg_request, + _command=command_name, + _system_name=system.name, + _system_namespace=system.namespace, + _system_version=system.version, + _system_display=system.display_name, + _output_type=system.commands[command_name].output_type, + _instance_name=self._default_instance, + **kwargs + ) : { + "system": system, + "command_name": command_name, + } + } + ) + else: + raise AttributeError( + "System '%s' has no command named '%s'" % (self._system, command_name) + ) + + return future_requests + + def send_bg_request(self, *args, **kwargs): + """Actually create a Request and send it to Beer-garden + + .. note:: + This method is intended for advanced use only, mainly cases where you're + using the GroupClient without a predefined System. It assumes that + everything needed to construct the request is being passed in ``kwargs``. If + this doesn't sound like what you want you should check out + ``create_bg_request``. + + Args: + args (list): Unused. Passing positional parameters indicates a bug + kwargs (dict): All necessary request parameters, including Beer-garden + internal parameters + + Returns: + A completed Request object + + Raises: + ValidationError: Request creation failed validation on the server + """ + # First, if any positional args were given that's a bug, as it means someone + # tried to pass a parameter without a key: + # client.command_name(param) + if args: + raise RequestProcessException( + "Using positional arguments when creating a request is not allowed. " + "Please use keyword arguments instead." + ) + + # Need to pop here, otherwise we'll try to send as a request parameter + raise_on_error = kwargs.pop("_raise_on_error", self._raise_on_error) + timeout = kwargs.pop("_timeout", self._timeout) + + request = self._construct_bg_request(**kwargs) + request = self._easy_client.create_request( + request, blocking=blocking, timeout=timeout + ) + + return self._wait_for_request(request, raise_on_error, timeout) + + def load_bg_systems(self): + # type: () -> None + """Query beer-garden for a System definition + + This method will make the query to beer-garden for a System matching the name + and version constraints specified during SystemClient instance creation. + + If this method completes successfully the SystemClient will be ready to create + and send Requests. + + Returns: + None + + Raises: + FetchError: Unable to find a matching System + """ + self._system = [] + + for namespace in self._system_namespaces: + filter_kwargs = { + "groups": [self._group] + } + + if self._system_name: + filter_kwargs["name"] = self._system_name + + if self._system_name: + filter_kwargs["namespace"] = namespace + + if self._version_constraint and self._version_constraint != "latest" + filter_kwargs["version"] = self._version_constraint + + if self._version_constraint == "latest": + self._system.extend(_determine_latest_groups(self._easy_client.find_systems(**filter_kwargs))) + else: + self._system.extend(self._easy_client.find_systems(**filter_kwargs)) + + + if len(self._systems) == 0: + raise FetchError( + "Beer-garden has no system group %s with named '%s' or a version matching '%s' in " + "namespace '%s'" + % ( + self._group, + self._system_name, + self._version_constraint, + self._system_namespace + if self._system_namespaces + else "", + ) + ) + + def _wait_for_request(self, request, raise_on_error, timeout): + # type: (Request, bool, int) -> Request + """Poll the server until the request is completed or errors""" + + delay_time = 0.5 + total_wait_time = 0 + while request.status not in Request.COMPLETED_STATUSES: + if timeout and 0 < timeout < total_wait_time: + raise TimeoutExceededError( + "Timeout waiting for request '%s' to complete" % str(request) + ) + + time.sleep(delay_time) + total_wait_time += delay_time + delay_time = min(delay_time * 2, self._max_delay) + + request = self._easy_client.find_unique_request(id=request.id) + + if raise_on_error and request.status == "ERROR": + raise RequestFailedError(request) + + return request + + def _get_parent_for_request(self): + # type: () -> Optional[Request] + parent = getattr(brewtils.plugin.request_context, "current_request", None) + if parent is None: + return None + + if brewtils.plugin.CONFIG and ( + brewtils.plugin.CONFIG.bg_host.upper() + != self._easy_client.client.bg_host.upper() + or brewtils.plugin.CONFIG.bg_port != self._easy_client.client.bg_port + ): + self._logger.warning( + "A parent request was found, but the destination beer-garden " + "appears to be different than the beer-garden to which this plugin " + "is assigned. Cross-server parent/child requests are not supported " + "at this time. Removing the parent context so the request doesn't fail." + ) + return None + + return Request(id=str(parent.id)) + + def _construct_bg_request(self, **kwargs): + # type: (**Any) -> Request + """Create a request that can be used with EasyClient.create_request""" + + command = kwargs.pop("_command", None) + system_name = kwargs.pop("_system_name", None) + system_version = kwargs.pop("_system_version", None) + system_display = kwargs.pop("_system_display", None) + system_namespace = kwargs.pop("_system_namespace", None) + instance_name = kwargs.pop("_instance_name", None) + comment = kwargs.pop("_comment", None) + output_type = kwargs.pop("_output_type", None) + metadata = kwargs.pop("_metadata", {}) + parent = kwargs.pop("_parent", self._get_parent_for_request()) + publish = kwargs.pop("_publish", None) + topic = kwargs.pop("_topic", None) + propagate = kwargs.pop("_propagate", None) + + if system_display: + metadata["system_display_name"] = system_display + if publish: + metadata["_publish"] = publish + if topic: + metadata["_topic"] = topic + if propagate: + metadata["_propagate"] = propagate + + # Don't check namespace - https://github.com/beer-garden/beer-garden/issues/827 + if command is None: + raise ValidationError("Unable to send a request with no command") + if system_name is None: + raise ValidationError("Unable to send a request with no system name") + if system_version is None: + raise ValidationError("Unable to send a request with no system version") + if instance_name is None: + raise ValidationError("Unable to send a request with no instance name") + + request = Request( + command=command, + system=system_name, + system_version=system_version, + namespace=system_namespace, + instance_name=instance_name, + comment=comment, + output_type=output_type, + parent=parent, + metadata=metadata, + parameters=kwargs, + ) + + request.parameters = self._resolve_parameters(command, request) + + return request + + def _resolve_parameters(self, command, request): + # type: (str, Request) -> Dict[str, Any] + """Attempt to upload any necessary file parameters + + This will inspect the Command model for the given command, looking for file + parameters. Any file parameters will be "resolved" (aka uploaded) before + continuing. + + If the command name can not be found in the current list of commands the + parameter list will just be returned. This most likely indicates a direct + invocation of send_bg_request since a bad command name should be caught earlier + in the "normal" workflow. + """ + if command not in self._commands: + return request.parameters + + return self._resolver.resolve( + request.parameters, self._commands[command].parameters, upload=True + ) + + @staticmethod + def _determine_latest_groups(systems): + # type: (Iterable[System]) -> Iterable[System] + """Returns the list of system with the latest version from the provided list of Systems. + Any version adhering to PEP440 is treated as "later" than a version that does + not adhere to that standard. + """ + unique_systems = {} + + for system in systems: + key = f"{system.namespace}-{system.name}" + if key not in unique_systems: + unique_systems[key] = [system] + else: + unique_systems[key].append(system) + + latest_systems = [] + + for key in unique_systems: + if len(unique_systems[key]) == 1: + latest_systems.append(unique_systems[key][0]) + else: + latest_systems.append(_determine_latest(unique_systems[key])) + + return latest_systems + + + @staticmethod + def _determine_latest(systems): + # type: (Iterable[System]) -> Optional[System] + """Returns the system with the latest version from the provided list of Systems. + Any version adhering to PEP440 is treated as "later" than a version that does + not adhere to that standard. + """ + versions = [] + legacy_versions = [] + system_versions_map = {} + + for system in systems: + try: + versions.append(parse(system.version)) + system_versions_map[str(parse(system.version))] = system + except InvalidVersion: + legacy_versions.append(system.version) + system_versions_map[system.version] = system + + eligible_versions = versions if versions else legacy_versions + + if eligible_versions: + latest_version = sorted(eligible_versions, reverse=True)[0] + return system_versions_map.get(str(latest_version)) + else: + return None From a2ac54e5dbd643bc2ac5a7461153acf42f187ae6 Mon Sep 17 00:00:00 2001 From: TheBurchLog <5104941+TheBurchLog@users.noreply.github.com> Date: Fri, 15 Dec 2023 17:15:18 +0000 Subject: [PATCH 2/6] Providing Blocking support --- brewtils/rest/group_client.py | 59 +++++++++++++++-------------------- 1 file changed, 26 insertions(+), 33 deletions(-) diff --git a/brewtils/rest/group_client.py b/brewtils/rest/group_client.py index cd96a670..1e59490e 100644 --- a/brewtils/rest/group_client.py +++ b/brewtils/rest/group_client.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import logging import time -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from functools import partial from multiprocessing import cpu_count from typing import Any, Dict, Iterable, Optional @@ -53,16 +53,18 @@ class GroupClient(object): instance for each request. system_name: - If + The system name to use when creaitng the request, if none is provided, + then this field is treated as a wild card. system_version: - If + The system version to use when creaitng the request, if none is provided, + then this field is treated as a wild card. system_namespaces: If the targeted system is stateless and if a collection of systems could - handle the Request. This will allow the plugin to round robin the requests - to each namespace to help load balance the requests. It will rotate per - Request to the target system. + handle the Request. This will allow the plugin to broadcast the request to + all namespaces. If none is provided, then this field is treated as a wild + card. Loading the System: The System definition is lazily loaded, so nothing happens until the first @@ -73,9 +75,10 @@ class GroupClient(object): Making a Request: The standard way to create and send requests is by calling object attributes:: - futures = client.example_command(param_1='example_param') - for future in as_completed(futures): - completed_request = future.result() + results = client.example_command(param_1='example_param') + for result in results: + if result.status == "SUCCESS": + payload = result.output If the request creation process fails (e.g. the command failed validation) and version_constraint is 'latest' then the GroupClient will check to see if a @@ -100,13 +103,13 @@ class GroupClient(object): Sending to another instance:: - request = client.example_command( + requests = group_client.example_command( _instance_name="instance_2", param_1="example_param" ) Request with a comment:: - request = client.example_command( + requests = group_client.example_command( _comment="I'm a beer-garden comment!", param_1="example_param" ) @@ -170,20 +173,6 @@ def __init__(self, *args, **kwargs): self._systems = None self._commands = {} - # Now need to determine if the intended target is the current running plugin. - # Start by ensuring there's a valid Plugin context active - target_self = bool(brewtils.plugin.CONFIG) - - # If ANY of the target specification arguments don't match the current plugin - # then the target is different - config_map = { - "group": "group" - "system_name": "name", - "version_constraint": "version", - "default_instance": "instance_name", - "system_namespace": "namespace", - } - self._system_name = kwargs.get("system_name") self._version_constraint = kwargs.get("version_constraint", "latest") self._default_instance = kwargs.get("default_instance", "default") @@ -270,10 +259,7 @@ def create_bg_request(self, command_name, **kwargs): AttributeError: System does not have a Command with the given command_name """ - self._rotate_namespace() - - if not self._loaded or self._always_update: - self.load_bg_system() + self.load_bg_system() future_requests = {} @@ -301,8 +287,15 @@ def create_bg_request(self, command_name, **kwargs): raise AttributeError( "System '%s' has no command named '%s'" % (self._system, command_name) ) + + if not self.blocking: + return future_requests + + results = [] + for future in as_completed(future_requests): + results.append(future.result()) - return future_requests + return results def send_bg_request(self, *args, **kwargs): """Actually create a Request and send it to Beer-garden @@ -340,7 +333,7 @@ def send_bg_request(self, *args, **kwargs): request = self._construct_bg_request(**kwargs) request = self._easy_client.create_request( - request, blocking=blocking, timeout=timeout + request, blocking=self.blocking, timeout=timeout ) return self._wait_for_request(request, raise_on_error, timeout) @@ -374,11 +367,11 @@ def load_bg_systems(self): if self._system_name: filter_kwargs["namespace"] = namespace - if self._version_constraint and self._version_constraint != "latest" + if self._version_constraint and self._version_constraint != "latest": filter_kwargs["version"] = self._version_constraint if self._version_constraint == "latest": - self._system.extend(_determine_latest_groups(self._easy_client.find_systems(**filter_kwargs))) + self._system.extend(self._determine_latest_groups(self._easy_client.find_systems(**filter_kwargs))) else: self._system.extend(self._easy_client.find_systems(**filter_kwargs)) From ac91708eadae0ce34b353593d1fea7b6c7dd8207 Mon Sep 17 00:00:00 2001 From: TheBurchLog <5104941+TheBurchLog@users.noreply.github.com> Date: Fri, 15 Dec 2023 17:35:00 +0000 Subject: [PATCH 3/6] Updating docs --- brewtils/rest/group_client.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/brewtils/rest/group_client.py b/brewtils/rest/group_client.py index 1e59490e..19f28ba4 100644 --- a/brewtils/rest/group_client.py +++ b/brewtils/rest/group_client.py @@ -43,7 +43,7 @@ class GroupClient(object): version_constraint: Allows specifying a particular system version. Can be a version literal ('1.0.0') or the special value 'latest.' Using 'latest' will allow the - SystemClient to retry a request if it fails due to a missing system + GroupClient to retry a request if it fails due to a missing system (see Creating Requests). default_instance: @@ -228,12 +228,12 @@ def create_bg_request(self, command_name, **kwargs): # type: (str, **Any) -> partial """Create a callable that will execute a Beer-garden request when called. - Normally you interact with the SystemClient by accessing attributes, but there + Normally you interact with the GroupClient by accessing attributes, but there could be certain cases where you want to create a request without sending it. Example:: - client = SystemClient(host, port, 'system', blocking=False) + client = GroupClient(host, port, 'system', blocking=False) # Create two callables - one with a parameter and one without uncreated_requests = [ @@ -242,7 +242,7 @@ def create_bg_request(self, command_name, **kwargs): ] # Calling creates and sends the request - # The result of each is a future because blocking=False on the SystemClient + # The result of each is a future because blocking=False on the GroupClient futures = [req() for req in uncreated_requests] # Wait for all the futures to complete @@ -343,9 +343,9 @@ def load_bg_systems(self): """Query beer-garden for a System definition This method will make the query to beer-garden for a System matching the name - and version constraints specified during SystemClient instance creation. + and version constraints specified during GroupClient instance creation. - If this method completes successfully the SystemClient will be ready to create + If this method completes successfully the GroupClient will be ready to create and send Requests. Returns: From 935f5a3ec03caa9ce5908d60726a01a6fef7ae04 Mon Sep 17 00:00:00 2001 From: TheBurchLog <5104941+TheBurchLog@users.noreply.github.com> Date: Fri, 15 Dec 2023 17:39:07 +0000 Subject: [PATCH 4/6] Update docs and function paths --- brewtils/rest/group_client.py | 58 +++++++++++++++++------------------ 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/brewtils/rest/group_client.py b/brewtils/rest/group_client.py index 19f28ba4..d5814c42 100644 --- a/brewtils/rest/group_client.py +++ b/brewtils/rest/group_client.py @@ -342,7 +342,7 @@ def load_bg_systems(self): # type: () -> None """Query beer-garden for a System definition - This method will make the query to beer-garden for a System matching the name + This method will make the query to beer-garden for Systems matching the group, name and version constraints specified during GroupClient instance creation. If this method completes successfully the GroupClient will be ready to create @@ -508,35 +508,8 @@ def _resolve_parameters(self, command, request): request.parameters, self._commands[command].parameters, upload=True ) - @staticmethod - def _determine_latest_groups(systems): - # type: (Iterable[System]) -> Iterable[System] - """Returns the list of system with the latest version from the provided list of Systems. - Any version adhering to PEP440 is treated as "later" than a version that does - not adhere to that standard. - """ - unique_systems = {} - - for system in systems: - key = f"{system.namespace}-{system.name}" - if key not in unique_systems: - unique_systems[key] = [system] - else: - unique_systems[key].append(system) - - latest_systems = [] - - for key in unique_systems: - if len(unique_systems[key]) == 1: - latest_systems.append(unique_systems[key][0]) - else: - latest_systems.append(_determine_latest(unique_systems[key])) - - return latest_systems - - - @staticmethod - def _determine_latest(systems): + + def _determine_latest(self, systems): # type: (Iterable[System]) -> Optional[System] """Returns the system with the latest version from the provided list of Systems. Any version adhering to PEP440 is treated as "later" than a version that does @@ -561,3 +534,28 @@ def _determine_latest(systems): return system_versions_map.get(str(latest_version)) else: return None + + def _determine_latest_groups(self, systems): + # type: (Iterable[System]) -> Iterable[System] + """Returns the list of system with the latest version from the provided list of Systems. + Any version adhering to PEP440 is treated as "later" than a version that does + not adhere to that standard. + """ + unique_systems = {} + + for system in systems: + key = f"{system.namespace}-{system.name}" + if key not in unique_systems: + unique_systems[key] = [system] + else: + unique_systems[key].append(system) + + latest_systems = [] + + for key in unique_systems: + if len(unique_systems[key]) == 1: + latest_systems.append(unique_systems[key][0]) + else: + latest_systems.append(self._determine_latest(unique_systems[key])) + + return latest_systems \ No newline at end of file From cab72d11ed2ee13de8162912628557713c7f0cc7 Mon Sep 17 00:00:00 2001 From: TheBurchLog <5104941+TheBurchLog@users.noreply.github.com> Date: Fri, 15 Dec 2023 17:39:37 +0000 Subject: [PATCH 5/6] formatting --- brewtils/rest/group_client.py | 37 +++++++++++++++++------------------ 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/brewtils/rest/group_client.py b/brewtils/rest/group_client.py index d5814c42..7038b2d1 100644 --- a/brewtils/rest/group_client.py +++ b/brewtils/rest/group_client.py @@ -55,7 +55,7 @@ class GroupClient(object): system_name: The system name to use when creaitng the request, if none is provided, then this field is treated as a wild card. - + system_version: The system version to use when creaitng the request, if none is provided, then this field is treated as a wild card. @@ -63,13 +63,13 @@ class GroupClient(object): system_namespaces: If the targeted system is stateless and if a collection of systems could handle the Request. This will allow the plugin to broadcast the request to - all namespaces. If none is provided, then this field is treated as a wild + all namespaces. If none is provided, then this field is treated as a wild card. Loading the System: The System definition is lazily loaded, so nothing happens until the first attempt to send a Request. At that point the GroupClient will query Beer-garden - to get the system definitions that matches the group. If no matching System can + to get the system definitions that matches the group. If no matching System can be found a FetchError will be raised. Making a Request: @@ -85,7 +85,7 @@ class GroupClient(object): newer version is available, and if so it will attempt to make the request on that version. This is so users of the GroupClient that don't necessarily care about the target system version don't need to be restarted every time the target - system is updated. + system is updated. It's also possible to control what happens when a Request results in an ERROR. If the ``raise_on_error`` parameter is set to False (the default) then Requests @@ -189,7 +189,6 @@ def __init__(self, *args, **kwargs): else: self._system_namespaces = [self._system_namespace] - self._always_update = kwargs.get("always_update", False) self._timeout = kwargs.get("timeout", None) self._max_delay = kwargs.get("max_delay", 30) @@ -223,7 +222,6 @@ def bg_system(self): def bg_default_instance(self): return self._default_instance - def create_bg_request(self, command_name, **kwargs): # type: (str, **Any) -> partial """Create a callable that will execute a Beer-garden request when called. @@ -276,8 +274,8 @@ def create_bg_request(self, command_name, **kwargs): _system_display=system.display_name, _output_type=system.commands[command_name].output_type, _instance_name=self._default_instance, - **kwargs - ) : { + **kwargs, + ): { "system": system, "command_name": command_name, } @@ -285,15 +283,16 @@ def create_bg_request(self, command_name, **kwargs): ) else: raise AttributeError( - "System '%s' has no command named '%s'" % (self._system, command_name) + "System '%s' has no command named '%s'" + % (self._system, command_name) ) - + if not self.blocking: return future_requests - + results = [] for future in as_completed(future_requests): - results.append(future.result()) + results.append(future.result()) return results @@ -357,9 +356,7 @@ def load_bg_systems(self): self._system = [] for namespace in self._system_namespaces: - filter_kwargs = { - "groups": [self._group] - } + filter_kwargs = {"groups": [self._group]} if self._system_name: filter_kwargs["name"] = self._system_name @@ -371,11 +368,14 @@ def load_bg_systems(self): filter_kwargs["version"] = self._version_constraint if self._version_constraint == "latest": - self._system.extend(self._determine_latest_groups(self._easy_client.find_systems(**filter_kwargs))) + self._system.extend( + self._determine_latest_groups( + self._easy_client.find_systems(**filter_kwargs) + ) + ) else: self._system.extend(self._easy_client.find_systems(**filter_kwargs)) - if len(self._systems) == 0: raise FetchError( "Beer-garden has no system group %s with named '%s' or a version matching '%s' in " @@ -508,7 +508,6 @@ def _resolve_parameters(self, command, request): request.parameters, self._commands[command].parameters, upload=True ) - def _determine_latest(self, systems): # type: (Iterable[System]) -> Optional[System] """Returns the system with the latest version from the provided list of Systems. @@ -558,4 +557,4 @@ def _determine_latest_groups(self, systems): else: latest_systems.append(self._determine_latest(unique_systems[key])) - return latest_systems \ No newline at end of file + return latest_systems From 15ac90dab58287c208b4c2b96a18f4b6d32b1710 Mon Sep 17 00:00:00 2001 From: TheBurchLog <5104941+TheBurchLog@users.noreply.github.com> Date: Fri, 15 Dec 2023 18:03:28 +0000 Subject: [PATCH 6/6] Change log --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 0c10d0c8..f437b20e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,7 @@ TBD - Add support to change the Exchange Type for RabbitMQ. Default is 'topic', but options like 'fanout' can now be supported - Better handling of Pika errors +- Add GroupClient to support tasking Systems that are "grouped" together through the System/Client annotation. 3.22.0 ------