diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 58827478..0f79a089 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,7 +1,7 @@ [bumpversion] -current_version = 1.0.2 -commit = True -tag = True +current_version = 1.0.3 +commit = False +tag = False search = {current_version} [bumpversion:file:setup.cfg] diff --git a/.github/actions/image-test/action.yml b/.github/actions/image-test/action.yml new file mode 100644 index 00000000..a01ede5d --- /dev/null +++ b/.github/actions/image-test/action.yml @@ -0,0 +1,23 @@ +name: build-and-test-image +description: Build and test the container image + +runs: + using: composite + steps: + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Build container image + uses: docker/build-push-action@v4 + with: + context: . + platforms: linux/amd64 + tags: localhost/ansible-rulebook:test + load: true + + - name: Run tests + shell: bash + run: > + docker run --rm -u 0 localhost/ansible-rulebook:test bash -c ' + pip install -r requirements_test.txt && + pytest -m "e2e" -n auto' diff --git a/.github/workflows/build-image.yml b/.github/workflows/build-image.yml index 8c3aebac..204c0e59 100644 --- a/.github/workflows/build-image.yml +++ b/.github/workflows/build-image.yml @@ -21,14 +21,8 @@ jobs: - name: Checkout repository uses: actions/checkout@v3 - - name: Build local image - run: docker build -t localhost/ansible-rulebook:test . - - - name: Run tests - run: > - docker run --rm -u 0 localhost/ansible-rulebook:test bash -c ' - pip install -r requirements_test.txt && - pytest -m "e2e" -n auto' + - name: Build and test the container image + uses: ./.github/actions/image-test build-and-push-image: needs: build-and-test-image diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 783dfd4e..438755aa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -79,3 +79,13 @@ jobs: flags: "unittests-${{ matrix.python-version }}" name: codecov-umbrella verbose: true + + build-and-test-image: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Build and test the container image + uses: ./.github/actions/image-test diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3d181701..36b2d2e2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -36,4 +36,5 @@ repos: rev: v9.5.0 hooks: - id: commitlint + additional_dependencies: ['@commitlint/config-conventional'] stages: [commit-msg] diff --git a/CHANGELOG.md b/CHANGELOG.md index 62e5e72e..8b1fdf17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,22 @@ ## [Unreleased] ### Added -- support for firing multiple rules ### Fixed +- Job_template and workflow_template actions honor custom hosts limit ### Removed +## [1.0.3] - 2023-10-17 + +### Added +- support for firing multiple rules + +### Fixed +- bug fix in run_workflow_template + + +### Removed ## [1.0.2] - 2023-08-14 ### Added diff --git a/ansible_rulebook/__init__.py b/ansible_rulebook/__init__.py index 52d9cdd2..fb5cd4bb 100644 --- a/ansible_rulebook/__init__.py +++ b/ansible_rulebook/__init__.py @@ -14,4 +14,4 @@ """Top-level package for Ansible Events.""" -__version__ = "1.0.2" +__version__ = "1.0.3" diff --git a/ansible_rulebook/action/__init__.py b/ansible_rulebook/action/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ansible_rulebook/action/control.py b/ansible_rulebook/action/control.py new file mode 100644 index 00000000..8e701a4f --- /dev/null +++ b/ansible_rulebook/action/control.py @@ -0,0 +1,54 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from dataclasses import dataclass +from typing import List + + +@dataclass(frozen=True) +class Control: + """Control information when running an action + + Attributes: + queue: asyncio.Queue + This is the queue on which we would be sending action status + periodically when the action is running + inventory: str + This is the inventory information from the command line + It currently is the data that is read from a file, in the future + it could be a directory or an inventory name from the controller + hosts: list[str] + The list of servers passed into ansible-playbook or controller + variables: dict + The variables passed in from the command line plus the matching event + data with event or events key. + project_data_file: str + This is the directory where the collection data is sent from the + AAP server over the websocket is untarred to. The collection could + contain the playbook that is used in the run_playbook action. + """ + + __slots__ = [ + "queue", + "inventory", + "hosts", + "variables", + "project_data_file", + ] + queue: asyncio.Queue + inventory: str + hosts: List[str] + variables: dict + project_data_file: str diff --git a/ansible_rulebook/action/debug.py b/ansible_rulebook/action/debug.py new file mode 100644 index 00000000..ba2b0d4d --- /dev/null +++ b/ansible_rulebook/action/debug.py @@ -0,0 +1,83 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sys +from dataclasses import asdict +from pprint import pprint + +import dpath +from drools import ruleset as lang + +from ansible_rulebook.util import get_horizontal_rule + +from .control import Control +from .helper import Helper +from .metadata import Metadata + +logger = logging.getLogger(__name__) + + +class Debug: + """The debug action tries to mimic the ansible debug task with optional + msg: Prints a message + var: Prints a variable + default: print the metadata, control information and facts from the + rule engine + At the end we send back the action status + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "debug") + self.action_args = action_args + + async def __call__(self): + if "msg" in self.action_args: + messages = self.action_args.get("msg") + if not isinstance(messages, list): + messages = [messages] + for msg in messages: + print(msg) + elif "var" in self.action_args: + key = self.action_args.get("var") + try: + print( + dpath.get( + self.helper.control.variables, key, separator="." + ) + ) + except KeyError: + logger.error("Key %s not found in variable pool", key) + raise + else: + print(get_horizontal_rule("=")) + print("kwargs:") + args = asdict(self.helper.metadata) + project_data_file = self.helper.control.project_data_file + args.update( + { + "inventory": self.helper.control.inventory, + "hosts": self.helper.control.hosts, + "variables": self.helper.control.variables, + "project_data_file": project_data_file, + } + ) + pprint(args) + print(get_horizontal_rule("=")) + print("facts:") + pprint(lang.get_facts(self.helper.metadata.rule_set)) + print(get_horizontal_rule("=")) + + sys.stdout.flush() + await self.helper.send_default_status() diff --git a/ansible_rulebook/action/helper.py b/ansible_rulebook/action/helper.py new file mode 100644 index 00000000..9c2d9902 --- /dev/null +++ b/ansible_rulebook/action/helper.py @@ -0,0 +1,143 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid +from typing import Dict + +from ansible_rulebook.conf import settings +from ansible_rulebook.event_filter.insert_meta_info import main as insert_meta +from ansible_rulebook.util import run_at + +from .control import Control +from .metadata import Metadata + +KEY_EDA_VARS = "ansible_eda" +INTERNAL_ACTION_STATUS = "successful" + + +class Helper: + """ + Helper class stores the metadata, the control attributes and has + methods to send data to the Queue. + + Attributes + ---------- + metadata : Metadata + a data class that stores rule specific data + control : Control + a control dataclass that stores the runtime information about + the queue on which we send the status for the action, the inventory + information, the hosts data and the variables that we would like + to pass into the action + uuid : str + each action has a uuid that is generated to track it + action : str + the name of the action, set by the sub classe + + Methods + ------- + send_status(data={}, obj_type:"action") + Sends the action status information on the queue + send_default_status() + Sends the default action status, used mostly with internal + actions like debug, print_event, set_fact, retract_fact, + noop, post_event + get_events() + Fetches the matching events from the variables + collect_extra_vars() + Create extra_vars to be sent to playbook and job template which + includes rule and matching events. + embellish_internal_event() + Add internal sources for facts and events posted from inside of + a rulebook + """ + + def __init__(self, metadata: Metadata, control: Control, action: str): + self.metadata = metadata + self.control = control + self.uuid = str(uuid.uuid4()) + self.action = action + + async def send_status(self, data: Dict, obj_type: str = "Action") -> None: + """Send Action status information on the queue""" + payload = { + "type": obj_type, + "action": self.action, + "action_uuid": self.uuid, + "ruleset": self.metadata.rule_set, + "ruleset_uuid": self.metadata.rule_set_uuid, + "rule": self.metadata.rule, + "rule_uuid": self.metadata.rule_uuid, + "rule_run_at": self.metadata.rule_run_at, + "activation_id": settings.identifier, + "activation_instance_id": settings.identifier, + } + payload.update(data) + await self.control.queue.put(payload) + + async def send_default_status(self): + """Send default action status information on the queue""" + await self.send_status( + { + "run_at": run_at(), + "status": INTERNAL_ACTION_STATUS, + "matching_events": self.get_events(), + } + ) + + def get_events(self) -> Dict: + """From the control variables, detect if its a single event + match or a multi event match and return a dictionary with + the event data with + m key for single event stored in the event key + m_0,m_1,.... for multiple matching events stored in + the events key + """ + if "event" in self.control.variables: + return {"m": self.control.variables["event"]} + if "events" in self.control.variables: + return self.control.variables["events"] + return {} + + def embellish_internal_event(self, event: Dict) -> Dict: + """Insert metadata for every internally generated event""" + return insert_meta( + event, **{"source_name": self.action, "source_type": "internal"} + ) + + def set_action(self, action) -> None: + self.action = action + + def collect_extra_vars(self, user_extra_vars: Dict) -> Dict: + """When we send information to ansible-playbook or job template + on AWX, we need the rule and event specific information to + be sent to this external process + + the caller passes in the user_extra_vars from the action args + and then we append eda specific vars and return that as a + the updated dictionary that is sent to the external process + """ + extra_vars = user_extra_vars.copy() if user_extra_vars else {} + + eda_vars = { + "ruleset": self.metadata.rule_set, + "rule": self.metadata.rule, + } + if "events" in self.control.variables: + eda_vars["events"] = self.control.variables["events"] + if "event" in self.control.variables: + eda_vars["event"] = self.control.variables["event"] + + extra_vars[KEY_EDA_VARS] = eda_vars + return extra_vars diff --git a/ansible_rulebook/action/metadata.py b/ansible_rulebook/action/metadata.py new file mode 100644 index 00000000..c3eb73f0 --- /dev/null +++ b/ansible_rulebook/action/metadata.py @@ -0,0 +1,48 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Metadata: + """Metadata class stores the rule specific information + which is used when reporting stats for the action + + Attributes + ---------- + rule: str + Rule name + rule_uuid: str + Rule uuid + rule_set: str + Rule set name + rule_set_uuid: str + Rule set uuid + rule_run_at: str + ISO 8601 date/time when the rule was triggered + """ + + __slots__ = [ + "rule", + "rule_uuid", + "rule_set", + "rule_set_uuid", + "rule_run_at", + ] + rule: str + rule_uuid: str + rule_set: str + rule_set_uuid: str + rule_run_at: str diff --git a/ansible_rulebook/action/noop.py b/ansible_rulebook/action/noop.py new file mode 100644 index 00000000..23b84de7 --- /dev/null +++ b/ansible_rulebook/action/noop.py @@ -0,0 +1,34 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from .control import Control +from .helper import Helper +from .metadata import Metadata + +logger = logging.getLogger(__name__) + + +class Noop: + """The No Op action usually used for debugging, doesn't do anything and + just sends the action status + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "noop") + self.action_args = action_args + + async def __call__(self): + await self.helper.send_default_status() diff --git a/ansible_rulebook/action/post_event.py b/ansible_rulebook/action/post_event.py new file mode 100644 index 00000000..4c2cc5bb --- /dev/null +++ b/ansible_rulebook/action/post_event.py @@ -0,0 +1,43 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from drools import ruleset as lang + +from .control import Control +from .helper import Helper +from .metadata import Metadata + +logger = logging.getLogger(__name__) + + +class PostEvent: + """The post_event action sends the event information into the Drools + rule engine, which can then trigger the rules based on matching + events. To mark that this is an internal event coming from inside + the rulebook we embellish the event with source information to + indicate that its an internal event. + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "post_event") + self.action_args = action_args + + async def __call__(self): + lang.post( + self.action_args["ruleset"], + self.helper.embellish_internal_event(self.action_args["event"]), + ) + await self.helper.send_default_status() diff --git a/ansible_rulebook/action/print_event.py b/ansible_rulebook/action/print_event.py new file mode 100644 index 00000000..d85fa9f6 --- /dev/null +++ b/ansible_rulebook/action/print_event.py @@ -0,0 +1,45 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +from pprint import pprint +from typing import Callable + +from .control import Control +from .helper import Helper +from .metadata import Metadata + + +class PrintEvent: + """The print_event action defined in the rule book + prints the event information to stdout and + send the action status + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "print_event") + self.action_args = action_args + + async def __call__(self): + print_fn: Callable = print + if "pretty" in self.action_args: + print_fn = pprint + + var_name = ( + "events" if "events" in self.helper.control.variables else "event" + ) + + print_fn(self.helper.control.variables[var_name]) + sys.stdout.flush() + await self.helper.send_default_status() diff --git a/ansible_rulebook/action/retract_fact.py b/ansible_rulebook/action/retract_fact.py new file mode 100644 index 00000000..b5afaaf7 --- /dev/null +++ b/ansible_rulebook/action/retract_fact.py @@ -0,0 +1,57 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from drools import ruleset as lang + +from .control import Control +from .helper import Helper +from .metadata import Metadata + +logger = logging.getLogger(__name__) + + +class RetractFact: + """The retract_fact action removes a fact information from the Drools + rule engine, which can then trigger the rules based on removed + facts. + The action_args includes the following parameters + ruleset: str + The name of the ruleset to retract the fact from + fact: dict + The fact to retract from Drools + partial: true|false, default is true + if the fact has partial information or it has complete + information. + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "retract_fact") + self.action_args = action_args + + async def __call__(self): + partial = self.action_args.get("partial", True) + if not partial: + exclude_keys = ["meta"] + else: + exclude_keys = [] + + lang.retract_matching_facts( + self.action_args["ruleset"], + self.action_args["fact"], + partial, + exclude_keys, + ) + await self.helper.send_default_status() diff --git a/ansible_rulebook/action/run_job_template.py b/ansible_rulebook/action/run_job_template.py new file mode 100644 index 00000000..c21df914 --- /dev/null +++ b/ansible_rulebook/action/run_job_template.py @@ -0,0 +1,160 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import uuid +from urllib.parse import urljoin + +from drools import ruleset as lang + +from ansible_rulebook.conf import settings +from ansible_rulebook.exception import ( + ControllerApiException, + JobTemplateNotFoundException, +) +from ansible_rulebook.job_template_runner import job_template_runner +from ansible_rulebook.util import process_controller_host_limit, run_at + +from .control import Control +from .helper import Helper +from .metadata import Metadata + +logger = logging.getLogger(__name__) + + +class RunJobTemplate: + """run_job_template action launches a specified job template on + the controller. It waits for the job to be complete. + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "run_job_template") + self.action_args = action_args + self.name = self.action_args["name"] + self.organization = self.action_args["organization"] + self.job_id = str(uuid.uuid4()) + self.job_args = self.action_args.get("job_args", {}) + self.job_args["limit"] = process_controller_host_limit( + self.job_args, + self.helper.control.hosts, + ) + self.controller_job = {} + + async def __call__(self): + logger.info( + "running job template: %s, organization: %s", + self.name, + self.organization, + ) + logger.info( + "ruleset: %s, rule %s", + self.helper.metadata.rule_set, + self.helper.metadata.rule, + ) + + self.job_args["extra_vars"] = self.helper.collect_extra_vars( + self.job_args.get("extra_vars", {}) + ) + await self._job_start_event() + await self._run() + + async def _run(self): + retries = self.action_args.get("retries", 0) + if self.action_args.get("retry", False): + retries = max(self.action_args.get("retries", 0), 1) + delay = self.action_args.get("delay", 0) + + try: + for i in range(retries + 1): + if i > 0: + if delay > 0: + await asyncio.sleep(delay) + logger.info( + "Previous run_job_template failed. Retry %d of %d", + i, + retries, + ) + controller_job = await job_template_runner.run_job_template( + self.name, + self.organization, + self.job_args, + ) + if controller_job["status"] != "failed": + break + except (ControllerApiException, JobTemplateNotFoundException) as ex: + logger.error(ex) + controller_job = {} + controller_job["status"] = "failed" + controller_job["created"] = run_at() + controller_job["error"] = str(ex) + + self.controller_job = controller_job + await self._post_process() + + async def _post_process(self) -> None: + a_log = { + "job_template_name": self.name, + "organization": self.organization, + "job_id": self.job_id, + "status": self.controller_job["status"], + "run_at": self.controller_job["created"], + "url": self._controller_job_url(), + "matching_events": self.helper.get_events(), + } + if "error" in self.controller_job: + a_log["message"] = self.controller_job["error"] + a_log["reason"] = {"error": self.controller_job["error"]} + + await self.helper.send_status(a_log) + set_facts = self.action_args.get("set_facts", False) + post_events = self.action_args.get("post_events", False) + + if set_facts or post_events: + ruleset = self.action_args.get( + "ruleset", self.helper.metadata.rule_set + ) + logger.debug("set_facts") + facts = self.controller_job.get("artifacts", {}) + if facts: + facts = self.helper.embellish_internal_event(facts) + logger.debug("facts %s", facts) + if set_facts: + lang.assert_fact(ruleset, facts) + if post_events: + lang.post(ruleset, facts) + else: + logger.debug("Empty facts are not set") + + async def _job_start_event(self): + await self.helper.send_status( + { + "run_at": run_at(), + "matching_events": self.helper.get_events(), + "action": self.helper.action, + "hosts": ",".join(self.helper.control.hosts), + "name": self.name, + "job_id": self.job_id, + "ansible_rulebook_id": settings.identifier, + }, + obj_type="Job", + ) + + def _controller_job_url(self) -> str: + if "id" in self.controller_job: + return urljoin( + job_template_runner.host, + "/#/jobs/" f"{self.controller_job['id']}/" "details", + ) + return "" diff --git a/ansible_rulebook/action/run_module.py b/ansible_rulebook/action/run_module.py new file mode 100644 index 00000000..3c678106 --- /dev/null +++ b/ansible_rulebook/action/run_module.py @@ -0,0 +1,73 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os + +import yaml + +from .control import Control +from .metadata import Metadata +from .run_playbook import RunPlaybook + +logger = logging.getLogger(__name__) + + +class RunModule(RunPlaybook): + """run_module runs an ansible module using the ansible runner""" + + MODULE_OUTPUT_KEY = "module_result" + + def __init__(self, metadata: Metadata, control: Control, **action_args): + super().__init__(metadata, control, **action_args) + self.helper.set_action("run_module") + self.output_key = self.MODULE_OUTPUT_KEY + + async def _pre_process(self) -> None: + await super()._pre_process() + self.playbook = os.path.join(self.private_data_dir, "wrapper.yml") + self._wrap_module_in_playbook() + + def _copy_playbook_files(self, project_dir): + pass + + def _runner_args(self): + return {"playbook": self.playbook, "inventory": self.inventory} + + def _wrap_module_in_playbook(self) -> None: + module_args = self.action_args.get("module_args", {}) + module_task = { + "name": "Module wrapper", + self.name: module_args, + "register": self.MODULE_OUTPUT_KEY, + } + result_str = "{{ " + self.MODULE_OUTPUT_KEY + " }}" + set_fact_task = { + "name": "save result", + "ansible.builtin.set_fact": { + self.MODULE_OUTPUT_KEY: result_str, + "cacheable": True, + }, + } + tasks = [module_task, set_fact_task] + wrapper = [ + dict( + name="wrapper", + hosts=self.host_limit, + gather_facts=False, + tasks=tasks, + ) + ] + with open(self.playbook, "w") as f: + yaml.dump(wrapper, f) diff --git a/ansible_rulebook/action/run_playbook.py b/ansible_rulebook/action/run_playbook.py new file mode 100644 index 00000000..2325daab --- /dev/null +++ b/ansible_rulebook/action/run_playbook.py @@ -0,0 +1,265 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import glob +import json +import logging +import os +import shutil +import tempfile +import uuid + +import yaml +from drools import ruleset as lang + +from ansible_rulebook.collection import ( + find_playbook, + has_playbook, + split_collection_name, +) +from ansible_rulebook.conf import settings +from ansible_rulebook.exception import ( + MissingArtifactKeyException, + PlaybookNotFoundException, + PlaybookStatusNotFoundException, +) +from ansible_rulebook.util import create_inventory, run_at + +from .control import Control +from .helper import Helper +from .metadata import Metadata +from .runner import Runner + +logger = logging.getLogger(__name__) + +tar = shutil.which("tar") + + +class RunPlaybook: + """run_playbook action runs an ansible playbook using the + ansible-runner + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "run_playbook") + self.action_args = action_args + self.job_id = str(uuid.uuid4()) + self.default_copy_files = True + self.default_check_files = True + self.name = self.action_args["name"] + self.verbosity = self.action_args.get("verbosity", 0) + self.json_mode = self.action_args.get("json_mode", False) + self.host_limit = ",".join(self.helper.control.hosts) + self.private_data_dir = tempfile.mkdtemp(prefix="eda") + self.output_key = None + self.inventory = None + + async def __call__(self): + try: + logger.info( + f"ruleset: {self.helper.metadata.rule_set}, " + f"rule: {self.helper.metadata.rule}" + ) + logger.debug("private data dir %s", self.private_data_dir) + await self._pre_process() + await self._job_start_event() + logger.info("Calling Ansible runner") + await self._run() + finally: + if os.path.exists(self.private_data_dir): + shutil.rmtree(self.private_data_dir) + + async def _job_start_event(self): + await self.helper.send_status( + { + "run_at": run_at(), + "matching_events": self.helper.get_events(), + "action": self.helper.action, + "hosts": self.host_limit, + "name": self.name, + "job_id": self.job_id, + "ansible_rulebook_id": settings.identifier, + }, + obj_type="Job", + ) + + async def _run(self): + retries = self.action_args.get("retries", 0) + if self.action_args.get("retry", False): + retries = max(self.action_args.get("retries", 0), 1) + + delay = self.action_args.get("delay", 0) + + for i in range(retries + 1): + if i > 0: + if delay > 0: + await asyncio.sleep(delay) + logger.info( + "Previous run_playbook failed. Retry %d of %d", i, retries + ) + + await Runner( + self.private_data_dir, + self.host_limit, + self.verbosity, + self.job_id, + self.json_mode, + self.helper, + self._runner_args(), + )() + if self._get_latest_artifact("status") != "failed": + break + + await self._post_process() + + def _runner_args(self): + return {"playbook": self.name, "inventory": self.inventory} + + async def _pre_process(self) -> None: + playbook_extra_vars = self.helper.collect_extra_vars( + self.action_args.get("extra_vars", {}) + ) + + env_dir = os.path.join(self.private_data_dir, "env") + inventory_dir = os.path.join(self.private_data_dir, "inventory") + project_dir = os.path.join(self.private_data_dir, "project") + + os.mkdir(env_dir) + with open(os.path.join(env_dir, "extravars"), "w") as file_handle: + file_handle.write(yaml.dump(playbook_extra_vars)) + os.mkdir(inventory_dir) + + if self.helper.control.inventory: + create_inventory(inventory_dir, self.helper.control.inventory) + self.inventory = os.path.join( + inventory_dir, os.path.basename(self.helper.control.inventory) + ) + os.mkdir(project_dir) + + logger.debug( + "project_data_file: %s", self.helper.control.project_data_file + ) + if self.helper.control.project_data_file: + if os.path.exists(self.helper.control.project_data_file): + await self._untar_project( + project_dir, self.helper.control.project_data_file + ) + return + self._copy_playbook_files(project_dir) + + def _copy_playbook_files(self, project_dir): + if self.action_args.get("check_files", self.default_check_files): + if os.path.exists(self.name): + tail_name = os.path.basename(self.name) + shutil.copy(self.name, os.path.join(project_dir, tail_name)) + if self.action_args.get("copy_files", self.default_copy_files): + shutil.copytree( + os.path.dirname(os.path.abspath(self.name)), + project_dir, + dirs_exist_ok=True, + ) + self.name = tail_name + elif has_playbook(*split_collection_name(self.name)): + shutil.copy( + find_playbook(*split_collection_name(self.name)), + os.path.join(project_dir, self.name), + ) + else: + msg = ( + f"Could not find a playbook for {self.name} " + f"from {os.getcwd()}" + ) + logger.error(msg) + raise PlaybookNotFoundException(msg) + + async def _post_process(self): + rc = int(self._get_latest_artifact("rc")) + status = self._get_latest_artifact("status") + logger.info("Ansible runner rc: %d, status: %s", rc, status) + if rc != 0: + error_message = self._get_latest_artifact("stderr") + if not error_message: + error_message = self._get_latest_artifact("stdout") + logger.error(error_message) + + await self.helper.send_status( + { + "playbook_name": self.name, + "job_id": self.job_id, + "rc": rc, + "status": status, + "run_at": run_at(), + "matching_events": self.helper.get_events(), + } + ) + set_facts = self.action_args.get("set_facts", False) + post_events = self.action_args.get("post_events", False) + + if rc == 0 and (set_facts or post_events): + logger.debug("set_facts") + fact_folder = self._get_latest_artifact("fact_cache", False) + ruleset = self.action_args.get( + "ruleset", self.helper.metadata.rule_set + ) + for host_facts in glob.glob(os.path.join(fact_folder, "*")): + with open(host_facts) as file_handle: + fact = json.loads(file_handle.read()) + if self.output_key: + if self.output_key not in fact: + logger.error( + "The artifacts from the ansible-runner " + "does not have key %s", + self.output_key, + ) + raise MissingArtifactKeyException( + f"Missing key: {self.output_key} in artifacts" + ) + fact = fact[self.output_key] + fact = self.helper.embellish_internal_event(fact) + logger.debug("fact %s", fact) + if set_facts: + lang.assert_fact(ruleset, fact) + if post_events: + lang.post(ruleset, fact) + + def _get_latest_artifact(self, component: str, content: bool = True): + files = glob.glob( + os.path.join(self.private_data_dir, "artifacts", "*", component) + ) + files.sort(key=os.path.getmtime, reverse=True) + if not files: + raise PlaybookStatusNotFoundException(f"No {component} file found") + if content: + with open(files[0], "r") as file_handle: + content = file_handle.read() + return content + return files[0] + + async def _untar_project(self, output_dir, project_data_file): + + cmd = [tar, "zxvf", project_data_file] + proc = await asyncio.create_subprocess_exec( + *cmd, + cwd=output_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await proc.communicate() + + if stdout: + logger.debug(stdout.decode()) + if stderr: + logger.debug(stderr.decode()) diff --git a/ansible_rulebook/action/run_workflow_template.py b/ansible_rulebook/action/run_workflow_template.py new file mode 100644 index 00000000..de11b7b9 --- /dev/null +++ b/ansible_rulebook/action/run_workflow_template.py @@ -0,0 +1,166 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import uuid +from urllib.parse import urljoin + +from drools import ruleset as lang + +from ansible_rulebook.conf import settings +from ansible_rulebook.exception import ( + ControllerApiException, + WorkflowJobTemplateNotFoundException, +) +from ansible_rulebook.job_template_runner import job_template_runner +from ansible_rulebook.util import process_controller_host_limit, run_at + +from .control import Control +from .helper import Helper +from .metadata import Metadata + +logger = logging.getLogger(__name__) + + +class RunWorkflowTemplate: + """run_workflow_template action launches a specified workflow template on + the controller. It waits for the job to be complete. + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "run_workflow_template") + self.action_args = action_args + self.name = self.action_args["name"] + self.organization = self.action_args["organization"] + self.job_id = str(uuid.uuid4()) + self.job_args = self.action_args.get("job_args", {}) + self.job_args["limit"] = process_controller_host_limit( + self.job_args, + self.helper.control.hosts, + ) + self.controller_job = {} + + async def __call__(self): + logger.info( + "running workflow template: %s, organization: %s", + self.name, + self.organization, + ) + logger.info( + "ruleset: %s, rule %s", + self.helper.metadata.rule_set, + self.helper.metadata.rule, + ) + + self.job_args["extra_vars"] = self.helper.collect_extra_vars( + self.job_args.get("extra_vars", {}) + ) + await self._job_start_event() + await self._run() + + async def _run(self): + retries = self.action_args.get("retries", 0) + if self.action_args.get("retry", False): + retries = max(self.action_args.get("retries", 0), 1) + delay = self.action_args.get("delay", 0) + + try: + for i in range(retries + 1): + if i > 0: + if delay > 0: + await asyncio.sleep(delay) + logger.info( + "Previous run_workflow_template failed. " + "Retry %d of %d", + i, + retries, + ) + controller_job = ( + await job_template_runner.run_workflow_job_template( + self.name, + self.organization, + self.job_args, + ) + ) + if controller_job["status"] != "failed": + break + except ( + ControllerApiException, + WorkflowJobTemplateNotFoundException, + ) as ex: + logger.error(ex) + controller_job = {} + controller_job["status"] = "failed" + controller_job["created"] = run_at() + controller_job["error"] = str(ex) + + self.controller_job = controller_job + await self._post_process() + + async def _post_process(self) -> None: + a_log = { + "name": self.name, + "organization": self.organization, + "job_id": self.job_id, + "status": self.controller_job["status"], + "run_at": self.controller_job["created"], + "url": self._controller_job_url(), + "matching_events": self.helper.get_events(), + } + if "error" in self.controller_job: + a_log["message"] = self.controller_job["error"] + a_log["reason"] = {"error": self.controller_job["error"]} + + await self.helper.send_status(a_log) + set_facts = self.action_args.get("set_facts", False) + post_events = self.action_args.get("post_events", False) + + if set_facts or post_events: + ruleset = self.action_args.get( + "ruleset", self.helper.metadata.rule_set + ) + logger.debug("set_facts") + facts = self.controller_job.get("artifacts", {}) + if facts: + facts = self.helper.embellish_internal_event(facts) + logger.debug("facts %s", facts) + if set_facts: + lang.assert_fact(ruleset, facts) + if post_events: + lang.post(ruleset, facts) + else: + logger.debug("Empty facts are not set") + + async def _job_start_event(self): + await self.helper.send_status( + { + "run_at": run_at(), + "matching_events": self.helper.get_events(), + "action": self.helper.action, + "hosts": ",".join(self.helper.control.hosts), + "name": self.name, + "job_id": self.job_id, + "ansible_rulebook_id": settings.identifier, + }, + obj_type="Job", + ) + + def _controller_job_url(self) -> str: + if "id" in self.controller_job: + return urljoin( + job_template_runner.host, + "/#/jobs/workflow/" f"{self.controller_job['id']}/" "details", + ) + return "" diff --git a/ansible_rulebook/action/runner.py b/ansible_rulebook/action/runner.py new file mode 100644 index 00000000..293388fd --- /dev/null +++ b/ansible_rulebook/action/runner.py @@ -0,0 +1,117 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import concurrent.futures +import logging +from asyncio.exceptions import CancelledError +from functools import partial + +import ansible_runner +import janus + +from ansible_rulebook.conf import settings + +logger = logging.getLogger(__name__) + + +class Runner: + """calls ansible-runner to launch either playbooks/modules + ansible-runner + """ + + def __init__( + self, + data_dir, + host_limit, + verbosity, + job_id, + json_mode, + helper, + runner_args, + ): + self.private_data_dir = data_dir + self.host_limit = host_limit + self.verbosity = verbosity + self.job_id = job_id + self.helper = helper + self.runner_args = runner_args + self.json_mode = json_mode + + async def __call__(self): + shutdown = False + + loop = asyncio.get_running_loop() + + queue = janus.Queue() + + # The event_callback is called from the ansible-runner thread + # It needs a thread-safe synchronous queue. + # Janus provides a sync queue connected to an async queue + # Here we push the event into the sync side of janus + def event_callback(event, *_args, **_kwargs): + event["job_id"] = self.job_id + event["ansible_rulebook_id"] = settings.identifier + queue.sync_q.put({"type": "AnsibleEvent", "event": event}) + + # Here we read the async side and push it into the event queue + # which is also async. + # We do this until cancelled at the end of the ansible runner call. + # We might need to drain the queue here before ending. + async def read_queue(): + try: + while True: + val = await queue.async_q.get() + event_data = val.get("event", {}) + val["run_at"] = event_data.get("created") + await self.helper.send_status(val) + except CancelledError: + logger.info("Ansible runner Queue task cancelled") + + def cancel_callback(): + return shutdown + + tasks = [] + + tasks.append(asyncio.create_task(read_queue())) + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as task_pool: + try: + await loop.run_in_executor( + task_pool, + partial( + ansible_runner.run, + private_data_dir=self.private_data_dir, + limit=self.host_limit, + verbosity=self.verbosity, + event_handler=event_callback, + cancel_callback=cancel_callback, + json_mode=self.json_mode, + **self.runner_args, + ), + ) + except CancelledError: + logger.debug( + "Ansible Runner Thread Pool executor task cancelled" + ) + shutdown = True + raise + finally: + # Cancel the queue reading task + for task in tasks: + if not task.done(): + logger.debug("Cancel Queue reading task") + task.cancel() + + await asyncio.gather(*tasks) diff --git a/ansible_rulebook/action/set_fact.py b/ansible_rulebook/action/set_fact.py new file mode 100644 index 00000000..adca68a0 --- /dev/null +++ b/ansible_rulebook/action/set_fact.py @@ -0,0 +1,48 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from drools import ruleset as lang + +from .control import Control +from .helper import Helper +from .metadata import Metadata + +logger = logging.getLogger(__name__) + + +class SetFact: + """The set_fact action sends the fact information into the Drools + rule engine, which can then trigger the rules based on matching + facts. To mark that this is an internal fact coming from inside + the rulebook we embellish the fact with source information to + indicate that its an internal fact. + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "set_fact") + self.action_args = action_args + + async def __call__(self): + logger.debug( + "set_fact %s %s", + self.action_args["ruleset"], + self.action_args["fact"], + ) + lang.assert_fact( + self.action_args["ruleset"], + self.helper.embellish_internal_event(self.action_args["fact"]), + ) + await self.helper.send_default_status() diff --git a/ansible_rulebook/action/shutdown.py b/ansible_rulebook/action/shutdown.py new file mode 100644 index 00000000..e33c3225 --- /dev/null +++ b/ansible_rulebook/action/shutdown.py @@ -0,0 +1,59 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ansible_rulebook.exception import ShutdownException +from ansible_rulebook.messages import Shutdown as ShutdownMessage +from ansible_rulebook.util import run_at + +from .control import Control +from .helper import INTERNAL_ACTION_STATUS, Helper +from .metadata import Metadata + + +class Shutdown: + """shutdown action initiates a shutdown from inside of a rulebook""" + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "shutdown") + self.action_args = action_args + + async def __call__(self): + delay = self.action_args.get("delay", 60.0) + message = self.action_args.get("message", "Default shutdown message") + kind = self.action_args.get("kind", "graceful") + + await self.helper.send_status( + { + "run_at": run_at(), + "status": INTERNAL_ACTION_STATUS, + "matching_events": self.helper.get_events(), + "delay": delay, + "message": message, + "kind": kind, + } + ) + print( + "Ruleset: %s rule: %s has initiated shutdown of type: %s. " + "Delay: %.3f seconds, Message: %s" + % ( + self.helper.metadata.rule_set, + self.helper.metadata.rule, + kind, + delay, + message, + ) + ) + raise ShutdownException( + ShutdownMessage(message=message, delay=delay, kind=kind) + ) diff --git a/ansible_rulebook/builtin.py b/ansible_rulebook/builtin.py deleted file mode 100644 index cac1615d..00000000 --- a/ansible_rulebook/builtin.py +++ /dev/null @@ -1,1114 +0,0 @@ -# Copyright 2022 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import concurrent.futures -import glob -import json -import logging -import os -import shutil -import sys -import tempfile -import uuid -from asyncio.exceptions import CancelledError -from functools import partial -from pprint import pprint -from typing import Callable, Dict, List, Optional, Union -from urllib.parse import urljoin - -import ansible_runner -import dpath -import janus -import yaml -from drools import ruleset as lang - -from .collection import find_playbook, has_playbook, split_collection_name -from .conf import settings -from .event_filter.insert_meta_info import main as insert_meta -from .exception import ( - ControllerApiException, - JobTemplateNotFoundException, - MissingArtifactKeyException, - PlaybookNotFoundException, - PlaybookStatusNotFoundException, - ShutdownException, - WorkflowJobTemplateNotFoundException, -) -from .job_template_runner import job_template_runner -from .messages import Shutdown -from .util import create_inventory, get_horizontal_rule, run_at - -logger = logging.getLogger(__name__) - -tar = shutil.which("tar") - -KEY_EDA_VARS = "ansible_eda" -INTERNAL_ACTION_STATUS = "successful" -MODULE_OUTPUT_KEY = "module_result" - - -async def none( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, -): - await event_log.put( - dict( - type="Action", - action="noop", - action_uuid=str(uuid.uuid4()), - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - activation_id=settings.identifier, - run_at=run_at(), - status=INTERNAL_ACTION_STATUS, - matching_events=_get_events(variables), - rule_run_at=rule_run_at, - ) - ) - - -async def debug(event_log, **kwargs): - if "msg" in kwargs: - messages = kwargs.get("msg") - if not isinstance(messages, list): - messages = [messages] - for msg in messages: - print(msg) - elif "var" in kwargs: - key = kwargs.get("var") - try: - print(dpath.get(kwargs.get("variables"), key, separator=".")) - except KeyError: - logger.error("Key %s not found in variable pool", key) - return - else: - print(get_horizontal_rule("=")) - print("kwargs:") - pprint(kwargs) - print(get_horizontal_rule("=")) - print("facts:") - pprint(lang.get_facts(kwargs["source_ruleset_name"])) - print(get_horizontal_rule("=")) - sys.stdout.flush() - await event_log.put( - dict( - type="Action", - action="debug", - action_uuid=str(uuid.uuid4()), - playbook_name=kwargs.get("name"), - ruleset=kwargs.get("source_ruleset_name"), - ruleset_uuid=kwargs.get("source_ruleset_uuid"), - rule=kwargs.get("source_rule_name"), - rule_uuid=kwargs.get("source_rule_uuid"), - rule_run_at=kwargs.get("rule_run_at"), - activation_id=settings.identifier, - run_at=run_at(), - status=INTERNAL_ACTION_STATUS, - matching_events=_get_events(kwargs.get("variables")), - ) - ) - - -async def print_event( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, - name: Optional[str] = None, - pretty: Optional[str] = None, -): - print_fn: Callable = print - if pretty: - print_fn = pprint - - var_name = "events" if "events" in variables else "event" - - print_fn(variables[var_name]) - sys.stdout.flush() - await event_log.put( - dict( - type="Action", - action="print_event", - action_uuid=str(uuid.uuid4()), - activation_id=settings.identifier, - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - playbook_name=name, - run_at=run_at(), - status=INTERNAL_ACTION_STATUS, - matching_events=_get_events(variables), - rule_run_at=rule_run_at, - ) - ) - - -async def set_fact( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, - fact: Dict, - name: Optional[str] = None, -): - logger.debug("set_fact %s %s", ruleset, fact) - lang.assert_fact(ruleset, _embellish_internal_event(fact, "set_fact")) - await event_log.put( - dict( - type="Action", - action="set_fact", - action_uuid=str(uuid.uuid4()), - activation_id=settings.identifier, - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - playbook_name=name, - run_at=run_at(), - status=INTERNAL_ACTION_STATUS, - matching_events=_get_events(variables), - rule_run_at=rule_run_at, - ) - ) - - -async def retract_fact( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, - fact: Dict, - partial: bool = True, - name: Optional[str] = None, -): - - if not partial: - exclude_keys = ["meta"] - else: - exclude_keys = [] - - lang.retract_matching_facts(ruleset, fact, partial, exclude_keys) - await event_log.put( - dict( - type="Action", - action="retract_fact", - action_uuid=str(uuid.uuid4()), - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - activation_id=settings.identifier, - playbook_name=name, - run_at=run_at(), - status=INTERNAL_ACTION_STATUS, - matching_events=_get_events(variables), - rule_run_at=rule_run_at, - ) - ) - - -async def post_event( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, - event: Dict, -): - lang.post(ruleset, _embellish_internal_event(event, "post_event")) - - await event_log.put( - dict( - type="Action", - action="post_event", - action_uuid=str(uuid.uuid4()), - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - activation_id=settings.identifier, - run_at=run_at(), - status=INTERNAL_ACTION_STATUS, - matching_events=_get_events(variables), - rule_run_at=rule_run_at, - ) - ) - - -async def run_playbook( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, - name: str, - set_facts: Optional[bool] = None, - post_events: Optional[bool] = None, - verbosity: int = 0, - copy_files: Optional[bool] = False, - json_mode: Optional[bool] = False, - retries: Optional[int] = 0, - retry: Optional[bool] = False, - delay: Optional[int] = 0, - extra_vars: Optional[Dict] = None, - **kwargs, -): - - logger.info("running Ansible playbook: %s", name) - temp_dir, playbook_name = await pre_process_runner( - event_log, - inventory, - variables, - source_ruleset_name, - source_rule_name, - name, - "run_playbook", - copy_files, - True, - project_data_file, - extra_vars, - **kwargs, - ) - - job_id = str(uuid.uuid4()) - - logger.info(f"ruleset: {source_ruleset_name}, rule: {source_rule_name}") - await event_log.put( - dict( - type="Job", - job_id=job_id, - ansible_rulebook_id=settings.identifier, - name=playbook_name, - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - hosts=",".join(hosts), - action="run_playbook", - ) - ) - - logger.info("Calling Ansible runner") - - if retry: - retries = max(retries, 1) - for i in range(retries + 1): - if i > 0: - if delay > 0: - await asyncio.sleep(delay) - logger.info( - "Previous run_playbook failed. Retry %d of %d", i, retries - ) - - action_run_at = run_at() - await call_runner( - event_log, - job_id, - temp_dir, - dict(playbook=playbook_name), - hosts, - inventory, - verbosity, - json_mode, - ) - if _get_latest_artifact(temp_dir, "status") != "failed": - break - - await post_process_runner( - event_log, - variables, - temp_dir, - ruleset, - source_ruleset_uuid, - source_rule_name, - source_rule_uuid, - rule_run_at, - settings.identifier, - name, - "run_playbook", - job_id, - action_run_at, - set_facts, - post_events, - ) - - shutil.rmtree(temp_dir) - - -async def run_module( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, - name: str, - set_facts: Optional[bool] = None, - post_events: Optional[bool] = None, - verbosity: int = 0, - copy_files: Optional[bool] = False, - json_mode: Optional[bool] = False, - module_args: Union[Dict, None] = None, - retries: Optional[int] = 0, - retry: Optional[bool] = False, - delay: Optional[int] = 0, - extra_vars: Optional[Dict] = None, - **kwargs, -): - temp_dir, module_name = await pre_process_runner( - event_log, - inventory, - variables, - source_ruleset_name, - source_rule_name, - name, - "run_module", - copy_files, - False, - project_data_file, - extra_vars, - **kwargs, - ) - job_id = str(uuid.uuid4()) - - await event_log.put( - dict( - type="Job", - job_id=job_id, - ansible_rulebook_id=settings.identifier, - name=module_name, - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - hosts=",".join(hosts), - action="run_module", - ) - ) - - logger.info("Calling Ansible runner") - host_pattern = ",".join(hosts) - playbook = os.path.join(temp_dir, "wrapper.yml") - _wrap_module_in_playbook(module_name, module_args, host_pattern, playbook) - if retry: - retries = max(retries, 1) - for i in range(retries + 1): - if i > 0: - if delay > 0: - await asyncio.sleep(delay) - logger.info( - "Previous run_module failed. Retry %d of %d", i, retries - ) - action_run_at = run_at() - await call_runner( - event_log, - job_id, - temp_dir, - dict(playbook=playbook), - hosts, - inventory, - verbosity, - json_mode, - ) - if _get_latest_artifact(temp_dir, "status") != "failed": - break - - await post_process_runner( - event_log, - variables, - temp_dir, - ruleset, - source_ruleset_uuid, - source_rule_name, - source_rule_uuid, - rule_run_at, - settings.identifier, - name, - "run_module", - job_id, - action_run_at, - set_facts, - post_events, - MODULE_OUTPUT_KEY, - ) - shutil.rmtree(temp_dir) - - -async def call_runner( - event_log, - job_id: str, - private_data_dir: str, - runner_args: Dict, - hosts: List, - inventory: str, - verbosity: int = 0, - json_mode: Optional[bool] = False, -): - - host_limit = ",".join(hosts) - shutdown = False - - loop = asyncio.get_running_loop() - - queue = janus.Queue() - - # The event_callback is called from the ansible-runner thread - # It needs a thread-safe synchronous queue. - # Janus provides a sync queue connected to an async queue - # Here we push the event into the sync side of janus - def event_callback(event, *args, **kwargs): - event["job_id"] = job_id - event["ansible_rulebook_id"] = settings.identifier - queue.sync_q.put(dict(type="AnsibleEvent", event=event)) - - # Here we read the async side and push it into the event queue - # which is also async. - # We do this until cancelled at the end of the ansible runner call. - # We might need to drain the queue here before ending. - async def read_queue(): - try: - while True: - val = await queue.async_q.get() - event_data = val.get("event", {}) - val["run_at"] = event_data.get("created") - await event_log.put(val) - except CancelledError: - logger.info("Ansible Runner Queue task cancelled") - - def cancel_callback(): - return shutdown - - tasks = [] - - tasks.append(asyncio.create_task(read_queue())) - - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as task_pool: - try: - await loop.run_in_executor( - task_pool, - partial( - ansible_runner.run, - private_data_dir=private_data_dir, - limit=host_limit, - verbosity=verbosity, - event_handler=event_callback, - cancel_callback=cancel_callback, - inventory=os.path.join( - private_data_dir, - "inventory", - os.path.basename(inventory), - ), - json_mode=json_mode, - **runner_args, - ), - ) - except CancelledError: - logger.debug("Ansible Runner Thread Pool executor task cancelled") - shutdown = True - raise - finally: - # Cancel the queue reading task - for task in tasks: - if not task.done(): - logger.debug("Cancel Queue reading task") - task.cancel() - - await asyncio.gather(*tasks) - - -async def untar_project(output_dir, project_data_file): - - cmd = [tar, "zxvf", project_data_file] - proc = await asyncio.create_subprocess_exec( - *cmd, - cwd=output_dir, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - stdout, stderr = await proc.communicate() - - if stdout: - logger.debug(stdout.decode()) - if stderr: - logger.debug(stderr.decode()) - - -async def pre_process_runner( - event_log, - inventory: str, - variables: Dict, - ruleset: str, - rulename: str, - name: str, - action: str, - copy_files: Optional[bool] = False, - check_files: Optional[bool] = True, - project_data_file: Optional[str] = None, - extra_vars: Optional[Dict] = None, - **kwargs, -): - - private_data_dir = tempfile.mkdtemp(prefix=action) - logger.debug("private data dir %s", private_data_dir) - - playbook_extra_vars = _collect_extra_vars( - variables, extra_vars, ruleset, rulename - ) - - env_dir = os.path.join(private_data_dir, "env") - inventory_dir = os.path.join(private_data_dir, "inventory") - project_dir = os.path.join(private_data_dir, "project") - - playbook_name = name - - os.mkdir(env_dir) - with open(os.path.join(env_dir, "extravars"), "w") as f: - f.write(yaml.dump(playbook_extra_vars)) - os.mkdir(inventory_dir) - if inventory: - create_inventory(inventory_dir, inventory) - os.mkdir(project_dir) - - logger.debug("project_data_file: %s", project_data_file) - if project_data_file: - if os.path.exists(project_data_file): - await untar_project(project_dir, project_data_file) - return (private_data_dir, playbook_name) - - if check_files: - if os.path.exists(name): - playbook_name = os.path.basename(name) - shutil.copy(name, os.path.join(project_dir, playbook_name)) - if copy_files: - shutil.copytree( - os.path.dirname(os.path.abspath(name)), - project_dir, - dirs_exist_ok=True, - ) - elif has_playbook(*split_collection_name(name)): - playbook_name = name - shutil.copy( - find_playbook(*split_collection_name(name)), - os.path.join(project_dir, name), - ) - else: - msg = f"Could not find a playbook for {name} from {os.getcwd()}" - logger.error(msg) - raise PlaybookNotFoundException(msg) - - return (private_data_dir, playbook_name) - - -async def post_process_runner( - event_log, - variables: Dict, - private_data_dir: str, - ruleset: str, - ruleset_uuid: str, - rule: str, - rule_uuid: str, - rule_run_at: str, - activation_id: str, - name: str, - action: str, - job_id: str, - run_at: str, - set_facts: Optional[bool] = None, - post_events: Optional[bool] = None, - output_key: Optional[str] = None, -): - - rc = int(_get_latest_artifact(private_data_dir, "rc")) - status = _get_latest_artifact(private_data_dir, "status") - logger.info("Playbook rc: %d, status: %s", rc, status) - if rc != 0: - error_message = _get_latest_artifact(private_data_dir, "stderr") - if not error_message: - error_message = _get_latest_artifact(private_data_dir, "stdout") - logger.error(error_message) - - result = dict( - type="Action", - action=action, - action_uuid=str(uuid.uuid4()), - activation_id=activation_id, - playbook_name=name, - job_id=job_id, - ruleset=ruleset, - ruleset_uuid=ruleset_uuid, - rule=rule, - rule_uuid=rule_uuid, - rc=rc, - status=status, - run_at=run_at, - matching_events=_get_events(variables), - rule_run_at=rule_run_at, - ) - await event_log.put(result) - - if rc == 0 and (set_facts or post_events): - logger.debug("set_facts") - fact_folder = _get_latest_artifact( - private_data_dir, "fact_cache", False - ) - for host_facts in glob.glob(os.path.join(fact_folder, "*")): - with open(host_facts) as f: - fact = json.loads(f.read()) - if output_key: - if output_key not in fact: - logger.error( - "The artifacts from the ansible-runner " - "does not have key %s", - output_key, - ) - raise MissingArtifactKeyException( - f"Missing key: {output_key} in artifacts" - ) - fact = fact[output_key] - fact = _embellish_internal_event(fact, action) - logger.debug("fact %s", fact) - if set_facts: - lang.assert_fact(ruleset, fact) - if post_events: - lang.post(ruleset, fact) - - -async def run_job_template( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, - name: str, - organization: str, - job_args: Optional[dict] = None, - set_facts: Optional[bool] = None, - post_events: Optional[bool] = None, - verbosity: int = 0, - copy_files: Optional[bool] = False, - json_mode: Optional[bool] = False, - retries: Optional[int] = 0, - retry: Optional[bool] = False, - delay: Optional[int] = 0, - **kwargs, -): - - logger.info( - "running job template: %s, organization: %s", name, organization - ) - logger.info("ruleset: %s, rule %s", source_ruleset_name, source_rule_name) - - hosts_limit = ",".join(hosts) - if not job_args: - job_args = {} - job_args["limit"] = hosts_limit - - job_args["extra_vars"] = _collect_extra_vars( - variables, - job_args.get("extra_vars", {}), - source_ruleset_name, - source_rule_name, - ) - - job_id = str(uuid.uuid4()) - await event_log.put( - dict( - type="Job", - job_id=job_id, - ansible_rulebook_id=settings.identifier, - name=name, - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - hosts=hosts_limit, - action="run_job_template", - ) - ) - - if retry: - retries = max(retries, 1) - - try: - for i in range(retries + 1): - if i > 0: - if delay > 0: - await asyncio.sleep(delay) - logger.info( - "Previous run_job_template failed. Retry %d of %d", - i, - retries, - ) - controller_job = await job_template_runner.run_job_template( - name, - organization, - job_args, - ) - if controller_job["status"] != "failed": - break - except (ControllerApiException, JobTemplateNotFoundException) as ex: - logger.error(ex) - controller_job = {} - controller_job["status"] = "failed" - controller_job["created"] = run_at() - controller_job["error"] = str(ex) - - a_log = dict( - type="Action", - action="run_job_template", - action_uuid=str(uuid.uuid4()), - activation_id=settings.identifier, - job_template_name=name, - organization=organization, - job_id=job_id, - ruleset=ruleset, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - status=controller_job["status"], - run_at=controller_job["created"], - url=_controller_job_url(controller_job, "jobs"), - matching_events=_get_events(variables), - rule_run_at=rule_run_at, - ) - if "error" in controller_job: - a_log["message"] = controller_job["error"] - await event_log.put(a_log) - - _post_process_awx( - controller_job, set_facts, post_events, "run_job_template", ruleset - ) - - -async def run_workflow_template( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, - name: str, - organization: str, - job_args: Optional[dict] = None, - set_facts: Optional[bool] = None, - post_events: Optional[bool] = None, - verbosity: int = 0, - copy_files: Optional[bool] = False, - json_mode: Optional[bool] = False, - retries: Optional[int] = 0, - retry: Optional[bool] = False, - delay: Optional[int] = 0, - **kwargs, -): - - logger.info( - "running workflow template: %s organization: %s", name, organization - ) - logger.info("ruleset: %s, rule %s", source_ruleset_name, source_rule_name) - - hosts_limit = ",".join(hosts) - if not job_args: - job_args = {} - job_args["limit"] = hosts_limit - - job_args["extra_vars"] = _collect_extra_vars( - variables, - job_args.get("extra_vars", {}), - source_ruleset_name, - source_rule_name, - ) - - job_id = str(uuid.uuid4()) - - await event_log.put( - dict( - type="Job", - job_id=job_id, - ansible_rulebook_id=settings.identifier, - name=name, - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - hosts=hosts_limit, - action="run_workflow_template", - ) - ) - - if retry: - retries = max(retries, 1) - - try: - for i in range(retries + 1): - if i > 0: - if delay > 0: - await asyncio.sleep(delay) - logger.info( - "Previous run_workflow_template failed. Retry %d of %d", - i, - retries, - ) - controller_job = ( - await job_template_runner.run_workflow_job_template( - name, - organization, - job_args, - ) - ) - if controller_job["status"] != "failed": - break - except ( - ControllerApiException, - WorkflowJobTemplateNotFoundException, - ) as ex: - logger.error(ex) - controller_job = {} - controller_job["status"] = "failed" - controller_job["created"] = run_at() - controller_job["error"] = str(ex) - - a_log = dict( - type="Action", - action="run_workflow_template", - action_uuid=str(uuid.uuid4()), - activation_id=settings.identifier, - job_template_name=name, - organization=organization, - job_id=job_id, - ruleset=ruleset, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - status=controller_job["status"], - run_at=controller_job["created"], - url=_controller_job_url(controller_job, "jobs/workflow"), - matching_events=_get_events(variables), - rule_run_at=rule_run_at, - ) - if "error" in controller_job: - a_log["message"] = controller_job["error"] - await event_log.put(a_log) - - _post_process_awx( - controller_job, - set_facts, - post_events, - "run_workflow_template", - ruleset, - ) - - -def _post_process_awx(controller_job, set_facts, post_events, action, ruleset): - if controller_job["status"] == "successful" and (set_facts or post_events): - logger.debug("set_facts") - facts = controller_job.get("artifacts", None) - if facts: - facts = _embellish_internal_event(facts, action) - logger.debug("facts %s", facts) - if set_facts: - lang.assert_fact(ruleset, facts) - if post_events: - lang.post(ruleset, facts) - else: - logger.debug("Empty facts are not set") - - -async def shutdown( - event_log, - inventory: str, - hosts: List, - variables: Dict, - project_data_file: str, - source_ruleset_name: str, - source_ruleset_uuid: str, - source_rule_name: str, - source_rule_uuid: str, - rule_run_at: str, - ruleset: str, - delay: float = 60.0, - message: str = "Default shutdown message", - kind: str = "graceful", -): - await event_log.put( - dict( - type="Action", - action="shutdown", - action_uuid=str(uuid.uuid4()), - activation_id=settings.identifier, - ruleset=source_ruleset_name, - ruleset_uuid=source_ruleset_uuid, - rule=source_rule_name, - rule_uuid=source_rule_uuid, - run_at=run_at(), - status=INTERNAL_ACTION_STATUS, - matching_events=_get_events(variables), - delay=delay, - message=message, - kind=kind, - rule_run_at=rule_run_at, - ) - ) - - print( - "Ruleset: %s rule: %s has initiated shutdown of type: %s. " - "Delay: %.3f seconds, Message: %s" - % (source_ruleset_name, source_rule_name, kind, delay, message) - ) - raise ShutdownException(Shutdown(message=message, delay=delay, kind=kind)) - - -actions: Dict[str, Callable] = dict( - none=none, - debug=debug, - print_event=print_event, - set_fact=set_fact, - retract_fact=retract_fact, - post_event=post_event, - run_playbook=run_playbook, - run_module=run_module, - run_job_template=run_job_template, - run_workflow_template=run_workflow_template, - shutdown=shutdown, -) - - -def _get_latest_artifact(data_dir: str, artifact: str, content: bool = True): - files = glob.glob(os.path.join(data_dir, "artifacts", "*", artifact)) - files.sort(key=os.path.getmtime, reverse=True) - if not files: - raise PlaybookStatusNotFoundException(f"No {artifact} file found") - if content: - with open(files[0], "r") as f: - content = f.read() - return content - return files[0] - - -def _get_events(variables: Dict): - if "event" in variables: - return {"m": variables["event"]} - elif "events" in variables: - return variables["events"] - return {} - - -def _collect_extra_vars( - variables: Dict, user_extra_vars: Dict, ruleset: str, rule: str -): - extra_vars = user_extra_vars.copy() if user_extra_vars else {} - eda_vars = dict(ruleset=ruleset, rule=rule) - if "events" in variables: - eda_vars["events"] = variables["events"] - if "event" in variables: - eda_vars["event"] = variables["event"] - extra_vars[KEY_EDA_VARS] = eda_vars - return extra_vars - - -def _embellish_internal_event(event: Dict, method_name: str) -> Dict: - return insert_meta( - event, **dict(source_name=method_name, source_type="internal") - ) - - -def _controller_job_url(data: dict, prefix: str) -> str: - if "id" in data: - href_slug = f"/#/{prefix}/{data['id']}/details" - return urljoin(job_template_runner.host, href_slug) - return "" - - -def _wrap_module_in_playbook(module_name, module_args, hosts, playbook): - module_task = { - "name": "Module wrapper", - module_name: module_args, - "register": MODULE_OUTPUT_KEY, - } - result_str = "{{ " + MODULE_OUTPUT_KEY + " }}" - set_fact_task = { - "name": "save result", - "ansible.builtin.set_fact": { - MODULE_OUTPUT_KEY: result_str, - "cacheable": True, - }, - } - tasks = [module_task, set_fact_task] - wrapper = [ - dict(name="wrapper", hosts=hosts, gather_facts=False, tasks=tasks) - ] - with open(playbook, "w") as f: - yaml.dump(wrapper, f) diff --git a/ansible_rulebook/rule_set_runner.py b/ansible_rulebook/rule_set_runner.py index 91fa8c2f..e220d8b9 100644 --- a/ansible_rulebook/rule_set_runner.py +++ b/ansible_rulebook/rule_set_runner.py @@ -28,7 +28,19 @@ ) from drools.ruleset import session_stats -from ansible_rulebook.builtin import actions as builtin_actions +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.debug import Debug +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.noop import Noop +from ansible_rulebook.action.post_event import PostEvent +from ansible_rulebook.action.print_event import PrintEvent +from ansible_rulebook.action.retract_fact import RetractFact +from ansible_rulebook.action.run_job_template import RunJobTemplate +from ansible_rulebook.action.run_module import RunModule +from ansible_rulebook.action.run_playbook import RunPlaybook +from ansible_rulebook.action.run_workflow_template import RunWorkflowTemplate +from ansible_rulebook.action.set_fact import SetFact +from ansible_rulebook.action.shutdown import Shutdown as ShutdownAction from ansible_rulebook.conf import settings from ansible_rulebook.exception import ( ShutdownException, @@ -50,6 +62,20 @@ logger = logging.getLogger(__name__) +ACTION_CLASSES = { + "debug": Debug, + "print_event": PrintEvent, + "none": Noop, + "set_fact": SetFact, + "post_event": PostEvent, + "retract_fact": RetractFact, + "shutdown": ShutdownAction, + "run_playbook": RunPlaybook, + "run_module": RunModule, + "run_job_template": RunJobTemplate, + "run_workflow_template": RunWorkflowTemplate, +} + class RuleSetRunner: def __init__( @@ -292,13 +318,17 @@ def _run_action( f"{action_item.rule}" ) logger.debug("Creating action task %s", task_name) + metadata = Metadata( + rule_set=action_item.ruleset, + rule_set_uuid=action_item.ruleset_uuid, + rule=action_item.rule, + rule_uuid=action_item.rule_uuid, + rule_run_at=rule_run_at, + ) + task = asyncio.create_task( self._call_action( - action_item.ruleset, - action_item.ruleset_uuid, - action_item.rule, - action_item.rule_uuid, - rule_run_at, + metadata, action.action, MappingProxyType(action.action_args), action_item.variables, @@ -314,11 +344,7 @@ def _run_action( async def _call_action( self, - ruleset: str, - ruleset_uuid: str, - rule: str, - rule_uuid: str, - rule_run_at: str, + metadata: Metadata, action: str, immutable_action_args: MappingProxyType, variables: Dict, @@ -330,9 +356,12 @@ async def _call_action( action_args = immutable_action_args.copy() error = None - if action in builtin_actions: + if action in ACTION_CLASSES: try: - if action == "run_job_template": + if ( + action == "run_job_template" + or action == "run_workflow_template" + ): limit = dpath.get( action_args, "job_args.limit", @@ -395,21 +424,20 @@ async def _call_action( logger.info("action args: %s", action_args) if "ruleset" not in action_args: - action_args["ruleset"] = ruleset + action_args["ruleset"] = metadata.rule_set - await builtin_actions[action]( - event_log=self.event_log, + control = Control( + queue=self.event_log, inventory=inventory, hosts=hosts, variables=variables_copy, project_data_file=self.project_data_file, - source_ruleset_name=ruleset, - source_ruleset_uuid=ruleset_uuid, - source_rule_name=rule, - source_rule_uuid=rule_uuid, - rule_run_at=rule_run_at, - **action_args, ) + + await ACTION_CLASSES[action]( + metadata, control, **action_args + )() + except KeyError as e: logger.error( "KeyError %s with variables %s", @@ -457,12 +485,12 @@ async def _call_action( playbook_name=action_args.get("name"), status="failed", run_at=run_at(), - rule_run_at=rule_run_at, + rule_run_at=metadata.rule_run_at, message=str(error), - rule=rule, - ruleset=ruleset, - rule_uuid=rule_uuid, - ruleset_uuid=ruleset_uuid, + rule=metadata.rule, + ruleset=metadata.rule_set, + rule_uuid=metadata.rule_uuid, + ruleset_uuid=metadata.rule_set_uuid, ) ) diff --git a/ansible_rulebook/util.py b/ansible_rulebook/util.py index beefbcbf..6160f8e0 100644 --- a/ansible_rulebook/util.py +++ b/ansible_rulebook/util.py @@ -261,3 +261,16 @@ def _builtin_filter_path(name: str) -> Tuple[bool, str]: dirname = os.path.dirname(os.path.realpath(__file__)) path = os.path.join(dirname, "event_filter", filter_name + ".py") return os.path.exists(path), path + + +# TODO(alex): This function should be removed after the +# controller templates are refactored to deduplicate code +def process_controller_host_limit( + job_args: dict, + parent_hosts: list[str], +) -> str: + if "limit" in job_args: + if isinstance(job_args["limit"], list): + return ",".join(job_args["limit"]) + return str(job_args["limit"]) + return ",".join(parent_hosts) diff --git a/commitlint.config.js b/commitlint.config.js new file mode 100644 index 00000000..1f720039 --- /dev/null +++ b/commitlint.config.js @@ -0,0 +1,8 @@ +// custom rules for commitlint + +module.exports = { + extends: ['@commitlint/config-conventional'], + rules: { + 'body-max-line-length': [2, 'always', Infinity], + }, +}; diff --git a/docs/development_environment.rst b/docs/development_environment.rst index d05c9039..7f06bb13 100644 --- a/docs/development_environment.rst +++ b/docs/development_environment.rst @@ -53,8 +53,13 @@ To get flake8 and tox, just pip install them into your virtualenv. 7. Commit your changes and push your branch to GitHub: We follow the conventionalcommit_ standards for commit message During the pre-commit phase we will validate the commit message + You have to install the hook with the following command: -.. _conventionalcommit : https://www.conventionalcommits.org/en/v1.0.0/ +.. code-block:: console + + pre-commit install --hook-type commit-msg + +.. _conventionalcommit : https://www.conventionalcommits.org/en/v1.0.0/ .. code-block:: console diff --git a/requirements_test.txt b/requirements_test.txt index 7bace146..c562ce50 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -13,6 +13,7 @@ pytest-timeout pytest-xdist pytest-cov pytest-check +pytest-jira dynaconf==3.1.11 freezegun oauthlib>=3.2.0 diff --git a/setup.cfg b/setup.cfg index 6c62039a..b91f8b05 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = ansible_rulebook -version = 1.0.2 +version = 1.0.3 description = Event driven automation for Ansible url = https://github.com/ansible/ansible-rulebook license = Apache-2.0 @@ -32,6 +32,7 @@ install_requires = ansible-runner websockets drools_jpy == 0.3.7 + watchdog [options.packages.find] include = diff --git a/tests/e2e/test_actions.py b/tests/e2e/test_actions.py index 05a8308c..76a84ff4 100644 --- a/tests/e2e/test_actions.py +++ b/tests/e2e/test_actions.py @@ -98,8 +98,8 @@ def test_actions_sanity(update_environment): "'hosts': ['all']", f"'inventory': '{inventory}'", "'project_data_file': None,", - "'ruleset': 'Test actions sanity'", - "'source_rule_name': 'debug',", + "'rule_set': 'Test actions sanity'", + "'rule': 'debug',", f"'variables': {{'DEFAULT_EVENT_DELAY': '{DEFAULT_EVENT_DELAY}'", f"'DEFAULT_SHUTDOWN_AFTER': '{DEFAULT_SHUTDOWN_AFTER}',", f"'DEFAULT_STARTUP_DELAY': '{DEFAULT_STARTUP_DELAY}'", @@ -119,8 +119,8 @@ def test_actions_sanity(update_environment): r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}" ) expected_debug_regexs = [ - r"'source_rule_uuid':" + f" '{uuid_regex}'", - r"'source_ruleset_uuid':" + f" '{uuid_regex}'", + r"'rule_uuid':" + f" '{uuid_regex}'", + r"'rule_set_uuid':" + f" '{uuid_regex}'", r"'uuid': " + f"'{uuid_regex}'" + r"}}}}", ] @@ -173,7 +173,7 @@ def test_actions_sanity(update_environment): ), "multiple_action action failed" assert ( - len(result.stdout.splitlines()) == 106 + len(result.stdout.splitlines()) == 105 ), "unexpected output from the rulebook" diff --git a/tests/e2e/test_non_alpha_keys.py b/tests/e2e/test_non_alpha_keys.py index e0cb48be..ea9e5863 100644 --- a/tests/e2e/test_non_alpha_keys.py +++ b/tests/e2e/test_non_alpha_keys.py @@ -14,6 +14,7 @@ DEFAULT_TIMEOUT = 15 +@pytest.mark.jira("AAP-16038") @pytest.mark.e2e @pytest.mark.asyncio async def test_non_alpha_numeric_keys(): diff --git a/tests/examples/69_enhanced_debug.yml b/tests/examples/69_enhanced_debug.yml index a38a65a5..ab45a9cb 100644 --- a/tests/examples/69_enhanced_debug.yml +++ b/tests/examples/69_enhanced_debug.yml @@ -23,11 +23,6 @@ - "Hello World {{ event }}" - "Hello Java" - "Hello Java again {{ event }}" - - name: r4 - condition: event.i == 3 - action: - debug: - var: event.does_not_exist - name: r5 condition: event.i == 4 action: diff --git a/tests/test_examples.py b/tests/test_examples.py index eceee736..dfe1fbdd 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -2103,7 +2103,8 @@ async def test_46_job_template(): job_url = "https://examples.com/#/jobs/945/details" with SourceTask(rs.sources[0], "sources", {}, queue): with patch( - "ansible_rulebook.builtin.job_template_runner.run_job_template", + "ansible_rulebook.action.run_job_template." + "job_template_runner.run_job_template", return_value=response_obj, ): await run_rulesets( @@ -2138,7 +2139,8 @@ async def test_46_job_template_exception(err_msg, err): rs = ruleset_queues[0][0] with SourceTask(rs.sources[0], "sources", {}, queue): with patch( - "ansible_rulebook.builtin.job_template_runner.run_job_template", + "ansible_rulebook.action.run_job_template." + "job_template_runner.run_job_template", side_effect=err, ): await run_rulesets( @@ -2245,6 +2247,7 @@ async def test_78_complete_retract_fact(): ] +@pytest.mark.jira("AAP-9829") @pytest.mark.parametrize("err_msg,err", WORKFLOW_TEMPLATE_ERRORS) @pytest.mark.asyncio async def test_79_workflow_job_template_exception(err_msg, err): @@ -2256,8 +2259,8 @@ async def test_79_workflow_job_template_exception(err_msg, err): rs = ruleset_queues[0][0] with SourceTask(rs.sources[0], "sources", {}, queue): with patch( - "ansible_rulebook.builtin.job_template_runner." - "run_workflow_job_template", + "ansible_rulebook.action.run_workflow_template." + "job_template_runner.run_workflow_job_template", side_effect=err, ): await run_rulesets( @@ -2291,6 +2294,7 @@ async def test_79_workflow_job_template_exception(err_msg, err): assert set(action.keys()).issuperset(required_keys) +@pytest.mark.jira("AAP-9829") @pytest.mark.asyncio async def test_79_workflow_job_template(): ruleset_queues, event_log = load_rulebook( @@ -2306,8 +2310,8 @@ async def test_79_workflow_job_template(): job_url = "https://examples.com/#/jobs/workflow/945/details" with SourceTask(rs.sources[0], "sources", {}, queue): with patch( - "ansible_rulebook.builtin.job_template_runner." - "run_workflow_job_template", + "ansible_rulebook.action.run_workflow_template." + "job_template_runner.run_workflow_job_template", return_value=response_obj, ): await run_rulesets( @@ -2379,6 +2383,7 @@ async def test_81_match_single_rule(): await validate_events(event_log, **checks) +@pytest.mark.jira("AAP-16038") @pytest.mark.asyncio async def test_82_non_alpha_keys(): ruleset_queues, event_log = load_rulebook("examples/82_non_alpha_keys.yml") diff --git a/tests/unit/action/__init__.py b/tests/unit/action/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/action/conftest.py b/tests/unit/action/conftest.py new file mode 100644 index 00000000..a664eee6 --- /dev/null +++ b/tests/unit/action/conftest.py @@ -0,0 +1,28 @@ +import asyncio + +import pytest + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.metadata import Metadata + + +@pytest.fixture +def base_metadata(): + return Metadata( + rule="r1", + rule_set="rs1", + rule_uuid="u1", + rule_set_uuid="u2", + rule_run_at="abc", + ) + + +@pytest.fixture +def base_control(): + return Control( + queue=asyncio.Queue(), + inventory="abc", + hosts=["all"], + variables={"a": 1}, + project_data_file="", + ) diff --git a/tests/unit/action/playbooks/fail.yml b/tests/unit/action/playbooks/fail.yml new file mode 100644 index 00000000..7d470e06 --- /dev/null +++ b/tests/unit/action/playbooks/fail.yml @@ -0,0 +1,8 @@ +- name: Fail the rule + hosts: all + gather_facts: false + tasks: + - name: Fail if we have a rule name defined + when: ansible_eda.rule is defined + ansible.builtin.fail: + msg: "Failed because of Rule name: {{ ansible_eda.rule }}" diff --git a/tests/unit/action/playbooks/rule_name.yml b/tests/unit/action/playbooks/rule_name.yml new file mode 100644 index 00000000..f4187208 --- /dev/null +++ b/tests/unit/action/playbooks/rule_name.yml @@ -0,0 +1,14 @@ +- name: Print rule name that called this playbook + hosts: all + gather_facts: false + tasks: + - name: Print rule name + when: ansible_eda.rule is defined + ansible.builtin.debug: + msg: "Rule name: {{ ansible_eda.rule }}" + - name: Set the RuleName as a fact + ansible.builtin.set_fact: + results: + my_rule_name: "{{ ansible_eda.rule }}" + my_rule_set_name: "{{ ansible_eda.ruleset }}" + cacheable: true diff --git a/tests/unit/action/test_controller.py b/tests/unit/action/test_controller.py new file mode 100644 index 00000000..5ba46db3 --- /dev/null +++ b/tests/unit/action/test_controller.py @@ -0,0 +1,46 @@ +import pytest + +from ansible_rulebook.action.run_job_template import RunJobTemplate +from ansible_rulebook.action.run_workflow_template import RunWorkflowTemplate + + +@pytest.mark.parametrize( + "template_class", + [ + pytest.param(RunJobTemplate, id="job_template"), + pytest.param(RunWorkflowTemplate, id="workflow_template"), + ], +) +@pytest.mark.parametrize( + "input,expected", + [ + pytest.param({"limit": "localhost"}, "localhost", id="single_host"), + pytest.param( + {"limit": "localhost,localhost2"}, + "localhost,localhost2", + id="multiple_hosts_str", + ), + pytest.param( + {"limit": ["localhost", "localhost2"]}, + "localhost,localhost2", + id="multiple_hosts", + ), + pytest.param({}, "all", id="default"), + ], +) +@pytest.mark.asyncio +async def test_controller_custom_host_limit( + input, expected, template_class, base_metadata, base_control +): + """Test controller templates process the host limit in job_args.""" + action_args = { + "name": "fred", + "organization": "Default", + "retries": 1, + "retry": True, + "delay": 1, + "set_facts": True, + "job_args": input, + } + template = template_class(base_metadata, base_control, **action_args) + assert template.job_args["limit"] == expected diff --git a/tests/unit/action/test_debug.py b/tests/unit/action/test_debug.py new file mode 100644 index 00000000..91dbee97 --- /dev/null +++ b/tests/unit/action/test_debug.py @@ -0,0 +1,180 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.debug import Debug +from ansible_rulebook.action.helper import INTERNAL_ACTION_STATUS +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.conf import settings + +DUMMY_UUID = "eb7de03f-6f8f-4943-b69e-3c90db346edf" +RULE_UUID = "abcdef3f-6f8f-4943-b69e-3c90db346edf" +RULE_SET_UUID = "00aabbcc-1111-2222-b69e-3c90db346edf" +RULE_RUN_AT = "2023-06-11T12:13:10Z" +ACTION_RUN_AT = "2023-06-11T12:13:14Z" +REQUIRED_KEYS = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "matching_events", +} + + +def _validate(queue, metadata): + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + assert action["action"] == "debug" + assert action["action_uuid"] == DUMMY_UUID + assert action["activation_id"] == settings.identifier + assert action["run_at"] == ACTION_RUN_AT + assert action["rule_run_at"] == metadata.rule_run_at + assert action["rule"] == metadata.rule + assert action["ruleset"] == metadata.rule_set + assert action["rule_uuid"] == metadata.rule_uuid + assert action["ruleset_uuid"] == metadata.rule_set_uuid + assert action["status"] == INTERNAL_ACTION_STATUS + assert action["type"] == "Action" + assert action["matching_events"] == {"m": {"a": 1}} + assert len(set(action.keys()).difference(REQUIRED_KEYS)) == 0 + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_debug(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = {} + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + with patch( + "ansible_rulebook.action.run_job_template.lang.get_facts", + return_value={"a": 1}, + ) as drools_mock: + await Debug(metadata, control, **action_args)() + drools_mock.assert_called_once() + + _validate(queue, metadata) + + +MSG_DATA = [ + ("msg", "Simple Message"), + ("msg", ["First Message", "Second Message"]), +] + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.parametrize("mtype, arg", MSG_DATA) +@pytest.mark.asyncio +async def test_debug_msg(mtype, arg): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = {mtype: arg} + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + await Debug(metadata, control, **action_args)() + + _validate(queue, metadata) + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_debug_var(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"abc": {"xyz": 1}, "event": {"a": 1}}, + project_data_file="", + ) + action_args = {"var": "abc.xyz"} + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + await Debug(metadata, control, **action_args)() + + _validate(queue, metadata) + + +@pytest.mark.asyncio +async def test_debug_var_missing_key(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"abc": {"xyz": 1}, "event": {"a": 1}}, + project_data_file="", + ) + action_args = {"var": "abc.klm"} + + with pytest.raises(KeyError): + await Debug(metadata, control, **action_args)() diff --git a/tests/unit/action/test_noop.py b/tests/unit/action/test_noop.py new file mode 100644 index 00000000..ec9ff4f9 --- /dev/null +++ b/tests/unit/action/test_noop.py @@ -0,0 +1,91 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.helper import INTERNAL_ACTION_STATUS +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.noop import Noop +from ansible_rulebook.conf import settings + +DUMMY_UUID = "eb7de03f-6f8f-4943-b69e-3c90db346edf" +RULE_UUID = "abcdef3f-6f8f-4943-b69e-3c90db346edf" +RULE_SET_UUID = "00aabbcc-1111-2222-b69e-3c90db346edf" +RULE_RUN_AT = "2023-06-11T12:13:10Z" +ACTION_RUN_AT = "2023-06-11T12:13:14Z" + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_noop(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = {} + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + await Noop(metadata, control, **action_args)() + + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "matching_events", + } + assert action["action"] == "noop" + assert action["action_uuid"] == DUMMY_UUID + assert action["activation_id"] == settings.identifier + assert action["activation_instance_id"] == settings.identifier + assert action["run_at"] == ACTION_RUN_AT + assert action["rule_run_at"] == metadata.rule_run_at + assert action["rule"] == metadata.rule + assert action["ruleset"] == metadata.rule_set + assert action["rule_uuid"] == metadata.rule_uuid + assert action["ruleset_uuid"] == metadata.rule_set_uuid + assert action["status"] == INTERNAL_ACTION_STATUS + assert action["type"] == "Action" + assert action["matching_events"] == {"m": {"a": 1}} + + assert len(set(action.keys()).difference(required_keys)) == 0 diff --git a/tests/unit/action/test_post_event.py b/tests/unit/action/test_post_event.py new file mode 100644 index 00000000..f2d0b3be --- /dev/null +++ b/tests/unit/action/test_post_event.py @@ -0,0 +1,97 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.helper import INTERNAL_ACTION_STATUS +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.post_event import PostEvent +from ansible_rulebook.conf import settings + +DUMMY_UUID = "eb7de03f-6f8f-4943-b69e-3c90db346edf" +RULE_UUID = "abcdef3f-6f8f-4943-b69e-3c90db346edf" +RULE_SET_UUID = "00aabbcc-1111-2222-b69e-3c90db346edf" +RULE_RUN_AT = "2023-06-11T12:13:10Z" +ACTION_RUN_AT = "2023-06-11T12:13:14Z" + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_noop(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = {"event": {"b": 1}, "ruleset": metadata.rule_set} + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + with patch( + "ansible_rulebook.action.run_job_template.lang.post" + ) as drools_mock: + await PostEvent(metadata, control, **action_args)() + drools_mock.assert_called_once_with( + action_args["ruleset"], action_args["event"] + ) + + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "matching_events", + } + assert action["action"] == "post_event" + assert action["action_uuid"] == DUMMY_UUID + assert action["activation_id"] == settings.identifier + assert action["activation_instance_id"] == settings.identifier + assert action["run_at"] == ACTION_RUN_AT + assert action["rule_run_at"] == metadata.rule_run_at + assert action["rule"] == metadata.rule + assert action["ruleset"] == metadata.rule_set + assert action["rule_uuid"] == metadata.rule_uuid + assert action["ruleset_uuid"] == metadata.rule_set_uuid + assert action["status"] == INTERNAL_ACTION_STATUS + assert action["type"] == "Action" + assert action["matching_events"] == {"m": {"a": 1}} + + assert len(set(action.keys()).difference(required_keys)) == 0 diff --git a/tests/unit/action/test_print_event.py b/tests/unit/action/test_print_event.py new file mode 100644 index 00000000..2cff0d18 --- /dev/null +++ b/tests/unit/action/test_print_event.py @@ -0,0 +1,91 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.helper import INTERNAL_ACTION_STATUS +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.print_event import PrintEvent +from ansible_rulebook.conf import settings + +DUMMY_UUID = "eb7de03f-6f8f-4943-b69e-3c90db346edf" +RULE_UUID = "abcdef3f-6f8f-4943-b69e-3c90db346edf" +RULE_SET_UUID = "00aabbcc-1111-2222-b69e-3c90db346edf" +RULE_RUN_AT = "2023-06-11T12:13:10Z" +ACTION_RUN_AT = "2023-06-11T12:13:14Z" + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_print_event(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = dict(pretty=True) + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + await PrintEvent(metadata, control, **action_args)() + + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "matching_events", + } + assert action["action"] == "print_event" + assert action["action_uuid"] == DUMMY_UUID + assert action["activation_id"] == settings.identifier + assert action["activation_instance_id"] == settings.identifier + assert action["run_at"] == ACTION_RUN_AT + assert action["rule_run_at"] == metadata.rule_run_at + assert action["rule"] == metadata.rule + assert action["ruleset"] == metadata.rule_set + assert action["rule_uuid"] == metadata.rule_uuid + assert action["ruleset_uuid"] == metadata.rule_set_uuid + assert action["status"] == INTERNAL_ACTION_STATUS + assert action["type"] == "Action" + assert action["matching_events"] == {"m": {"a": 1}} + + assert len(set(action.keys()).difference(required_keys)) == 0 diff --git a/tests/unit/action/test_retract_fact.py b/tests/unit/action/test_retract_fact.py new file mode 100644 index 00000000..8fc69422 --- /dev/null +++ b/tests/unit/action/test_retract_fact.py @@ -0,0 +1,108 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.helper import INTERNAL_ACTION_STATUS +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.retract_fact import RetractFact +from ansible_rulebook.conf import settings + +DUMMY_UUID = "eb7de03f-6f8f-4943-b69e-3c90db346edf" +RULE_UUID = "abcdef3f-6f8f-4943-b69e-3c90db346edf" +RULE_SET_UUID = "00aabbcc-1111-2222-b69e-3c90db346edf" +RULE_RUN_AT = "2023-06-11T12:13:10Z" +ACTION_RUN_AT = "2023-06-11T12:13:14Z" + +TEST_DATA = [(True, []), (False, ["meta"])] + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.parametrize("partial,keys_excluded", TEST_DATA) +@pytest.mark.asyncio +async def test_retract_fact(partial, keys_excluded): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = { + "fact": {"b": 1}, + "ruleset": metadata.rule_set, + "partial": partial, + } + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + with patch( + "ansible_rulebook.action.run_job_template." + "lang.retract_matching_facts" + ) as drools_mock: + await RetractFact(metadata, control, **action_args)() + drools_mock.assert_called_once_with( + action_args["ruleset"], + action_args["fact"], + partial, + keys_excluded, + ) + + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "matching_events", + } + assert action["action"] == "retract_fact" + assert action["action_uuid"] == DUMMY_UUID + assert action["activation_id"] == settings.identifier + assert action["activation_instance_id"] == settings.identifier + assert action["run_at"] == ACTION_RUN_AT + assert action["rule_run_at"] == metadata.rule_run_at + assert action["rule"] == metadata.rule + assert action["ruleset"] == metadata.rule_set + assert action["rule_uuid"] == metadata.rule_uuid + assert action["ruleset_uuid"] == metadata.rule_set_uuid + assert action["status"] == INTERNAL_ACTION_STATUS + assert action["type"] == "Action" + assert action["matching_events"] == {"m": {"a": 1}} + + assert len(set(action.keys()).difference(required_keys)) == 0 diff --git a/tests/unit/action/test_run_job_template.py b/tests/unit/action/test_run_job_template.py new file mode 100644 index 00000000..8646729e --- /dev/null +++ b/tests/unit/action/test_run_job_template.py @@ -0,0 +1,217 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest.mock import patch + +import pytest + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.run_job_template import RunJobTemplate +from ansible_rulebook.exception import ( + ControllerApiException, + JobTemplateNotFoundException, +) + + +def _validate(queue, success, reason=None): + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + assert action["action"] == "run_job_template" + if reason: + assert action["reason"] == reason + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "job_template_name", + "matching_events", + "url", + "organization", + "job_id", + } + + if not success: + required_keys.add("message") + + x = set(action.keys()).difference(required_keys) + assert len(x) == 0 + + +JOB_TEMPLATE_ERRORS = [ + ("api error", ControllerApiException("api error")), + ("jt does not exist", JobTemplateNotFoundException("jt does not exist")), +] + + +@pytest.mark.parametrize("err_msg,err", JOB_TEMPLATE_ERRORS) +@pytest.mark.asyncio +async def test_run_job_template_exception(err_msg, err): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid="u1", + rule_set_uuid="u2", + rule_run_at="abc", + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"a": 1}, + project_data_file="", + ) + action_args = { + "name": "fred", + "set_facts": True, + "organization": "Default", + "retries": 0, + "retry": True, + "delay": 0, + } + with patch( + "ansible_rulebook.action.run_job_template." + "job_template_runner.run_job_template", + side_effect=err, + ): + await RunJobTemplate(metadata, control, **action_args)() + _validate(queue, False, {"error": err_msg}) + + +DROOLS_CALLS = [ + ( + "ansible_rulebook.action.run_job_template.lang.assert_fact", + dict(set_facts=True), + ), + ( + "ansible_rulebook.action.run_job_template.lang.post", + dict(post_events=True), + ), +] + + +@pytest.mark.parametrize("drools_call,additional_args", DROOLS_CALLS) +@pytest.mark.asyncio +async def test_run_job_template(drools_call, additional_args): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid="u1", + rule_set_uuid="u2", + rule_run_at="abc", + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"a": 1}, + project_data_file="", + ) + action_args = { + "name": "fred", + "organization": "Default", + "retries": 0, + "retry": True, + "delay": 0, + } + action_args.update(additional_args) + controller_job = { + "status": "failed", + "rc": 0, + "artifacts": dict(b=1), + "created": "abc", + "id": 10, + } + with patch( + "ansible_rulebook.action.run_job_template." + "job_template_runner.run_job_template", + return_value=controller_job, + ): + with patch(drools_call) as drools_mock: + await RunJobTemplate(metadata, control, **action_args)() + drools_mock.assert_called_once() + + _validate(queue, True) + + +@pytest.mark.asyncio +async def test_run_job_template_retries(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid="u1", + rule_set_uuid="u2", + rule_run_at="abc", + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"a": 1}, + project_data_file="", + ) + action_args = { + "name": "fred", + "organization": "Default", + "retries": 1, + "retry": True, + "delay": 1, + "set_facts": True, + } + controller_job = [ + { + "status": "failed", + "rc": 0, + "artifacts": dict(b=1), + "created": "abc", + "id": 10, + }, + { + "status": "success", + "rc": 0, + "artifacts": dict(b=1), + "created": "abc", + "id": 10, + }, + ] + + with patch( + "ansible_rulebook.action.run_job_template." + "job_template_runner.run_job_template", + side_effect=controller_job, + ): + with patch( + "ansible_rulebook.action.run_job_template.lang.assert_fact" + ) as drools_mock: + await RunJobTemplate(metadata, control, **action_args)() + drools_mock.assert_called_once() + + _validate(queue, True) diff --git a/tests/unit/action/test_run_module.py b/tests/unit/action/test_run_module.py new file mode 100644 index 00000000..408e9576 --- /dev/null +++ b/tests/unit/action/test_run_module.py @@ -0,0 +1,103 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +import os +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.run_module import RunModule +from ansible_rulebook.conf import settings + +DUMMY_UUID = "eb7de03f-6f8f-4943-b69e-3c90db346edf" +RULE_UUID = "abcdef3f-6f8f-4943-b69e-3c90db346edf" +RULE_SET_UUID = "00aabbcc-1111-2222-b69e-3c90db346edf" +RULE_RUN_AT = "2023-06-11T12:13:10Z" +ACTION_RUN_AT = "2023-06-11T12:13:14Z" +HERE = os.path.dirname(os.path.abspath(__file__)) +INVENTORY_FILE = os.path.join(HERE, "../../playbooks/inventory.yml") + + +def _validate(queue, metadata, status, rc): + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "matching_events", + "job_id", + "playbook_name", + "rc", + } + assert action["action"] == "run_module" + assert action["action_uuid"] == DUMMY_UUID + assert action["activation_id"] == settings.identifier + assert action["run_at"] == ACTION_RUN_AT + assert action["rule_run_at"] == metadata.rule_run_at + assert action["rule"] == metadata.rule + assert action["ruleset"] == metadata.rule_set + assert action["rule_uuid"] == metadata.rule_uuid + assert action["ruleset_uuid"] == metadata.rule_set_uuid + assert action["status"] == status + assert action["rc"] == rc + assert action["type"] == "Action" + assert action["matching_events"] == {"m_0": {"a": 1}, "m_1": {"b": 2}} + + assert len(set(action.keys()).difference(required_keys)) == 0 + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_run_module(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory=INVENTORY_FILE, + hosts=["localhost"], + variables={"events": {"m_0": {"a": 1}, "m_1": {"b": 2}}}, + project_data_file="", + ) + action_args = { + "module_args": {"name": "Fred Flintstone"}, + "name": "ansible.eda.upcase", + } + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + await RunModule(metadata, control, **action_args)() + + _validate(queue, metadata, "successful", 0) diff --git a/tests/unit/action/test_run_playbook.py b/tests/unit/action/test_run_playbook.py new file mode 100644 index 00000000..d4d888ec --- /dev/null +++ b/tests/unit/action/test_run_playbook.py @@ -0,0 +1,200 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +import os +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.run_playbook import RunPlaybook +from ansible_rulebook.conf import settings +from ansible_rulebook.exception import PlaybookNotFoundException + +DUMMY_UUID = "eb7de03f-6f8f-4943-b69e-3c90db346edf" +RULE_UUID = "abcdef3f-6f8f-4943-b69e-3c90db346edf" +RULE_SET_UUID = "00aabbcc-1111-2222-b69e-3c90db346edf" +RULE_RUN_AT = "2023-06-11T12:13:10Z" +ACTION_RUN_AT = "2023-06-11T12:13:14Z" +HERE = os.path.dirname(os.path.abspath(__file__)) +INVENTORY_FILE = os.path.join(HERE, "../../playbooks/inventory.yml") + + +def _validate(queue, metadata, status, rc): + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "ansible_rulebook_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "matching_events", + "job_id", + "playbook_name", + "rc", + } + assert action["action"] == "run_playbook" + assert action["action_uuid"] == DUMMY_UUID + assert action["activation_id"] == settings.identifier + assert action["run_at"] == ACTION_RUN_AT + assert action["rule_run_at"] == metadata.rule_run_at + assert action["rule"] == metadata.rule + assert action["ruleset"] == metadata.rule_set + assert action["rule_uuid"] == metadata.rule_uuid + assert action["ruleset_uuid"] == metadata.rule_set_uuid + assert action["status"] == status + assert action["rc"] == rc + assert action["type"] == "Action" + assert action["matching_events"] == {"m": {"a": 1}} + + assert len(set(action.keys()).difference(required_keys)) == 0 + + +HERE = os.path.dirname(os.path.abspath(__file__)) + +DROOLS_CALLS = [ + ( + "ansible_rulebook.action.run_job_template.lang.assert_fact", + dict(set_facts=True), + ), + ( + "ansible_rulebook.action.run_job_template.lang.post", + dict(post_events=True), + ), +] + + +@pytest.mark.parametrize("drools_call,additional_args", DROOLS_CALLS) +@pytest.mark.asyncio +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_run_playbook(drools_call, additional_args): + os.chdir(HERE) + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory=INVENTORY_FILE, + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = { + "ruleset": metadata.rule_set, + "name": "./playbooks/rule_name.yml", + } + action_args.update(additional_args) + + set_fact_args = { + "results": { + "my_rule_name": metadata.rule, + "my_rule_set_name": metadata.rule_set, + }, + "meta": { + "source": {"name": "run_playbook", "type": "internal"}, + "received_at": ACTION_RUN_AT, + "uuid": DUMMY_UUID, + }, + } + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + with patch(drools_call) as drools_mock: + await RunPlaybook(metadata, control, **action_args)() + drools_mock.assert_called_once_with( + action_args["ruleset"], set_fact_args + ) + + _validate(queue, metadata, "successful", 0) + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_run_playbook_missing(): + os.chdir(HERE) + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory=INVENTORY_FILE, + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = { + "ruleset": metadata.rule_set, + "name": "./playbooks/does_not_exist.yml", + "set_facts": True, + } + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + with pytest.raises(PlaybookNotFoundException): + await RunPlaybook(metadata, control, **action_args)() + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_run_playbook_fail(): + os.chdir(HERE) + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory=INVENTORY_FILE, + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = { + "ruleset": metadata.rule_set, + "name": "./playbooks/fail.yml", + "set_facts": True, + } + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + await RunPlaybook(metadata, control, **action_args)() + + _validate(queue, metadata, "failed", 2) diff --git a/tests/unit/action/test_run_workflow_template.py b/tests/unit/action/test_run_workflow_template.py new file mode 100644 index 00000000..c780fe17 --- /dev/null +++ b/tests/unit/action/test_run_workflow_template.py @@ -0,0 +1,220 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest.mock import patch + +import pytest + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.run_workflow_template import RunWorkflowTemplate +from ansible_rulebook.exception import ( + ControllerApiException, + WorkflowJobTemplateNotFoundException, +) + + +def _validate(queue, success, reason=None): + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + assert action["action"] == "run_workflow_template" + if reason: + assert action["reason"] == reason + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "name", + "matching_events", + "url", + "organization", + "job_id", + } + + if not success: + required_keys.add("message") + + x = set(action.keys()).difference(required_keys) + assert len(x) == 0 + + +WORKFLOW_TEMPLATE_ERRORS = [ + ("api error", ControllerApiException("api error")), + ( + "wf does not exist", + WorkflowJobTemplateNotFoundException("wf does not exist"), + ), +] + + +@pytest.mark.parametrize("err_msg,err", WORKFLOW_TEMPLATE_ERRORS) +@pytest.mark.asyncio +async def test_run_workflow_template_exception(err_msg, err): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid="u1", + rule_set_uuid="u2", + rule_run_at="abc", + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"a": 1}, + project_data_file="", + ) + action_args = { + "name": "fred", + "set_facts": True, + "organization": "Default", + "retries": 0, + "retry": True, + "delay": 0, + } + with patch( + "ansible_rulebook.action.run_workflow_template." + "job_template_runner.run_workflow_job_template", + side_effect=err, + ): + await RunWorkflowTemplate(metadata, control, **action_args)() + _validate(queue, False, {"error": err_msg}) + + +DROOLS_CALLS = [ + ( + "ansible_rulebook.action.run_workflow_template.lang.assert_fact", + dict(set_facts=True), + ), + ( + "ansible_rulebook.action.run_workflow_template.lang.post", + dict(post_events=True), + ), +] + + +@pytest.mark.parametrize("drools_call,additional_args", DROOLS_CALLS) +@pytest.mark.asyncio +async def test_run_workflow_template(drools_call, additional_args): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid="u1", + rule_set_uuid="u2", + rule_run_at="abc", + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"a": 1}, + project_data_file="", + ) + action_args = { + "name": "fred", + "organization": "Default", + "retries": 0, + "retry": True, + "delay": 0, + } + action_args.update(additional_args) + controller_job = { + "status": "failed", + "rc": 0, + "artifacts": dict(b=1), + "created": "abc", + "id": 10, + } + with patch( + "ansible_rulebook.action.run_workflow_template." + "job_template_runner.run_workflow_job_template", + return_value=controller_job, + ): + with patch(drools_call) as drools_mock: + await RunWorkflowTemplate(metadata, control, **action_args)() + drools_mock.assert_called_once() + + _validate(queue, True) + + +@pytest.mark.asyncio +async def test_run_workflow_template_retries(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid="u1", + rule_set_uuid="u2", + rule_run_at="abc", + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"a": 1}, + project_data_file="", + ) + action_args = { + "name": "fred", + "organization": "Default", + "retries": 1, + "retry": True, + "delay": 1, + "set_facts": True, + } + controller_job = [ + { + "status": "failed", + "rc": 0, + "artifacts": dict(b=1), + "created": "abc", + "id": 10, + }, + { + "status": "success", + "rc": 0, + "artifacts": dict(b=1), + "created": "abc", + "id": 10, + }, + ] + + with patch( + "ansible_rulebook.action.run_workflow_template." + "job_template_runner.run_workflow_job_template", + side_effect=controller_job, + ): + with patch( + "ansible_rulebook.action.run_workflow_template.lang.assert_fact" + ) as drools_mock: + await RunWorkflowTemplate(metadata, control, **action_args)() + drools_mock.assert_called_once() + + _validate(queue, True) diff --git a/tests/unit/action/test_set_fact.py b/tests/unit/action/test_set_fact.py new file mode 100644 index 00000000..f06c7dfb --- /dev/null +++ b/tests/unit/action/test_set_fact.py @@ -0,0 +1,96 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.helper import INTERNAL_ACTION_STATUS +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.set_fact import SetFact +from ansible_rulebook.conf import settings + +DUMMY_UUID = "eb7de03f-6f8f-4943-b69e-3c90db346edf" +RULE_UUID = "abcdef3f-6f8f-4943-b69e-3c90db346edf" +RULE_SET_UUID = "00aabbcc-1111-2222-b69e-3c90db346edf" +RULE_RUN_AT = "2023-06-11T12:13:10Z" +ACTION_RUN_AT = "2023-06-11T12:13:14Z" + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_noop(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = {"fact": {"b": 1}, "ruleset": metadata.rule_set} + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + with patch( + "ansible_rulebook.action.run_job_template.lang.assert_fact" + ) as drools_mock: + await SetFact(metadata, control, **action_args)() + drools_mock.assert_called_once_with( + action_args["ruleset"], action_args["fact"] + ) + + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "matching_events", + } + assert action["action"] == "set_fact" + assert action["action_uuid"] == DUMMY_UUID + assert action["activation_id"] == settings.identifier + assert action["run_at"] == ACTION_RUN_AT + assert action["rule_run_at"] == metadata.rule_run_at + assert action["rule"] == metadata.rule + assert action["ruleset"] == metadata.rule_set + assert action["rule_uuid"] == metadata.rule_uuid + assert action["ruleset_uuid"] == metadata.rule_set_uuid + assert action["status"] == INTERNAL_ACTION_STATUS + assert action["type"] == "Action" + assert action["matching_events"] == {"m": {"a": 1}} + + assert len(set(action.keys()).difference(required_keys)) == 0 diff --git a/tests/unit/action/test_shutdown.py b/tests/unit/action/test_shutdown.py new file mode 100644 index 00000000..15036355 --- /dev/null +++ b/tests/unit/action/test_shutdown.py @@ -0,0 +1,95 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from ansible_rulebook.action.control import Control +from ansible_rulebook.action.helper import INTERNAL_ACTION_STATUS +from ansible_rulebook.action.metadata import Metadata +from ansible_rulebook.action.shutdown import Shutdown +from ansible_rulebook.conf import settings +from ansible_rulebook.exception import ShutdownException + +DUMMY_UUID = "eb7de03f-6f8f-4943-b69e-3c90db346edf" +RULE_UUID = "abcdef3f-6f8f-4943-b69e-3c90db346edf" +RULE_SET_UUID = "00aabbcc-1111-2222-b69e-3c90db346edf" +RULE_RUN_AT = "2023-06-11T12:13:10Z" +ACTION_RUN_AT = "2023-06-11T12:13:14Z" + + +@freeze_time("2023-06-11 12:13:14") +@pytest.mark.asyncio +async def test_shutdown(): + queue = asyncio.Queue() + metadata = Metadata( + rule="r1", + rule_set="rs1", + rule_uuid=RULE_UUID, + rule_set_uuid=RULE_SET_UUID, + rule_run_at=RULE_RUN_AT, + ) + control = Control( + queue=queue, + inventory="abc", + hosts=["all"], + variables={"event": {"a": 1}}, + project_data_file="", + ) + action_args = dict(delay=60, message="Testing Shutdown") + + with patch("uuid.uuid4", return_value=DUMMY_UUID): + with pytest.raises(ShutdownException): + await Shutdown(metadata, control, **action_args)() + + while not queue.empty(): + event = queue.get_nowait() + if event["type"] == "Action": + action = event + + required_keys = { + "action", + "action_uuid", + "activation_id", + "activation_instance_id", + "reason", + "rule_run_at", + "run_at", + "rule", + "ruleset", + "rule_uuid", + "ruleset_uuid", + "status", + "type", + "matching_events", + "delay", + "message", + "kind", + } + assert action["action"] == "shutdown" + assert action["action_uuid"] == DUMMY_UUID + assert action["activation_id"] == settings.identifier + assert action["run_at"] == ACTION_RUN_AT + assert action["rule_run_at"] == metadata.rule_run_at + assert action["rule"] == metadata.rule + assert action["ruleset"] == metadata.rule_set + assert action["rule_uuid"] == metadata.rule_uuid + assert action["ruleset_uuid"] == metadata.rule_set_uuid + assert action["status"] == INTERNAL_ACTION_STATUS + assert action["type"] == "Action" + assert action["matching_events"] == {"m": {"a": 1}} + + assert len(set(action.keys()).difference(required_keys)) == 0