diff --git a/craft_application/application.py b/craft_application/application.py index c47a580e..7f9e21ef 100644 --- a/craft_application/application.py +++ b/craft_application/application.py @@ -113,7 +113,11 @@ def __init__( self._command_groups: list[craft_cli.CommandGroup] = [] self._global_arguments: list[craft_cli.GlobalArgument] = [GLOBAL_VERSION] - if self.services.ProviderClass.is_managed(): + # When build_secrets are enabled, this contains the secret info to pass to + # managed instances. + self._secrets: secrets.BuildSecrets | None = None + + if self.is_managed(): self._work_dir = pathlib.Path("/root") else: self._work_dir = pathlib.Path.cwd() @@ -140,7 +144,7 @@ def command_groups(self) -> list[craft_cli.CommandGroup]: @property def log_path(self) -> pathlib.Path | None: """Get the path to this process's log file, if any.""" - if self.services.ProviderClass.is_managed(): + if self.is_managed(): return util.get_managed_logpath(self.app) return None @@ -197,6 +201,10 @@ def project(self) -> models.Project: return self.app.ProjectClass.from_yaml_data(yaml_data, project_file) + def is_managed(self) -> bool: + """Shortcut to tell whether we're running in managed mode.""" + return self.services.ProviderClass.is_managed() + def run_managed(self, platform: str | None, build_for: str | None) -> None: """Run the application in a managed instance.""" extra_args: dict[str, Any] = {} @@ -205,7 +213,15 @@ def run_managed(self, platform: str | None, build_for: str | None) -> None: build_plan = _filter_plan(build_plan, platform, build_for) for build_info in build_plan: - extra_args["env"] = {"CRAFT_PLATFORM": build_info.platform} + env = {"CRAFT_PLATFORM": build_info.platform} + + if self.app.features.build_secrets: + # If using build secrets, put them in the environment of the managed + # instance. + secret_values = cast(secrets.BuildSecrets, self._secrets) + env.update(secret_values.environment) + + extra_args["env"] = env craft_cli.emit.debug( f"Running {self.app.name}:{build_info.platform} in {build_info.build_for} instance..." @@ -321,7 +337,7 @@ def run(self) -> int: # noqa: PLR0912 (too many branches due to error handling) if command.always_load_project: self.services.project = self.project return_code = dispatcher.run() or 0 - elif not self.services.ProviderClass.is_managed(): + elif not self.is_managed(): # command runs in inner instance, but this is the outer instance self.services.project = self.project self.run_managed(platform, build_for) @@ -374,7 +390,7 @@ def _emit_error( error.__cause__ = cause # Do not report the internal logpath if running inside an instance - if self.services.ProviderClass.is_managed(): + if self.is_managed(): error.logpath_report = False craft_cli.emit.error(error) @@ -427,9 +443,16 @@ def _set_global_environment(self, info: craft_parts.ProjectInfo) -> None: def _render_secrets(self, yaml_data: dict[str, Any]) -> None: """Render build-secrets, in-place.""" - secret_values = secrets.render_secrets(yaml_data) + secret_values = secrets.render_secrets( + yaml_data, managed_mode=self.is_managed() + ) + + num_secrets = len(secret_values.secret_strings) + craft_cli.emit.debug(f"Project has {num_secrets} build-secret(s).") + + craft_cli.emit.set_secrets(list(secret_values.secret_strings)) - craft_cli.emit.set_secrets(list(secret_values)) + self._secrets = secret_values def _extra_yaml_transform(self, yaml_data: dict[str, Any]) -> dict[str, Any]: """Perform additional transformations on a project's yaml data. diff --git a/craft_application/errors.py b/craft_application/errors.py index e75e9f45..edc74a66 100644 --- a/craft_application/errors.py +++ b/craft_application/errors.py @@ -109,8 +109,8 @@ def from_os_error(cls, err: OSError) -> Self: class SecretsCommandError(CraftError): """Error when rendering a build-secret.""" - def __init__(self, host_secret: str, error_message: str) -> None: - message = f'Error when processing secret "{host_secret}"' + def __init__(self, host_directive: str, error_message: str) -> None: + message = f'Error when processing secret "{host_directive}"' details = f"Command output: {error_message}" super().__init__(message=message, details=details) @@ -118,6 +118,18 @@ def __init__(self, host_secret: str, error_message: str) -> None: class SecretsFieldError(CraftError): """Error when using a build-secret in a disallowed field.""" - def __init__(self, host_secret: str, field_name: str) -> None: - message = f'Build secret "{host_secret}" is not allowed on field "{field_name}"' + def __init__(self, host_directive: str, field_name: str) -> None: + message = ( + f'Build secret "{host_directive}" is not allowed on field "{field_name}"' + ) + super().__init__(message=message) + + +class SecretsManagedError(CraftError): + """A secret value was not found while in managed mode.""" + + def __init__(self, host_directive: str) -> None: + message = ( + f'Build secret "{host_directive}" was not found in the managed environment.' + ) super().__init__(message=message) diff --git a/craft_application/secrets.py b/craft_application/secrets.py index 92867aa6..2df1a7c9 100644 --- a/craft_application/secrets.py +++ b/craft_application/secrets.py @@ -16,48 +16,75 @@ """Handling of build-time secrets.""" from __future__ import annotations +import base64 +import dataclasses +import json +import os import re import subprocess -from typing import Any +from typing import Any, Mapping, cast from craft_application import errors SECRET_REGEX = re.compile(r"\$\(HOST_SECRET:(?P.*)\)") -def render_secrets(yaml_data: dict[str, Any]) -> set[str]: +@dataclasses.dataclass(frozen=True) +class BuildSecrets: + """The data needed to correctly handle build-time secrets in the application.""" + + environment: dict[str, str] + """The encoded information that can be passed to a managed instance's environment.""" + + secret_strings: set[str] + """The actual secret text strings, to be passed to craft_cli.""" + + +def render_secrets(yaml_data: dict[str, Any], *, managed_mode: bool) -> BuildSecrets: """Render/expand the build secrets in a project's yaml data (in-place). This function will process directives of the form $(HOST_SECRET:) in string values in ``yaml_data``. For each such directive, the part is executed (with - bash) and the resulting output replaces the whole directive. The returned set - contains the outputs of all HOST_SECRET processing (for masking with craft-cli). + bash) and the resulting output replaces the whole directive. The returned object + contains the result of HOST_SECRET processing (for masking with craft-cli and + forwarding to managed instances). Note that only a few fields are currently supported: - "source" and "build-environment" for parts. Using HOST_SECRET directives in any other field is an error. + + :param yaml_data: The project's loaded data + :param managed_mode: Whether the current application is running in managed mode. """ command_cache: dict[str, str] = {} + if managed_mode: + command_cache = _decode_commands(os.environ) + # Process the fields where we allow build secrets parts = yaml_data.get("parts", {}) for part in parts.values(): - _render_part_secrets(part, command_cache) + _render_part_secrets(part, command_cache, managed_mode) # Now loop over all the data to check for build secrets in disallowed fields _check_for_secrets(yaml_data) - return set(command_cache.values()) + return BuildSecrets( + environment=_encode_commands(command_cache), + secret_strings=set(command_cache.values()), + ) def _render_part_secrets( - part_data: dict[str, Any], command_cache: dict[str, Any] + part_data: dict[str, Any], + command_cache: dict[str, Any], + managed_mode: bool, # noqa: FBT001 (boolean positional arg) ) -> None: # Render "source" source = part_data.get("source", "") - if (rendered := _render_secret(source, command_cache)) is not None: + if (rendered := _render_secret(source, command_cache, managed_mode)) is not None: part_data["source"] = rendered # Render "build-environment" @@ -65,18 +92,27 @@ def _render_part_secrets( # "build-environment" is a list of dicts with a single item each for single_entry_dict in build_env: for var_name, var_value in single_entry_dict.items(): - if (rendered := _render_secret(var_value, command_cache)) is not None: + rendered = _render_secret(var_value, command_cache, managed_mode) + if rendered is not None: single_entry_dict[var_name] = rendered -def _render_secret(text: str, command_cache: dict[str, str]) -> str | None: - if match := SECRET_REGEX.search(text): +def _render_secret( + yaml_string: str, + command_cache: dict[str, str], + managed_mode: bool, # noqa: FBT001 (boolean positional arg) +) -> str | None: + if match := SECRET_REGEX.search(yaml_string): command = match.group("command") host_directive = match.group(0) if command in command_cache: output = command_cache[command] else: + if managed_mode: + # In managed mode the command *must* be in the cache; this is an error. + raise errors.SecretsManagedError(host_directive) + try: output = _run_command(command) except subprocess.CalledProcessError as err: @@ -85,7 +121,7 @@ def _render_secret(text: str, command_cache: dict[str, str]) -> str | None: ) from err command_cache[command] = output - return text.replace(host_directive, output) + return yaml_string.replace(host_directive, output) return None @@ -115,4 +151,40 @@ def _check_str( value: Any, field_name: str # noqa: ANN401 (using Any on purpose) ) -> None: if isinstance(value, str) and (match := SECRET_REGEX.search(value)): - raise errors.SecretsFieldError(host_secret=match.group(), field_name=field_name) + raise errors.SecretsFieldError( + host_directive=match.group(), field_name=field_name + ) + + +def _encode_commands(commands: dict[str, str]) -> dict[str, str]: + """Encode a dict of (command, command-output) pairs for env serialization. + + The resulting dict can be passed to the environment of managed instances (via + subprocess.run() or equivalents). + """ + if not commands: + # Empty dict: don't spend time encoding anything. + return {} + + # The current encoding scheme is to dump the entire dict to base64-encoded json + # string, then put this resulting string in a dict under the "CRAFT_SECRETS" key. + as_json = json.dumps(commands) + as_bytes = as_json.encode("utf-8") + as_b64 = base64.b64encode(as_bytes) + as_str = as_b64.decode("ascii") + + return {"CRAFT_SECRETS": as_str} + + +def _decode_commands(environment: Mapping[str, Any]) -> dict[str, str]: + as_str = environment.get("CRAFT_SECRETS") + + if as_str is None: + # Not necessarily an error: it means the project has no secrets. + return {} + + as_b64 = as_str.encode("ascii") + as_bytes = base64.b64decode(as_b64) + as_json = as_bytes.decode("utf-8") + + return cast("dict[str, str]", json.loads(as_json)) diff --git a/tests/integration/test_application.py b/tests/integration/test_application.py index 659f5367..69215811 100644 --- a/tests/integration/test_application.py +++ b/tests/integration/test_application.py @@ -21,6 +21,7 @@ import craft_cli import pytest import pytest_check +from craft_application import secrets from craft_application.util import yaml from typing_extensions import override @@ -265,31 +266,81 @@ def test_global_environment( assert variables["project_version"] == "1.2.3" -@pytest.mark.enable_features("build_secrets") -def test_build_secrets_destructive(create_app, monkeypatch, tmp_path, capsys): +@pytest.fixture() +def setup_secrets_project(create_app, monkeypatch, tmp_path): """Test the use of build secrets in destructive mode.""" - monkeypatch.setenv("CRAFT_DEBUG", "1") - monkeypatch.chdir(tmp_path) - shutil.copytree(TEST_DATA_DIR / "build-secrets", tmp_path, dirs_exist_ok=True) - # Run in destructive mode - monkeypatch.setattr("sys.argv", ["testcraft", "prime", "-v", "--destructive-mode"]) + def _inner(*, destructive_mode: bool): + monkeypatch.setenv("CRAFT_DEBUG", "1") + monkeypatch.chdir(tmp_path) + shutil.copytree(TEST_DATA_DIR / "build-secrets", tmp_path, dirs_exist_ok=True) - app = create_app() + argv = ["testcraft", "prime", "-v"] + if destructive_mode: + # Run in destructive mode + argv.append("--destructive-mode") + + monkeypatch.setattr("sys.argv", argv) + + return create_app() + + return _inner + + +@pytest.fixture() +def check_secrets_output(tmp_path, capsys): + def _inner(): + prime_dir = tmp_path / "prime" + assert (prime_dir / "source-file.txt").read_text().strip() == "A source file" + assert (prime_dir / "build-file.txt").read_text().strip() == "my-secret" + + # Check that the "secrets" were masked in console output and the logfile + _, stderr = capsys.readouterr() + log_contents = craft_cli.emit._log_filepath.read_text() + + for target in (stderr, log_contents): + assert "Dumping SECRET_VAR: my-secret" not in target + assert "Dumping SECRET_VAR: *****" in target + + return _inner + + +@pytest.mark.enable_features("build_secrets") +def test_build_secrets_destructive( + monkeypatch, setup_secrets_project, check_secrets_output +): + """Test the use of build secrets in destructive mode.""" + app = setup_secrets_project(destructive_mode=True) # Set the environment variables that the project needs monkeypatch.setenv("HOST_SOURCE_FOLDER", "secret-source") monkeypatch.setenv("HOST_SECRET_VAR", "my-secret") + app.run() - prime_dir = tmp_path / "prime" - assert (prime_dir / "source-file.txt").read_text().strip() == "A source file" - assert (prime_dir / "build-file.txt").read_text().strip() == "my-secret" + check_secrets_output() + - # Check that the "secrets" were masked in console output and the logfile - _, stderr = capsys.readouterr() - log_contents = craft_cli.emit._log_filepath.read_text() +@pytest.mark.enable_features("build_secrets") +def test_build_secrets_managed( + monkeypatch, tmp_path, setup_secrets_project, check_secrets_output +): + """Test the use of build secrets in managed mode.""" + app = setup_secrets_project(destructive_mode=False) + + monkeypatch.setenv("CRAFT_MANAGED_MODE", "1") + assert app.is_managed + app._work_dir = tmp_path + + # Before running the application, configure its environment "as if" the host app + # had placed the secrets there. + commands = { + "echo ${HOST_SOURCE_FOLDER}": "secret-source", + "echo ${HOST_SECRET_VAR}": "my-secret", + } + encoded = secrets._encode_commands(commands) + monkeypatch.setenv("CRAFT_SECRETS", encoded["CRAFT_SECRETS"]) + + app.run() - for target in (stderr, log_contents): - assert "Dumping SECRET_VAR: my-secret" not in target - assert "Dumping SECRET_VAR: *****" in target + check_secrets_output() diff --git a/tests/unit/test_application.py b/tests/unit/test_application.py index 18198ba6..a73d41dd 100644 --- a/tests/unit/test_application.py +++ b/tests/unit/test_application.py @@ -30,7 +30,7 @@ import craft_providers import pytest import pytest_check -from craft_application import application, commands, services +from craft_application import application, commands, secrets, services from craft_application.models import BuildInfo from craft_application.util import ( get_host_architecture, # pyright: ignore[reportGeneralTypeIssues] @@ -187,6 +187,31 @@ def test_run_managed_failure(app, fake_project): assert exc_info.value.brief == "Failed to execute testcraft in instance." +@pytest.mark.enable_features("build_secrets") +def test_run_managed_secrets(app, fake_project): + mock_provider = mock.MagicMock(spec_set=services.ProviderService) + instance = mock_provider.instance.return_value.__enter__.return_value + mock_execute = instance.execute_run + app.services.provider = mock_provider + app.project = fake_project + + fake_encoded_environment = { + "CRAFT_TEST": "banana", + } + app._secrets = secrets.BuildSecrets( + environment=fake_encoded_environment, + secret_strings=set(), + ) + + app.run_managed(None, get_host_architecture()) + + # Check that the encoded secrets were propagated to the managed instance. + assert len(mock_execute.mock_calls) == 1 + call = mock_execute.mock_calls[0] + execute_env = call.kwargs["env"] + assert execute_env["CRAFT_TEST"] == "banana" + + def test_run_managed_multiple(app, fake_project, monkeypatch): mock_provider = mock.MagicMock(spec_set=services.ProviderService) app.services.provider = mock_provider diff --git a/tests/unit/test_secrets.py b/tests/unit/test_secrets.py index 99a8c2b1..cb2b4a96 100644 --- a/tests/unit/test_secrets.py +++ b/tests/unit/test_secrets.py @@ -19,7 +19,8 @@ from craft_application import errors, secrets -def test_secrets_parts(monkeypatch): +@pytest.fixture() +def good_yaml_data(): p1_data = { "source": "the source secret is $(HOST_SECRET:echo ${SECRET_1})", "build-environment": [ @@ -28,14 +29,35 @@ def test_secrets_parts(monkeypatch): ], } - monkeypatch.setenv("SECRET_1", "source-secret") - monkeypatch.setenv("SECRET_2", "env-secret") + return {"parts": {"p1": p1_data}} - yaml_data = {"parts": {"p1": p1_data}} - secret_values = secrets.render_secrets(yaml_data) - - assert secret_values == {"source-secret", "env-secret"} +@pytest.mark.parametrize("managed_mode", [True, False]) +def test_secrets_parts(monkeypatch, good_yaml_data, managed_mode): + commands = { + "echo ${SECRET_1}": "source-secret", + "echo ${SECRET_2}": "env-secret", + } + encoded = secrets._encode_commands(commands) + + if not managed_mode: + # Running on the "host"; the secrets will be obtained by running commands, + # so we set some environment variables that those commands will use. + monkeypatch.setenv("SECRET_1", "source-secret") + monkeypatch.setenv("SECRET_2", "env-secret") + else: + # Managed mode; the secrets must necessarily come from the environment already. + # Here we fake the work that the host application would have done: get the + # secrets by running the commands, and then encode them into the environment + # of the launched (managed) application. + monkeypatch.setenv("CRAFT_SECRETS", encoded["CRAFT_SECRETS"]) + + secret_values = secrets.render_secrets(good_yaml_data, managed_mode=managed_mode) + + assert secret_values.environment == encoded + assert secret_values.secret_strings == {"source-secret", "env-secret"} + + p1_data = good_yaml_data["parts"]["p1"] assert p1_data["source"] == "the source secret is source-secret" assert p1_data["build-environment"][0]["VAR1"] == "the env secret is env-secret" @@ -47,7 +69,7 @@ def test_secrets_command_error(): yaml_data = {"parts": {"p1": {"source": "$(HOST_SECRET:echo ${I_DONT_EXIST})"}}} with pytest.raises(errors.SecretsCommandError) as exc: - secrets.render_secrets(yaml_data) + secrets.render_secrets(yaml_data, managed_mode=False) expected_message = ( 'Error when processing secret "$(HOST_SECRET:echo ${I_DONT_EXIST})"' @@ -71,7 +93,7 @@ def test_secrets_cache(mocker, monkeypatch): yaml_data = {"parts": {"p1": p1_data}} spied_run = mocker.spy(secrets, "_run_command") - secrets.render_secrets(yaml_data) + secrets.render_secrets(yaml_data, managed_mode=False) # Even though the HOST_SECRET is used twice, only a single bash call is done because # the command is the same. @@ -96,8 +118,37 @@ def test_secrets_bad_field(monkeypatch, yaml_data, field_name): monkeypatch.setenv("GIT_VERSION", "1.0") with pytest.raises(errors.SecretsFieldError) as exc: - secrets.render_secrets(yaml_data) + secrets.render_secrets(yaml_data, managed_mode=False) expected_error = f'Build secret "{_SECRET}" is not allowed on field "{field_name}"' err = exc.value assert str(err) == expected_error + + +def test_secrets_encode_decode(): + commands = { + "echo $VAR1": "var1-value", + "echo $VAR2": "var2-value", + } + + encoded = secrets._encode_commands(commands) + decoded = secrets._decode_commands(encoded) + + assert decoded == commands + + +def test_secrets_encode_decode_empty(): + commands = {} + + encoded = secrets._encode_commands(commands) + decoded = secrets._decode_commands(encoded) + + assert decoded == encoded == commands + + +def test_secrets_managed_missing_key(good_yaml_data): + with pytest.raises(errors.SecretsManagedError) as exc: + secrets.render_secrets(good_yaml_data, managed_mode=True) + + expected_message = 'Build secret "$(HOST_SECRET:echo ${SECRET_1})" was not found in the managed environment.' + assert str(exc.value) == expected_message