Skip to content

Commit

Permalink
feat: support build-secrets in managed-mode (#104)
Browse files Browse the repository at this point in the history
This is done by encoding the secrets, in the host-application, into the
environment of the managed instance. The managed instance then decodes
this environment and uses it to re-render the project secrets.
  • Loading branch information
tigarmo authored Oct 19, 2023
1 parent 4c96df5 commit 8c6c047
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 52 deletions.
37 changes: 30 additions & 7 deletions craft_application/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ def __init__(
self._command_groups: list[craft_cli.CommandGroup] = []
self._global_arguments: list[craft_cli.GlobalArgument] = [GLOBAL_VERSION]

if self.services.ProviderClass.is_managed():
# When build_secrets are enabled, this contains the secret info to pass to
# managed instances.
self._secrets: secrets.BuildSecrets | None = None

if self.is_managed():
self._work_dir = pathlib.Path("/root")
else:
self._work_dir = pathlib.Path.cwd()
Expand All @@ -140,7 +144,7 @@ def command_groups(self) -> list[craft_cli.CommandGroup]:
@property
def log_path(self) -> pathlib.Path | None:
"""Get the path to this process's log file, if any."""
if self.services.ProviderClass.is_managed():
if self.is_managed():
return util.get_managed_logpath(self.app)
return None

Expand Down Expand Up @@ -197,6 +201,10 @@ def project(self) -> models.Project:

return self.app.ProjectClass.from_yaml_data(yaml_data, project_file)

def is_managed(self) -> bool:
"""Shortcut to tell whether we're running in managed mode."""
return self.services.ProviderClass.is_managed()

def run_managed(self, platform: str | None, build_for: str | None) -> None:
"""Run the application in a managed instance."""
extra_args: dict[str, Any] = {}
Expand All @@ -205,7 +213,15 @@ def run_managed(self, platform: str | None, build_for: str | None) -> None:
build_plan = _filter_plan(build_plan, platform, build_for)

for build_info in build_plan:
extra_args["env"] = {"CRAFT_PLATFORM": build_info.platform}
env = {"CRAFT_PLATFORM": build_info.platform}

if self.app.features.build_secrets:
# If using build secrets, put them in the environment of the managed
# instance.
secret_values = cast(secrets.BuildSecrets, self._secrets)
env.update(secret_values.environment)

extra_args["env"] = env

craft_cli.emit.debug(
f"Running {self.app.name}:{build_info.platform} in {build_info.build_for} instance..."
Expand Down Expand Up @@ -321,7 +337,7 @@ def run(self) -> int: # noqa: PLR0912 (too many branches due to error handling)
if command.always_load_project:
self.services.project = self.project
return_code = dispatcher.run() or 0
elif not self.services.ProviderClass.is_managed():
elif not self.is_managed():
# command runs in inner instance, but this is the outer instance
self.services.project = self.project
self.run_managed(platform, build_for)
Expand Down Expand Up @@ -374,7 +390,7 @@ def _emit_error(
error.__cause__ = cause

# Do not report the internal logpath if running inside an instance
if self.services.ProviderClass.is_managed():
if self.is_managed():
error.logpath_report = False

craft_cli.emit.error(error)
Expand Down Expand Up @@ -427,9 +443,16 @@ def _set_global_environment(self, info: craft_parts.ProjectInfo) -> None:

def _render_secrets(self, yaml_data: dict[str, Any]) -> None:
"""Render build-secrets, in-place."""
secret_values = secrets.render_secrets(yaml_data)
secret_values = secrets.render_secrets(
yaml_data, managed_mode=self.is_managed()
)

num_secrets = len(secret_values.secret_strings)
craft_cli.emit.debug(f"Project has {num_secrets} build-secret(s).")

craft_cli.emit.set_secrets(list(secret_values.secret_strings))

craft_cli.emit.set_secrets(list(secret_values))
self._secrets = secret_values

def _extra_yaml_transform(self, yaml_data: dict[str, Any]) -> dict[str, Any]:
"""Perform additional transformations on a project's yaml data.
Expand Down
20 changes: 16 additions & 4 deletions craft_application/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,27 @@ def from_os_error(cls, err: OSError) -> Self:
class SecretsCommandError(CraftError):
"""Error when rendering a build-secret."""

def __init__(self, host_secret: str, error_message: str) -> None:
message = f'Error when processing secret "{host_secret}"'
def __init__(self, host_directive: str, error_message: str) -> None:
message = f'Error when processing secret "{host_directive}"'
details = f"Command output: {error_message}"
super().__init__(message=message, details=details)


class SecretsFieldError(CraftError):
"""Error when using a build-secret in a disallowed field."""

def __init__(self, host_secret: str, field_name: str) -> None:
message = f'Build secret "{host_secret}" is not allowed on field "{field_name}"'
def __init__(self, host_directive: str, field_name: str) -> None:
message = (
f'Build secret "{host_directive}" is not allowed on field "{field_name}"'
)
super().__init__(message=message)


class SecretsManagedError(CraftError):
"""A secret value was not found while in managed mode."""

def __init__(self, host_directive: str) -> None:
message = (
f'Build secret "{host_directive}" was not found in the managed environment.'
)
super().__init__(message=message)
98 changes: 85 additions & 13 deletions craft_application/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,67 +16,103 @@
"""Handling of build-time secrets."""
from __future__ import annotations

import base64
import dataclasses
import json
import os
import re
import subprocess
from typing import Any
from typing import Any, Mapping, cast

from craft_application import errors

SECRET_REGEX = re.compile(r"\$\(HOST_SECRET:(?P<command>.*)\)")


def render_secrets(yaml_data: dict[str, Any]) -> set[str]:
@dataclasses.dataclass(frozen=True)
class BuildSecrets:
"""The data needed to correctly handle build-time secrets in the application."""

environment: dict[str, str]
"""The encoded information that can be passed to a managed instance's environment."""

secret_strings: set[str]
"""The actual secret text strings, to be passed to craft_cli."""


def render_secrets(yaml_data: dict[str, Any], *, managed_mode: bool) -> BuildSecrets:
"""Render/expand the build secrets in a project's yaml data (in-place).
This function will process directives of the form $(HOST_SECRET:<cmd>) in string
values in ``yaml_data``. For each such directive, the <cmd> part is executed (with
bash) and the resulting output replaces the whole directive. The returned set
contains the outputs of all HOST_SECRET processing (for masking with craft-cli).
bash) and the resulting output replaces the whole directive. The returned object
contains the result of HOST_SECRET processing (for masking with craft-cli and
forwarding to managed instances).
Note that only a few fields are currently supported:
- "source" and "build-environment" for parts.
Using HOST_SECRET directives in any other field is an error.
:param yaml_data: The project's loaded data
:param managed_mode: Whether the current application is running in managed mode.
"""
command_cache: dict[str, str] = {}

if managed_mode:
command_cache = _decode_commands(os.environ)

# Process the fields where we allow build secrets
parts = yaml_data.get("parts", {})
for part in parts.values():
_render_part_secrets(part, command_cache)
_render_part_secrets(part, command_cache, managed_mode)

# Now loop over all the data to check for build secrets in disallowed fields
_check_for_secrets(yaml_data)

return set(command_cache.values())
return BuildSecrets(
environment=_encode_commands(command_cache),
secret_strings=set(command_cache.values()),
)


def _render_part_secrets(
part_data: dict[str, Any], command_cache: dict[str, Any]
part_data: dict[str, Any],
command_cache: dict[str, Any],
managed_mode: bool, # noqa: FBT001 (boolean positional arg)
) -> None:
# Render "source"
source = part_data.get("source", "")
if (rendered := _render_secret(source, command_cache)) is not None:
if (rendered := _render_secret(source, command_cache, managed_mode)) is not None:
part_data["source"] = rendered

# Render "build-environment"
build_env = part_data.get("build-environment", [])
# "build-environment" is a list of dicts with a single item each
for single_entry_dict in build_env:
for var_name, var_value in single_entry_dict.items():
if (rendered := _render_secret(var_value, command_cache)) is not None:
rendered = _render_secret(var_value, command_cache, managed_mode)
if rendered is not None:
single_entry_dict[var_name] = rendered


def _render_secret(text: str, command_cache: dict[str, str]) -> str | None:
if match := SECRET_REGEX.search(text):
def _render_secret(
yaml_string: str,
command_cache: dict[str, str],
managed_mode: bool, # noqa: FBT001 (boolean positional arg)
) -> str | None:
if match := SECRET_REGEX.search(yaml_string):
command = match.group("command")
host_directive = match.group(0)

if command in command_cache:
output = command_cache[command]
else:
if managed_mode:
# In managed mode the command *must* be in the cache; this is an error.
raise errors.SecretsManagedError(host_directive)

try:
output = _run_command(command)
except subprocess.CalledProcessError as err:
Expand All @@ -85,7 +121,7 @@ def _render_secret(text: str, command_cache: dict[str, str]) -> str | None:
) from err
command_cache[command] = output

return text.replace(host_directive, output)
return yaml_string.replace(host_directive, output)
return None


Expand Down Expand Up @@ -115,4 +151,40 @@ def _check_str(
value: Any, field_name: str # noqa: ANN401 (using Any on purpose)
) -> None:
if isinstance(value, str) and (match := SECRET_REGEX.search(value)):
raise errors.SecretsFieldError(host_secret=match.group(), field_name=field_name)
raise errors.SecretsFieldError(
host_directive=match.group(), field_name=field_name
)


def _encode_commands(commands: dict[str, str]) -> dict[str, str]:
"""Encode a dict of (command, command-output) pairs for env serialization.
The resulting dict can be passed to the environment of managed instances (via
subprocess.run() or equivalents).
"""
if not commands:
# Empty dict: don't spend time encoding anything.
return {}

# The current encoding scheme is to dump the entire dict to base64-encoded json
# string, then put this resulting string in a dict under the "CRAFT_SECRETS" key.
as_json = json.dumps(commands)
as_bytes = as_json.encode("utf-8")
as_b64 = base64.b64encode(as_bytes)
as_str = as_b64.decode("ascii")

return {"CRAFT_SECRETS": as_str}


def _decode_commands(environment: Mapping[str, Any]) -> dict[str, str]:
as_str = environment.get("CRAFT_SECRETS")

if as_str is None:
# Not necessarily an error: it means the project has no secrets.
return {}

as_b64 = as_str.encode("ascii")
as_bytes = base64.b64decode(as_b64)
as_json = as_bytes.decode("utf-8")

return cast("dict[str, str]", json.loads(as_json))
85 changes: 68 additions & 17 deletions tests/integration/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import craft_cli
import pytest
import pytest_check
from craft_application import secrets
from craft_application.util import yaml
from typing_extensions import override

Expand Down Expand Up @@ -265,31 +266,81 @@ def test_global_environment(
assert variables["project_version"] == "1.2.3"


@pytest.mark.enable_features("build_secrets")
def test_build_secrets_destructive(create_app, monkeypatch, tmp_path, capsys):
@pytest.fixture()
def setup_secrets_project(create_app, monkeypatch, tmp_path):
"""Test the use of build secrets in destructive mode."""
monkeypatch.setenv("CRAFT_DEBUG", "1")
monkeypatch.chdir(tmp_path)
shutil.copytree(TEST_DATA_DIR / "build-secrets", tmp_path, dirs_exist_ok=True)

# Run in destructive mode
monkeypatch.setattr("sys.argv", ["testcraft", "prime", "-v", "--destructive-mode"])
def _inner(*, destructive_mode: bool):
monkeypatch.setenv("CRAFT_DEBUG", "1")
monkeypatch.chdir(tmp_path)
shutil.copytree(TEST_DATA_DIR / "build-secrets", tmp_path, dirs_exist_ok=True)

app = create_app()
argv = ["testcraft", "prime", "-v"]
if destructive_mode:
# Run in destructive mode
argv.append("--destructive-mode")

monkeypatch.setattr("sys.argv", argv)

return create_app()

return _inner


@pytest.fixture()
def check_secrets_output(tmp_path, capsys):
def _inner():
prime_dir = tmp_path / "prime"
assert (prime_dir / "source-file.txt").read_text().strip() == "A source file"
assert (prime_dir / "build-file.txt").read_text().strip() == "my-secret"

# Check that the "secrets" were masked in console output and the logfile
_, stderr = capsys.readouterr()
log_contents = craft_cli.emit._log_filepath.read_text()

for target in (stderr, log_contents):
assert "Dumping SECRET_VAR: my-secret" not in target
assert "Dumping SECRET_VAR: *****" in target

return _inner


@pytest.mark.enable_features("build_secrets")
def test_build_secrets_destructive(
monkeypatch, setup_secrets_project, check_secrets_output
):
"""Test the use of build secrets in destructive mode."""
app = setup_secrets_project(destructive_mode=True)

# Set the environment variables that the project needs
monkeypatch.setenv("HOST_SOURCE_FOLDER", "secret-source")
monkeypatch.setenv("HOST_SECRET_VAR", "my-secret")

app.run()

prime_dir = tmp_path / "prime"
assert (prime_dir / "source-file.txt").read_text().strip() == "A source file"
assert (prime_dir / "build-file.txt").read_text().strip() == "my-secret"
check_secrets_output()


# Check that the "secrets" were masked in console output and the logfile
_, stderr = capsys.readouterr()
log_contents = craft_cli.emit._log_filepath.read_text()
@pytest.mark.enable_features("build_secrets")
def test_build_secrets_managed(
monkeypatch, tmp_path, setup_secrets_project, check_secrets_output
):
"""Test the use of build secrets in managed mode."""
app = setup_secrets_project(destructive_mode=False)

monkeypatch.setenv("CRAFT_MANAGED_MODE", "1")
assert app.is_managed
app._work_dir = tmp_path

# Before running the application, configure its environment "as if" the host app
# had placed the secrets there.
commands = {
"echo ${HOST_SOURCE_FOLDER}": "secret-source",
"echo ${HOST_SECRET_VAR}": "my-secret",
}
encoded = secrets._encode_commands(commands)
monkeypatch.setenv("CRAFT_SECRETS", encoded["CRAFT_SECRETS"])

app.run()

for target in (stderr, log_contents):
assert "Dumping SECRET_VAR: my-secret" not in target
assert "Dumping SECRET_VAR: *****" in target
check_secrets_output()
Loading

0 comments on commit 8c6c047

Please sign in to comment.