Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support build-secrets in managed-mode #104

Merged
merged 3 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions craft_application/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ def __init__(
self._command_groups: list[craft_cli.CommandGroup] = []
self._global_arguments: list[craft_cli.GlobalArgument] = [GLOBAL_VERSION]

if self.services.ProviderClass.is_managed():
# When build_secrets are enabled, this contains the secret info to pass to
# managed instances.
self._secrets: secrets.BuildSecrets | None = None

if self.is_managed():
self._work_dir = pathlib.Path("/root")
else:
self._work_dir = pathlib.Path.cwd()
Expand All @@ -140,7 +144,7 @@ def command_groups(self) -> list[craft_cli.CommandGroup]:
@property
def log_path(self) -> pathlib.Path | None:
"""Get the path to this process's log file, if any."""
if self.services.ProviderClass.is_managed():
if self.is_managed():
return util.get_managed_logpath(self.app)
return None

Expand Down Expand Up @@ -197,6 +201,10 @@ def project(self) -> models.Project:

return self.app.ProjectClass.from_yaml_data(yaml_data, project_file)

def is_managed(self) -> bool:
"""Shortcut to tell whether we're running in managed mode."""
return self.services.ProviderClass.is_managed()

def run_managed(self, platform: str | None, build_for: str | None) -> None:
"""Run the application in a managed instance."""
extra_args: dict[str, Any] = {}
Expand All @@ -205,7 +213,15 @@ def run_managed(self, platform: str | None, build_for: str | None) -> None:
build_plan = _filter_plan(build_plan, platform, build_for)

for build_info in build_plan:
extra_args["env"] = {"CRAFT_PLATFORM": build_info.platform}
env = {"CRAFT_PLATFORM": build_info.platform}

if self.app.features.build_secrets:
# If using build secrets, put them in the environment of the managed
# instance.
secret_values = cast(secrets.BuildSecrets, self._secrets)
cmatsuoka marked this conversation as resolved.
Show resolved Hide resolved
env.update(secret_values.environment)

extra_args["env"] = env

craft_cli.emit.debug(
f"Running {self.app.name}:{build_info.platform} in {build_info.build_for} instance..."
Expand Down Expand Up @@ -321,7 +337,7 @@ def run(self) -> int: # noqa: PLR0912 (too many branches due to error handling)
if command.always_load_project:
self.services.project = self.project
return_code = dispatcher.run() or 0
elif not self.services.ProviderClass.is_managed():
elif not self.is_managed():
# command runs in inner instance, but this is the outer instance
self.services.project = self.project
self.run_managed(platform, build_for)
Expand Down Expand Up @@ -374,7 +390,7 @@ def _emit_error(
error.__cause__ = cause

# Do not report the internal logpath if running inside an instance
if self.services.ProviderClass.is_managed():
if self.is_managed():
error.logpath_report = False

craft_cli.emit.error(error)
Expand Down Expand Up @@ -427,9 +443,16 @@ def _set_global_environment(self, info: craft_parts.ProjectInfo) -> None:

def _render_secrets(self, yaml_data: dict[str, Any]) -> None:
"""Render build-secrets, in-place."""
secret_values = secrets.render_secrets(yaml_data)
secret_values = secrets.render_secrets(
yaml_data, managed_mode=self.is_managed()
)

num_secrets = len(secret_values.secret_strings)
craft_cli.emit.debug(f"Project has {num_secrets} build-secret(s).")

craft_cli.emit.set_secrets(list(secret_values.secret_strings))

craft_cli.emit.set_secrets(list(secret_values))
self._secrets = secret_values

def _extra_yaml_transform(self, yaml_data: dict[str, Any]) -> dict[str, Any]:
"""Perform additional transformations on a project's yaml data.
Expand Down
20 changes: 16 additions & 4 deletions craft_application/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,27 @@ def from_os_error(cls, err: OSError) -> Self:
class SecretsCommandError(CraftError):
"""Error when rendering a build-secret."""

def __init__(self, host_secret: str, error_message: str) -> None:
message = f'Error when processing secret "{host_secret}"'
def __init__(self, host_directive: str, error_message: str) -> None:
message = f'Error when processing secret "{host_directive}"'
details = f"Command output: {error_message}"
super().__init__(message=message, details=details)


class SecretsFieldError(CraftError):
"""Error when using a build-secret in a disallowed field."""

def __init__(self, host_secret: str, field_name: str) -> None:
message = f'Build secret "{host_secret}" is not allowed on field "{field_name}"'
def __init__(self, host_directive: str, field_name: str) -> None:
message = (
f'Build secret "{host_directive}" is not allowed on field "{field_name}"'
)
super().__init__(message=message)


class SecretsManagedError(CraftError):
"""A secret value was not found while in managed mode."""

def __init__(self, host_directive: str) -> None:
message = (
f'Build secret "{host_directive}" was not found in the managed environment.'
)
super().__init__(message=message)
94 changes: 83 additions & 11 deletions craft_application/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,67 +16,103 @@
"""Handling of build-time secrets."""
from __future__ import annotations

import base64
import dataclasses
import json
import os
import re
import subprocess
from typing import Any
from typing import Any, Mapping, cast

from craft_application import errors

SECRET_REGEX = re.compile(r"\$\(HOST_SECRET:(?P<command>.*)\)")


def render_secrets(yaml_data: dict[str, Any]) -> set[str]:
@dataclasses.dataclass(frozen=True)
class BuildSecrets:
"""The data needed to correctly handle build-time secrets in the application."""

environment: dict[str, str]
"""The encoded information that can be passed to a managed instance's environment."""

secret_strings: set[str]
"""The actual secret text strings, to be passed to craft_cli."""


def render_secrets(yaml_data: dict[str, Any], *, managed_mode: bool) -> BuildSecrets:
"""Render/expand the build secrets in a project's yaml data (in-place).

This function will process directives of the form $(HOST_SECRET:<cmd>) in string
values in ``yaml_data``. For each such directive, the <cmd> part is executed (with
bash) and the resulting output replaces the whole directive. The returned set
contains the outputs of all HOST_SECRET processing (for masking with craft-cli).
bash) and the resulting output replaces the whole directive. The returned object
contains the result of HOST_SECRET processing (for masking with craft-cli and
forwarding to managed instances).

Note that only a few fields are currently supported:

- "source" and "build-environment" for parts.

Using HOST_SECRET directives in any other field is an error.

:param yaml_data: The project's loaded data
:param managed_mode: Whether the current application is running in managed mode.
"""
command_cache: dict[str, str] = {}

if managed_mode:
command_cache = _decode_commands(os.environ)

# Process the fields where we allow build secrets
parts = yaml_data.get("parts", {})
for part in parts.values():
_render_part_secrets(part, command_cache)
_render_part_secrets(part, command_cache, managed_mode)

# Now loop over all the data to check for build secrets in disallowed fields
_check_for_secrets(yaml_data)

return set(command_cache.values())
return BuildSecrets(
environment=_encode_commands(command_cache),
secret_strings=set(command_cache.values()),
)


def _render_part_secrets(
part_data: dict[str, Any], command_cache: dict[str, Any]
part_data: dict[str, Any],
command_cache: dict[str, Any],
managed_mode: bool, # noqa: FBT001 (boolean positional arg)
) -> None:
# Render "source"
source = part_data.get("source", "")
if (rendered := _render_secret(source, command_cache)) is not None:
if (rendered := _render_secret(source, command_cache, managed_mode)) is not None:
part_data["source"] = rendered

# Render "build-environment"
build_env = part_data.get("build-environment", [])
# "build-environment" is a list of dicts with a single item each
for single_entry_dict in build_env:
for var_name, var_value in single_entry_dict.items():
if (rendered := _render_secret(var_value, command_cache)) is not None:
rendered = _render_secret(var_value, command_cache, managed_mode)
if rendered is not None:
single_entry_dict[var_name] = rendered


def _render_secret(text: str, command_cache: dict[str, str]) -> str | None:
def _render_secret(
text: str,
tigarmo marked this conversation as resolved.
Show resolved Hide resolved
command_cache: dict[str, str],
managed_mode: bool, # noqa: FBT001 (boolean positional arg)
tigarmo marked this conversation as resolved.
Show resolved Hide resolved
) -> str | None:
if match := SECRET_REGEX.search(text):
command = match.group("command")
host_directive = match.group(0)

if command in command_cache:
output = command_cache[command]
else:
if managed_mode:
# In managed mode the command *must* be in the cache; this is an error.
raise errors.SecretsManagedError(host_directive)

try:
output = _run_command(command)
except subprocess.CalledProcessError as err:
Expand Down Expand Up @@ -115,4 +151,40 @@ def _check_str(
value: Any, field_name: str # noqa: ANN401 (using Any on purpose)
) -> None:
if isinstance(value, str) and (match := SECRET_REGEX.search(value)):
raise errors.SecretsFieldError(host_secret=match.group(), field_name=field_name)
raise errors.SecretsFieldError(
host_directive=match.group(), field_name=field_name
)


def _encode_commands(commands: dict[str, str]) -> dict[str, str]:
"""Encode a dict of (command, command-output) pairs for env serialization.

The resulting dict can be passed to the environment of managed instances (via
subprocess.run() or equivalents).
"""
if not commands:
# Empty dict: don't spend time encoding anything.
return {}

# The current encoding scheme is to dump the entire dict to base64-encoded json
# string, then put this resulting string in a dict under the "CRAFT_SECRETS" key.
as_json = json.dumps(commands)
as_bytes = as_json.encode("utf-8")
as_b64 = base64.b64encode(as_bytes)
as_str = as_b64.decode("ascii")

return {"CRAFT_SECRETS": as_str}


def _decode_commands(environment: Mapping[str, Any]) -> dict[str, str]:
as_str = environment.get("CRAFT_SECRETS")

if as_str is None:
# Not necessarily an error: it means the project has no secrets.
return {}

as_b64 = as_str.encode("ascii")
as_bytes = base64.b64decode(as_b64)
as_json = as_bytes.decode("utf-8")

return cast("dict[str, str]", json.loads(as_json))
85 changes: 68 additions & 17 deletions tests/integration/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import craft_cli
import pytest
import pytest_check
from craft_application import secrets
from craft_application.util import yaml
from typing_extensions import override

Expand Down Expand Up @@ -265,31 +266,81 @@ def test_global_environment(
assert variables["project_version"] == "1.2.3"


@pytest.mark.enable_features("build_secrets")
def test_build_secrets_destructive(create_app, monkeypatch, tmp_path, capsys):
@pytest.fixture()
def setup_secrets_project(create_app, monkeypatch, tmp_path):
"""Test the use of build secrets in destructive mode."""
monkeypatch.setenv("CRAFT_DEBUG", "1")
monkeypatch.chdir(tmp_path)
shutil.copytree(TEST_DATA_DIR / "build-secrets", tmp_path, dirs_exist_ok=True)

# Run in destructive mode
monkeypatch.setattr("sys.argv", ["testcraft", "prime", "-v", "--destructive-mode"])
def _inner(*, destructive_mode: bool):
monkeypatch.setenv("CRAFT_DEBUG", "1")
monkeypatch.chdir(tmp_path)
shutil.copytree(TEST_DATA_DIR / "build-secrets", tmp_path, dirs_exist_ok=True)

app = create_app()
argv = ["testcraft", "prime", "-v"]
if destructive_mode:
# Run in destructive mode
argv.append("--destructive-mode")

monkeypatch.setattr("sys.argv", argv)

return create_app()

return _inner


@pytest.fixture()
def check_secrets_output(tmp_path, capsys):
def _inner():
prime_dir = tmp_path / "prime"
assert (prime_dir / "source-file.txt").read_text().strip() == "A source file"
assert (prime_dir / "build-file.txt").read_text().strip() == "my-secret"

# Check that the "secrets" were masked in console output and the logfile
_, stderr = capsys.readouterr()
log_contents = craft_cli.emit._log_filepath.read_text()

for target in (stderr, log_contents):
assert "Dumping SECRET_VAR: my-secret" not in target
assert "Dumping SECRET_VAR: *****" in target

return _inner


@pytest.mark.enable_features("build_secrets")
def test_build_secrets_destructive(
monkeypatch, setup_secrets_project, check_secrets_output
):
"""Test the use of build secrets in destructive mode."""
app = setup_secrets_project(destructive_mode=True)

# Set the environment variables that the project needs
monkeypatch.setenv("HOST_SOURCE_FOLDER", "secret-source")
monkeypatch.setenv("HOST_SECRET_VAR", "my-secret")

app.run()

prime_dir = tmp_path / "prime"
assert (prime_dir / "source-file.txt").read_text().strip() == "A source file"
assert (prime_dir / "build-file.txt").read_text().strip() == "my-secret"
check_secrets_output()


# Check that the "secrets" were masked in console output and the logfile
_, stderr = capsys.readouterr()
log_contents = craft_cli.emit._log_filepath.read_text()
@pytest.mark.enable_features("build_secrets")
def test_build_secrets_managed(
monkeypatch, tmp_path, setup_secrets_project, check_secrets_output
):
"""Test the use of build secrets in managed mode."""
app = setup_secrets_project(destructive_mode=False)

monkeypatch.setenv("CRAFT_MANAGED_MODE", "1")
assert app.is_managed
app._work_dir = tmp_path

# Before running the application, configure its environment "as if" the host app
# had placed the secrets there.
commands = {
"echo ${HOST_SOURCE_FOLDER}": "secret-source",
"echo ${HOST_SECRET_VAR}": "my-secret",
}
encoded = secrets._encode_commands(commands)
monkeypatch.setenv("CRAFT_SECRETS", encoded["CRAFT_SECRETS"])

app.run()

for target in (stderr, log_contents):
assert "Dumping SECRET_VAR: my-secret" not in target
assert "Dumping SECRET_VAR: *****" in target
check_secrets_output()
Loading