Skip to content

Commit

Permalink
modules: factor out cliargs into separate dataclass (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxhoesel authored Apr 8, 2024
1 parent 90780c8 commit ec8d82d
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 100 deletions.
123 changes: 73 additions & 50 deletions plugins/module_utils/cli_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
import tempfile
Expand Down Expand Up @@ -49,71 +51,40 @@ class CliCommandResult:


@dataclass
class CliCommand:
"""CliCommand represents a single command to be run by step-cli
class CliCommandArgs:
"""Arguments to be passed to the command.
Args:
executable(StepCliExecutable): The executable to run the command with
args (List[str]): Fixed list of parameters/args to pass to step-cli, such as ["ca", "certificate"]
module_param_args (Dict[str,str]): Args to be sourced from a modules args. Keys must be valid module params
(such as "provisioner"), while their values are the corresponding step-cli parameter
(such as "--provisioner"). Values are transformed according to their type:
args is a plain list of arguments to be passed in
module_param_args is a str-str dict that maps module params to command-line args.
For example, {"provisioner": "--provisioner"} will cause the module param "provisioner" to be passed in
as the value to the "--provisioner" argument. Values are transformed according to their type:
- bools only pass the corresponding flag (e.g. --force)
- list causes the mapped arg to be repeated for each value (e.g. --in=1 --in=2)
- all other types are formatted and passed as-is
module_tmpfile_args (Dict[str,str]): Same as module_params, except that the value is written to a
temporary file and the passed value is the path to that file.
For example, {"password": "--jwk-password-file"}, will result in the value of the
"password" module arg to be written to a temporary file that is then passed to
"--jwk-password-file". This is primarily intended for password files and sensitive data
run_in_check_mode (bool): Whether to run this command even if Ansibles check_mode is enabled.
If false and check_mode is enabled, the invocation will exit with rc=0 and no output.
Only set this on invocations that don't change the system state! Default is false
fail_on_error(bool): Whether to run module_fail if this invocation fails. Default is true
module_tmpfile_args is the same as module_param_args, except that the value is written to a temporary file
at runtime and the path to that file is passed instead. This is primarily intended for password files.
"""
executable: StepCliExecutable
args: List[str]
module_param_args: Dict[str, str] = field(default_factory=dict)
module_tmpfile_args: Dict[str, str] = field(default_factory=dict)
run_in_check_mode: bool = False
fail_on_error: bool = True

def run(self, module: AnsibleModule) -> CliCommandResult:
"""Execute the command with the given step-cli executable and Ansible module
def join(self, other: CliCommandArgs) -> CliCommandArgs:
"""Joins this Args object with another one and produces a new object containing values from both.
`args` are appended while the module_x_args values are merged. The other object takes precedence.
Args:
module (AnsibleModule): The Ansible module
other (Self): The other object to merge with
Returns:
CliCommandResult: Result of the command.
Raises:
CliError if the module args don't match with the provided params
Self: New Args object
"""
# use a context manager to ensure that our sensitive temporary files are *always* deleted
with tempfile.TemporaryDirectory("ansible-smallstep") as tmpdir:
args = self._build_args(module, Path(tmpdir))
return CliCommandArgs(self.args + other.args,
{**self.module_param_args, **other.module_param_args},
{**self.module_tmpfile_args, **other.module_tmpfile_args}
)

if module.check_mode and not self.run_in_check_mode:
return CliCommandResult(0, "", "")

rc, stdout, stderr = module.run_command(args)
if rc != 0 and self.fail_on_error:
if ("error allocating terminal" in stderr or "open /dev/tty: no such device or address" in stderr):
module.fail_json(
"Failed to run command: step-cli tried to open a terminal for interactive input. "
"This happens when step-cli prompts for additional parameters or asks for confirmation. "
"You may be missing a required parameter (such as 'force'). Check the module documentation. "
"If you are sure that you provided all required parameters, you may have encountered a bug. "
f"Please file an issue at {COLLECTION_REPO} if you think this is the case. "
f"Failed command: \'{' '.join(args)}\'"
)
else:
module.fail_json(f"Error running command \'{' '.join(args)}\'. Error: {stderr}")
return CliCommandResult(rc, stdout, stderr)

def _build_args(self, module: AnsibleModule, tmpdir: Path) -> List[str]:
args = [self.executable.path] + self.args
def build(self, module: AnsibleModule, tmpdir: Path) -> List[str]:
args = self.args
module_params = cast(Dict, module.params)

# Create temporary files for any parameters that need to point to files, such as password-file
Expand Down Expand Up @@ -144,3 +115,55 @@ def _build_args(self, module: AnsibleModule, tmpdir: Path) -> List[str]:
# all other types
args.extend([self.module_param_args[param_name], str(module_params[param_name])])
return args


@dataclass
class CliCommand:
"""CliCommand represents a single command to be run by step-cli
Args:
executable(StepCliExecutable): The executable to run the command with
argspec (CliCommandArgs): Arguments to be passed to the executable
run_in_check_mode (bool): Whether to run this command even if Ansibles check_mode is enabled.
If false and check_mode is enabled, the invocation will exit with rc=0 and no output.
Only set this on invocations that don't change the system state! Default is false
fail_on_error(bool): Whether to run module_fail if this invocation fails. Default is true
"""
executable: StepCliExecutable
args: CliCommandArgs
run_in_check_mode: bool = False
fail_on_error: bool = True

def run(self, module: AnsibleModule) -> CliCommandResult:
"""Execute the command with the given step-cli executable and Ansible module
Args:
module (AnsibleModule): The Ansible module
Returns:
CliCommandResult: Result of the command.
Raises:
CliError if the module args don't match with the provided params
"""
# use a context manager to ensure that our sensitive temporary files are *always* deleted
with tempfile.TemporaryDirectory("ansible-smallstep") as tmpdir:
cmd = [self.executable.path] + self.args.build(module, Path(tmpdir))

if module.check_mode and not self.run_in_check_mode:
return CliCommandResult(0, "", "")

rc, stdout, stderr = module.run_command(cmd)
if rc != 0 and self.fail_on_error:
if ("error allocating terminal" in stderr or "open /dev/tty: no such device or address" in stderr):
module.fail_json(
"Failed to run command: step-cli tried to open a terminal for interactive input. "
"This happens when step-cli prompts for additional parameters or asks for confirmation. "
"You may be missing a required parameter (such as 'force'). Check the module documentation. "
"If you are sure that you provided all required parameters, you may have encountered a bug. "
f"Please file an issue at {COLLECTION_REPO} if you think this is the case. "
f"Failed command: \'{' '.join(cmd)}\'"
)
else:
module.fail_json(f"Error running command \'{' '.join(cmd)}\'. Error: {stderr}")
return CliCommandResult(rc, stdout, stderr)
6 changes: 3 additions & 3 deletions plugins/module_utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Dict, Any

from ansible.module_utils.basic import AnsibleModule
from ..module_utils.cli_wrapper import CliCommand, StepCliExecutable
from ..module_utils.cli_wrapper import CliCommand, StepCliExecutable, CliCommandArgs


@dataclass
Expand Down Expand Up @@ -42,7 +42,7 @@ def get_certificate_info(
if roots:
inspect_args.extend(["--roots", roots])

inspect_cmd = CliCommand(executable, inspect_args, run_in_check_mode=True)
inspect_cmd = CliCommand(executable, CliCommandArgs(inspect_args), run_in_check_mode=True)
inspect_res = inspect_cmd.run(module)
# The docs say inspect outputs to stderr, but my shell says otherwise:
# https://github.com/smallstep/cli/issues/1032
Expand All @@ -56,7 +56,7 @@ def get_certificate_info(
verify_args.extend(["--server-name", server_name])
if roots:
verify_args.extend(["--roots", roots])
verify_cmd = CliCommand(executable, verify_args, run_in_check_mode=True, fail_on_error=False)
verify_cmd = CliCommand(executable, CliCommandArgs(verify_args), run_in_check_mode=True, fail_on_error=False)
verify_res = verify_cmd.run(module)
valid = verify_res.rc == 0
invalid_reason = "" if valid else verify_res.stderr
Expand Down
6 changes: 5 additions & 1 deletion plugins/module_utils/params/ca_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common import validation

from ..cli_wrapper import CliCommandArgs
from .params_helper import ParamsHelper


Expand All @@ -18,7 +19,10 @@ class AdminParams(ParamsHelper):
admin_subject=dict(type="str", aliases=["admin_name"]),
admin_password_file=dict(type="path", no_log=False)
)
cliarg_map: Dict[str, str] = {key: f"--{key.replace('_', '-')}" for key in argument_spec}

@classmethod
def cli_args(cls) -> CliCommandArgs:
return CliCommandArgs([], {key: f"--{key.replace('_', '-')}" for key in cls.argument_spec})

# pylint: disable=useless-parent-delegation
def __init__(self, module: AnsibleModule) -> None:
Expand Down
6 changes: 5 additions & 1 deletion plugins/module_utils/params/ca_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ansible.module_utils.basic import AnsibleModule

from ..cli_wrapper import CliCommandArgs
from .params_helper import ParamsHelper


Expand All @@ -16,7 +17,10 @@ class CaConnectionParams(ParamsHelper):
ca_config=dict(type="path"),
offline=dict(type="bool"),
)
cliarg_map: Dict[str, str] = {key: f"--{key.replace('_', '-')}" for key in argument_spec}

@classmethod
def cli_args(cls) -> CliCommandArgs:
return CliCommandArgs([], {key: f"--{key.replace('_', '-')}" for key in cls.argument_spec})

# pylint: disable=useless-parent-delegation
def __init__(self, module: AnsibleModule) -> None:
Expand Down
8 changes: 5 additions & 3 deletions plugins/module_utils/params/params_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from ansible.module_utils.basic import AnsibleModule

from ..cli_wrapper import CliCommandArgs


class ParamsHelper(ABC):
"""A helper class that provides a set of module parameters and a method to validate them.
Expand All @@ -27,8 +29,8 @@ def argument_spec(self) -> Dict[str, Dict[str, Any]]:
"""Returns the helpers argument spec, as expected by AnsibleModule()
"""

@property
@classmethod
@abstractmethod
def cliarg_map(self) -> Dict[str, str]:
"""Returns a map of params with their corresponding cli parameter, for use in CliWrapper
def cli_args(cls) -> CliCommandArgs:
"""Returns a CliCommandArgs object containing all the arguments needed for this parameter group
"""
5 changes: 3 additions & 2 deletions plugins/modules/step_ca_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from typing import Dict, cast, Any

from ansible.module_utils.basic import AnsibleModule
from ..module_utils.cli_wrapper import CliCommand, StepCliExecutable
from ..module_utils.cli_wrapper import CliCommand, CliCommandArgs, StepCliExecutable

DEFAULTS_FILE = f"{os.environ.get('STEPPATH', os.environ['HOME'] + '/.step')}/config/defaults.json"

Expand Down Expand Up @@ -92,13 +92,14 @@ def run_module():
result["failed"] = True
module.exit_json(**result)

bootstrap_cmd = CliCommand(cli_exec, ["ca", "bootstrap"], {
bootstrap_args = CliCommandArgs(["ca", "bootstrap"], {
"ca_url": "--ca-url",
"fingerprint": "--fingerprint",
"force": "--force",
"install": "--install",
"redirect_url": "--redirect-url",
})
bootstrap_cmd = CliCommand(cli_exec, bootstrap_args)
bootstrap_cmd.run(module)
result["changed"] = True
module.exit_json(**result)
Expand Down
14 changes: 5 additions & 9 deletions plugins/modules/step_ca_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@
from ansible.module_utils.common.validation import check_required_if

from ..module_utils.params.ca_connection import CaConnectionParams
from ..module_utils.cli_wrapper import CliCommand, StepCliExecutable
from ..module_utils.cli_wrapper import CliCommand, CliCommandArgs, StepCliExecutable
from ..module_utils import helpers
from ..module_utils.constants import DEFAULT_STEP_CLI_EXECUTABLE

Expand Down Expand Up @@ -297,10 +297,8 @@ def create_certificate(executable: StepCliExecutable, module: AnsibleModule, for
if force:
args.append("--force")

create_cmd = CliCommand(executable, args, {
**cert_cliarg_map,
**CaConnectionParams.cliarg_map
})
create_args = CaConnectionParams.cli_args().join(CliCommandArgs(args, cert_cliarg_map))
create_cmd = CliCommand(executable, create_args)
create_cmd.run(module)
return {"changed": True}

Expand Down Expand Up @@ -355,10 +353,8 @@ def revoke_certificate(executable: StepCliExecutable, module: AnsibleModule) ->
"revoke_reason_code": "--reasonCode",
"token": "--token"
}
revoke_cmd = CliCommand(executable, ["ca", "revoke"], {
**revoke_cliarg_map,
**CaConnectionParams.cliarg_map
}, fail_on_error=False)
revoke_args = CaConnectionParams.cli_args().join(CliCommandArgs(["ca", "revoke"], revoke_cliarg_map))
revoke_cmd = CliCommand(executable, revoke_args, fail_on_error=False)
res = revoke_cmd.run(module)

if res.rc != 0 and "is already revoked" in res.stderr:
Expand Down
37 changes: 19 additions & 18 deletions plugins/modules/step_ca_provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@
from ansible.module_utils.basic import AnsibleModule

from ..module_utils.params.ca_admin import AdminParams
from ..module_utils.cli_wrapper import CliCommand, StepCliExecutable
from ..module_utils.cli_wrapper import CliCommand, CliCommandArgs, StepCliExecutable
from ..module_utils.constants import DEFAULT_STEP_CLI_EXECUTABLE

# We cannot use the default connection module util, as that one includes the --offline flag,
Expand All @@ -453,7 +453,7 @@
"root": "--root"
}

CREATE_UPDATE_CLIARG_MAP = {
CREATE_UPDATE_CLIARGS = {
"allow_renewal_after_expiry": "--allow-renewal-after-expiry",
"aws_accounts": "--aws-account",
"azure_audience": "--azure-audience",
Expand Down Expand Up @@ -505,30 +505,31 @@


def add_provisioner(name: str, provisioner_type: str, executable: StepCliExecutable, module: AnsibleModule):
cmd = CliCommand(executable, ["ca", "provisioner", "add", name, "--type", provisioner_type], {
**CREATE_UPDATE_CLIARG_MAP,
**CONNECTION_CLIARG_MAP,
**AdminParams.cliarg_map
})
args = AdminParams.cli_args().join(CliCommandArgs(
["ca", "provisioner", "add", name, "--type", provisioner_type],
{**CREATE_UPDATE_CLIARGS, **CONNECTION_CLIARG_MAP}
))
cmd = CliCommand(executable, args)
cmd.run(module)
return


def update_provisioner(name: str, executable: StepCliExecutable, module: AnsibleModule):
cmd = CliCommand(executable, ["ca", "provisioner", "update", name], {
**CREATE_UPDATE_CLIARG_MAP,
**CONNECTION_CLIARG_MAP,
**AdminParams.cliarg_map
})
args = AdminParams.cli_args().join(CliCommandArgs(
["ca", "provisioner", "update", name],
{**CREATE_UPDATE_CLIARGS, **CONNECTION_CLIARG_MAP}
))
cmd = CliCommand(executable, args)
cmd.run(module)
return


def remove_provisioner(name: str, executable: StepCliExecutable, module: AnsibleModule):
cmd = CliCommand(executable, ["ca", "provisioner", "remove", name], {
**CONNECTION_CLIARG_MAP,
**AdminParams.cliarg_map
})
args = AdminParams.cli_args().join(CliCommandArgs(
["ca", "provisioner", "remove", name],
CONNECTION_CLIARG_MAP
))
cmd = CliCommand(executable, args)
cmd.run(module)
return

Expand Down Expand Up @@ -609,8 +610,8 @@ def run_module():
if state == "present" and not p_type:
module.fail_json("Provisioner type is required when state == present")

ca_online_check = CliCommand(executable, ["ca", "provisioner", "list"],
CONNECTION_CLIARG_MAP, run_in_check_mode=True, fail_on_error=False)
ca_online_check_args = CliCommandArgs(["ca", "provisioner", "list"], CONNECTION_CLIARG_MAP)
ca_online_check = CliCommand(executable, ca_online_check_args, run_in_check_mode=True, fail_on_error=False)
ca_online_res = ca_online_check.run(module)
# Offline provisioner management is possible even if the CA is down.
# ca provisioner list does depend on the CA being available however, so we need some backup strategies.
Expand Down
9 changes: 4 additions & 5 deletions plugins/modules/step_ca_renew.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@

from ansible.module_utils.basic import AnsibleModule

from ..module_utils.cli_wrapper import CliCommand, StepCliExecutable
from ..module_utils.cli_wrapper import CliCommand, CliCommandArgs, StepCliExecutable
from ..module_utils.params.ca_connection import CaConnectionParams
from ..module_utils.constants import DEFAULT_STEP_CLI_EXECUTABLE

Expand Down Expand Up @@ -111,10 +111,9 @@ def run_module():
# All parameters can be converted to a mapping by just appending -- and replacing the underscores
renew_cliarg_map = {arg: f"--{arg.replace('_', '-')}" for arg in renew_cliargs}

renew_cmd = CliCommand(executable, ["ca", "renew", module_params["crt_file"], module_params["key_file"]], {
**renew_cliarg_map,
**CaConnectionParams.cliarg_map
})
renew_args = CaConnectionParams.cli_args().join(CliCommandArgs(
["ca", "renew", module_params["crt_file"], module_params["key_file"]], renew_cliarg_map))
renew_cmd = CliCommand(executable, renew_args)
renew_res = renew_cmd.run(module)
if "Your certificate has been saved in" in renew_res.stderr:
result["changed"] = True
Expand Down
Loading

0 comments on commit ec8d82d

Please sign in to comment.