From c6e9b82aaec95874cb6dc325879e5d10033097c2 Mon Sep 17 00:00:00 2001 From: Nikita Melkozerov Date: Tue, 8 Oct 2024 16:16:31 +0200 Subject: [PATCH] Access Edges (#2195) --- .devcontainer/Dockerfile | 6 +- .devcontainer/docker-compose.yml | 2 +- .vscode/settings.json | 15 +- fixlib/fixlib/baseresources.py | 80 +- fixlib/fixlib/graph/__init__.py | 13 +- fixlib/tox.ini | 4 +- plugins/aws/fix_plugin_aws/access_edges.py | 808 +++++++++++++++ plugins/aws/fix_plugin_aws/collector.py | 10 +- plugins/aws/fix_plugin_aws/configuration.py | 4 + plugins/aws/fix_plugin_aws/resource/backup.py | 12 +- plugins/aws/fix_plugin_aws/resource/base.py | 12 +- .../aws/fix_plugin_aws/resource/dynamodb.py | 19 +- plugins/aws/fix_plugin_aws/resource/ecr.py | 10 +- plugins/aws/fix_plugin_aws/resource/efs.py | 12 +- plugins/aws/fix_plugin_aws/resource/iam.py | 3 + .../aws/fix_plugin_aws/resource/kinesis.py | 4 +- plugins/aws/fix_plugin_aws/resource/kms.py | 11 +- .../aws/fix_plugin_aws/resource/lambda_.py | 91 +- plugins/aws/fix_plugin_aws/resource/s3.py | 19 +- .../fix_plugin_aws/resource/secretsmanager.py | 40 +- plugins/aws/fix_plugin_aws/resource/sns.py | 12 +- plugins/aws/fix_plugin_aws/resource/sqs.py | 19 +- plugins/aws/pyproject.toml | 1 + plugins/aws/test/__init__.py | 4 +- plugins/aws/test/acccess_edges_test.py | 919 ++++++++++++++++++ plugins/aws/test/collector_test.py | 2 +- .../get-resource-policy__foo.json | 5 + plugins/aws/test/resources/lambda_test.py | 24 +- plugins/aws/tox.ini | 2 +- 29 files changed, 2011 insertions(+), 152 deletions(-) create mode 100644 plugins/aws/fix_plugin_aws/access_edges.py create mode 100644 plugins/aws/test/acccess_edges_test.py create mode 100644 plugins/aws/test/resources/files/secretsmanager/get-resource-policy__foo.json diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 37054d10a8..79b4b97cf6 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,4 +1,4 @@ -FROM mcr.microsoft.com/devcontainers/python:3.11 +FROM mcr.microsoft.com/devcontainers/python:3.12 ARG USERNAME=vscode @@ -15,8 +15,8 @@ RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ && apt-get -y install --no-install-recommends vim wget -RUN wget -qO- https://download.arangodb.com/arangodb310/DEBIAN/Release.key | apt-key add - -RUN echo 'deb https://download.arangodb.com/arangodb310/DEBIAN/ /' | tee /etc/apt/sources.list.d/arangodb.list +RUN wget -qO- https://download.arangodb.com/arangodb311/DEBIAN/Release.key | apt-key add - +RUN echo 'deb https://download.arangodb.com/arangodb311/DEBIAN/ /' | tee /etc/apt/sources.list.d/arangodb.list RUN apt-get install -y apt-transport-https RUN apt-get update RUN apt-get install -y arangodb3-client diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index 7609711bcd..e66a97203f 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -23,7 +23,7 @@ services: # (Adding the "ports" property to this file will not forward from the container.) arangodb: - image: arangodb:3.10.3 + image: arangodb:3.11.11 restart: unless-stopped # Uncomment the lines below in case you want to keep arangodb data in a separate volume # volumes: diff --git a/.vscode/settings.json b/.vscode/settings.json index a86820abbf..5e11a76027 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,8 +10,17 @@ "fixworker/test" ], "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": false, - "python.formatting.provider": "black", + "python.testing.pytestEnabled": true, + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter", + "editor.formatOnSave": true + }, + "black-formatter.args": [ + "--line-length", + "120", + "--target-version", + "py39" + ], "search.exclude": { "**/*.code-search": true, "**/bower_components": true, @@ -22,4 +31,4 @@ "python.analysis.extraPaths": [ "fixlib" ] -} +} \ No newline at end of file diff --git a/fixlib/fixlib/baseresources.py b/fixlib/fixlib/baseresources.py index f45e4a8e03..96820715c1 100644 --- a/fixlib/fixlib/baseresources.py +++ b/fixlib/fixlib/baseresources.py @@ -6,16 +6,16 @@ from abc import ABC from copy import deepcopy from datetime import datetime, timezone, timedelta -from enum import Enum, unique +from enum import Enum, StrEnum, unique from functools import wraps, cached_property from typing import Dict, Iterator, List, ClassVar, Optional, TypedDict, Any, TypeVar, Type, Callable, Set, Tuple from collections import defaultdict from attr import resolve_types -from attrs import define, field, Factory +from attrs import define, field, Factory, frozen, evolve from prometheus_client import Counter, Summary -from fixlib.json import from_json as _from_json, to_json as _to_json +from fixlib.json import from_json as _from_json, to_json as _to_json, to_json_str from fixlib.logger import log from fixlib.types import Json from fixlib.utils import make_valid_timestamp, utc_str, utc @@ -68,6 +68,7 @@ class ModelReference(TypedDict, total=False): class EdgeType(Enum): default = "default" delete = "delete" + iam = "iam" @staticmethod def from_value(value: Optional[str] = None) -> EdgeType: @@ -1613,4 +1614,77 @@ def delete(self, graph: Any) -> bool: return False +class PolicySourceKind(StrEnum): + principal = "principal" # e.g. IAM user, attached policy + group = "group" # policy comes from an IAM group + resource = "resource" # e.g. s3 bucket policy + + +ResourceConstraint = str + +ConditionString = str + + +@frozen +class PolicySource: + kind: PolicySourceKind + uri: str + + +class HasResourcePolicy(ABC): + # returns a list of all policies that affects the resource (inline, attached, etc.) + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Json]]: + raise NotImplementedError + + +@frozen +class PermissionCondition: + # if nonempty and any evals to true, access is granted, otherwise implicitly denied + allow: Optional[Tuple[ConditionString, ...]] = None + # if nonempty and any is evals to false, access is implicitly denied + boundary: Optional[Tuple[ConditionString, ...]] = None + # if nonempty and any evals to true, access is explicitly denied + deny: Optional[Tuple[ConditionString, ...]] = None + + +@frozen +class PermissionScope: + source: PolicySource + constraints: Tuple[ResourceConstraint, ...] # aka resource constraints + conditions: Optional[PermissionCondition] = None + + def with_deny_conditions(self, deny_conditions: List[Json]) -> "PermissionScope": + c = self.conditions or PermissionCondition() + return evolve(self, conditions=evolve(c, deny=tuple([to_json_str(c) for c in deny_conditions]))) + + def with_boundary_conditions(self, boundary_conditions: List[Json]) -> "PermissionScope": + c = self.conditions or PermissionCondition() + return evolve(self, conditions=evolve(c, boundary=tuple([to_json_str(c) for c in boundary_conditions]))) + + def has_no_condititons(self) -> bool: + if self.conditions is None: + return True + + if self.conditions.allow is None and self.conditions.boundary is None and self.conditions.deny is None: + return True + + return False + + +class PermissionLevel(StrEnum): + list = "list" + read = "read" + tagging = "tagging" + write = "write" + permission_management = "permission" + unknown = "unknown" # in case a resource is not in the levels database + + +@frozen +class AccessPermission: + action: str + level: PermissionLevel + scopes: Tuple[PermissionScope, ...] + + resolve_types(BaseResource) # noqa diff --git a/fixlib/fixlib/graph/__init__.py b/fixlib/fixlib/graph/__init__.py index e18dfea910..80b5420eea 100644 --- a/fixlib/fixlib/graph/__init__.py +++ b/fixlib/fixlib/graph/__init__.py @@ -122,7 +122,7 @@ def merge(self, graph: Graph, skip_deferred_edges: bool = False) -> None: try: self._log_edge_creation = False - self.update(edges=graph.edges, nodes=graph.nodes) + self.update(edges=graph.edges(keys=True, data=True), nodes=graph.nodes) self.deferred_edges.extend(graph.deferred_edges) finally: self._log_edge_creation = True @@ -647,17 +647,18 @@ def export_graph(self) -> None: if not self.found_replace_node: log.warning(f"No nodes of kind {self.graph_merge_kind.kind} found in graph") start_time = time() - for edge in self.graph.edges: - from_node = edge[0] - to_node = edge[1] + for from_node, to_node, key, data in self.graph.edges(keys=True, data=True): if not isinstance(from_node, BaseResource) or not isinstance(to_node, BaseResource): log.error(f"One of {from_node} and {to_node} is no base resource") continue edge_dict = {"from": from_node.chksum, "to": to_node.chksum} - if len(edge) == 3: - key = edge[2] + if key: if isinstance(key, EdgeKey) and key.edge_type != EdgeType.default: edge_dict["edge_type"] = key.edge_type.value + + if reported := data.get("reported"): + edge_dict["reported"] = reported + edge_json = json.dumps(edge_dict) + "\n" self.tempfile.write(edge_json.encode()) self.total_lines += 1 diff --git a/fixlib/tox.ini b/fixlib/tox.ini index 15c37aa06a..008bc3c7b1 100644 --- a/fixlib/tox.ini +++ b/fixlib/tox.ini @@ -25,7 +25,7 @@ commands = flake8 --verbose commands= pytest [testenv:black] -commands = black --line-length 120 --check --diff --target-version py39 . +commands = black --line-length 120 --check --diff --target-version py311 . [testenv:mypy] -commands= mypy --install-types --non-interactive --python-version 3.9 --strict fixlib +commands= mypy --install-types --non-interactive --python-version 3.11 --strict fixlib diff --git a/plugins/aws/fix_plugin_aws/access_edges.py b/plugins/aws/fix_plugin_aws/access_edges.py new file mode 100644 index 0000000000..0a11a8d6d8 --- /dev/null +++ b/plugins/aws/fix_plugin_aws/access_edges.py @@ -0,0 +1,808 @@ +from functools import lru_cache +from attr import frozen, define +from fix_plugin_aws.resource.base import AwsResource, GraphBuilder + +from typing import List, Literal, Set, Optional, Tuple, Union, Pattern + +from fixlib.baseresources import ( + PermissionCondition, + PolicySource, + PermissionScope, + AccessPermission, + ResourceConstraint, +) +from fix_plugin_aws.resource.iam import AwsIamGroup, AwsIamPolicy, AwsIamUser, AwsIamRole +from fixlib.baseresources import EdgeType, PolicySourceKind, HasResourcePolicy, PermissionLevel +from fixlib.json import to_json, to_json_str +from fixlib.types import Json + +from cloudsplaining.scan.policy_document import PolicyDocument +from cloudsplaining.scan.statement_detail import StatementDetail +from policy_sentry.querying.actions import get_action_data, get_actions_matching_arn +from policy_sentry.querying.all import get_all_actions +from policy_sentry.util.arns import ARN, get_service_from_arn +import re +import logging + +log = logging.getLogger("fix.plugins.aws") + + +ALL_ACTIONS = get_all_actions() + + +@define(slots=True) +class IamRequestContext: + principal: AwsResource + identity_policies: List[Tuple[PolicySource, PolicyDocument]] + permission_boundaries: List[PolicyDocument] # todo: use them too + # all service control policies applicable to the principal, + # starting from the root, then all org units, then the account + service_control_policy_levels: List[List[PolicyDocument]] + # technically we should also add a list of session policies here, but they don't exist in the collector context + + def all_policies( + self, resource_based_policies: Optional[List[Tuple[PolicySource, PolicyDocument]]] = None + ) -> List[PolicyDocument]: + return ( + [p[1] for p in self.identity_policies] + + self.permission_boundaries + + [p for group in self.service_control_policy_levels for p in group] + + ([p[1] for p in (resource_based_policies or [])]) + ) + + +IamAction = str + + +def find_allowed_action(policy_document: PolicyDocument, service_prefix: str) -> Set[IamAction]: + allowed_actions: Set[IamAction] = set() + for statement in policy_document.statements: + if statement.effect_allow: + allowed_actions.update(get_expanded_action(statement, service_prefix)) + + return allowed_actions + + +def find_all_allowed_actions(all_involved_policies: List[PolicyDocument], resource_arn: str) -> Set[IamAction]: + resource_actions = set() + try: + resource_actions = set(get_actions_matching_arn(resource_arn)) + except Exception as e: + log.debug(f"Error when trying to get actions matching ARN {resource_arn}: {e}") + + service_prefix = "" + try: + service_prefix = get_service_from_arn(resource_arn) + except Exception as e: + log.debug(f"Error when trying to get service prefix from ARN {resource_arn}: {e}") + policy_actions: Set[IamAction] = set() + for p in all_involved_policies: + policy_actions.update(find_allowed_action(p, service_prefix)) + return policy_actions.intersection(resource_actions) + + +def get_expanded_action(statement: StatementDetail, service_prefix: str) -> Set[str]: + actions = set() + expanded: List[str] = statement.expanded_actions or [] + for action in expanded: + if action.startswith(f"{service_prefix}:"): + actions.add(action) + + return actions + + +@lru_cache(maxsize=1024) +def make_resoruce_regex(aws_resorce_wildcard: str) -> Pattern[str]: + # step 1: translate aws wildcard to python regex + python_regex = aws_resorce_wildcard.replace("*", ".*").replace("?", ".") + # step 2: compile the regex + return re.compile(f"^{python_regex}$", re.IGNORECASE) + + +def expand_wildcards_and_match(*, identifier: str, wildcard_string: str) -> bool: + """ + helper function to expand wildcards and match the identifier + + use case: + match the resource constraint (wildcard) with the ARN + match the wildcard action with the specific action + """ + pattern = make_resoruce_regex(wildcard_string) + return pattern.match(identifier) is not None + + +def check_statement_match( + statement: StatementDetail, + effect: Optional[Literal["Allow", "Deny"]], + action: str, + resource: AwsResource, + principal: Optional[AwsResource], + source_arn: Optional[str] = None, +) -> Tuple[bool, List[ResourceConstraint]]: + """ + check if a statement matches the given effect, action, resource and principal, + returns boolean if there is a match and optional resource constraint (if there were any) + """ + if resource.arn is None: + raise ValueError("Resource ARN is missing, go and fix the filtering logic") + + # step 1: check the principal if provided + if principal: + principal_match = False + if policy_principal := statement.json.get("Principal", None): + if policy_principal == "*": + principal_match = True + elif "AWS" in policy_principal: + aws_principal_list = policy_principal["AWS"] + if isinstance(aws_principal_list, str): + aws_principal_list = [aws_principal_list] + if check_principal_match(principal, aws_principal_list): + principal_match = True + else: + # aws service principal is specified, we do not handle such cases yet + pass + elif policy_not_principal := statement.json.get("NotPrincipal", None): + # * is not allowed in NotPrincipal, so we can skip the check + principal_match = True + if "AWS" in policy_not_principal: + aws_principal_list = policy_not_principal["AWS"] + assert isinstance(aws_principal_list, list) + if check_principal_match(principal, aws_principal_list): + principal_match = False + else: + # aws service principal is specified, we do not handle such cases yet + pass + else: + principal_match = True + + if not principal_match: + # principal does not match, we can shortcut here + return False, [] + + # step 2: check if the effect matches + if effect: + if statement.effect != effect: + # wrong effect, skip this statement + return False, [] + + # step 3: check if the action matches + action_match = False + if statement.actions: + # shortcuts for known AWS managed policies + if source_arn == "arn:aws:iam::aws:policy/ReadOnlyAccess": + action_level = get_action_level(action) + if action_level in [PermissionLevel.read or PermissionLevel.list]: + action_match = True + else: + action_match = False + else: + for a in statement.actions: + if expand_wildcards_and_match(identifier=action, wildcard_string=a): + action_match = True + break + else: + # not_action + action_match = True + for na in statement.not_action: + if expand_wildcards_and_match(identifier=action, wildcard_string=na): + action_match = False + break + if not action_match: + # action does not match, skip this statement + return False, [] + + # step 4: check if the resource matches + matched_resource_constraints: List[ResourceConstraint] = [] + resource_matches = False + if len(statement.resources) > 0: + for resource_constraint in statement.resources: + if expand_wildcards_and_match(identifier=resource.arn, wildcard_string=resource_constraint): + matched_resource_constraints.append(resource_constraint) + resource_matches = True + break + elif len(statement.not_resource) > 0: + resource_matches = True + for not_resource_constraint in statement.not_resource: + if expand_wildcards_and_match(identifier=resource.arn, wildcard_string=not_resource_constraint): + resource_matches = False + break + matched_resource_constraints.append("not " + not_resource_constraint) + else: + # no Resource/NotResource specified, consider allowed + resource_matches = True + if not resource_matches: + # resource does not match, skip this statement + return False, [] + + # step 5: (we're not doing this yet) check if the condition matches + # here we just return the statement and condition checking is the responsibility of the caller + return (True, matched_resource_constraints) + + +def check_principal_match(principal: AwsResource, aws_principal_list: List[str]) -> bool: + assert principal.arn + for aws_principal in aws_principal_list: + if aws_principal == "*": + return True + + if principal.arn == aws_principal: + return True + + if principal.id == aws_principal: + return True + + principal_arn = ARN(principal.arn) + if principal_arn.account == aws_principal: + return True + + return False + + +def collect_matching_statements( + *, + policy: PolicyDocument, + effect: Optional[Literal["Allow", "Deny"]], + action: str, + resource: AwsResource, + principal: Optional[AwsResource], + source_arn: Optional[str] = None, +) -> List[Tuple[StatementDetail, List[ResourceConstraint]]]: + """ + resoruce based policies contain principal field and need to be handled differently + """ + results: List[Tuple[StatementDetail, List[ResourceConstraint]]] = [] + + if resource.arn is None: + raise ValueError("Resource ARN is missing, go and fix the filtering logic") + + for statement in policy.statements: + + matches, maybe_resource_constraint = check_statement_match( + statement, effect=effect, action=action, resource=resource, principal=principal, source_arn=source_arn + ) + if matches: + results.append((statement, maybe_resource_constraint)) + + return results + + +def check_explicit_deny( + request_context: IamRequestContext, + resource: AwsResource, + action: str, + resource_based_policies: List[Tuple[PolicySource, PolicyDocument]], +) -> Union[Literal["Denied", "NextStep"], List[Json]]: + + denied_when_any_is_true: List[Json] = [] + + # we should skip service control policies for service linked roles + if not is_service_linked_role(request_context.principal): + for scp_level in request_context.service_control_policy_levels: + for policy in scp_level: + policy_statements = collect_matching_statements( + policy=policy, effect="Deny", action=action, resource=resource, principal=request_context.principal + ) + for statement, _ in policy_statements: + if statement.condition: + denied_when_any_is_true.append(statement.condition) + else: + return "Denied" + + # check permission boundaries + for policy in request_context.permission_boundaries: + policy_statements = collect_matching_statements( + policy=policy, effect="Deny", action=action, resource=resource, principal=request_context.principal + ) + for statement, _ in policy_statements: + if statement.condition: + denied_when_any_is_true.append(statement.condition) + else: + return "Denied" + + # check the rest of the policies + for _, policy in request_context.identity_policies + resource_based_policies: + policy_statements = collect_matching_statements( + policy=policy, effect="Deny", action=action, resource=resource, principal=request_context.principal + ) + for statement, _ in policy_statements: + if statement.condition: + denied_when_any_is_true.append(statement.condition) + else: + return "Denied" + + if denied_when_any_is_true: + return denied_when_any_is_true + + return "NextStep" + + +def scp_allowed(request_context: IamRequestContext, action: str, resource: AwsResource) -> bool: + + # traverse the SCPs: root -> OU -> account levels + for scp_level_policies in request_context.service_control_policy_levels: + level_allows = False + for policy in scp_level_policies: + statements = collect_matching_statements( + policy=policy, effect="Allow", action=action, resource=resource, principal=None + ) + if statements: + # 'Allow' statements in SCP can't have conditions, we do not check them + level_allows = True + break + + if not level_allows: + return False + + return True + + +@frozen +class FinalAllow: + scopes: List[PermissionScope] + + +@frozen +class Continue: + scopes: List[PermissionScope] + + +@frozen +class Deny: + pass + + +ResourceBasedPolicyResult = Union[FinalAllow, Continue, Deny] + + +# check if the resource based policies allow the action +# as a shortcut we return the first allow statement we find, or a first seen condition. +def check_resource_based_policies( + principal: AwsResource, + action: str, + resource: AwsResource, + resource_based_policies: List[Tuple[PolicySource, PolicyDocument]], +) -> ResourceBasedPolicyResult: + assert resource.arn + + scopes: List[PermissionScope] = [] + + arn = ARN(resource.arn) + explicit_allow_required = False + if arn.service_prefix == "iam" or arn.service_prefix == "kms": + explicit_allow_required = True + + for source, policy in resource_based_policies: + + matching_statements = collect_matching_statements( + policy=policy, + effect="Allow", + action=action, + resource=resource, + principal=principal, + ) + if len(matching_statements) == 0: + continue + + for statement, constraints in matching_statements: + if statement.condition: + scopes.append( + PermissionScope( + source=source, + constraints=tuple(constraints), + conditions=PermissionCondition(allow=(to_json_str(statement.condition),)), + ) + ) + else: + scopes.append( + PermissionScope( + source=source, + constraints=tuple(constraints), + ) + ) + + # if we found any allow statements, let's check the principal and act accordingly + if scopes: + if isinstance(principal, AwsIamUser): + # in case of IAM users, identity_based_policies and permission boundaries are not relevant + # and we can return the result immediately + return FinalAllow(scopes) + + # if we have KMS or IAM service, we want an explicit allow + if explicit_allow_required: + if not scopes: + return Deny() + + # in case of other IAM principals, allow on resource based policy is not enough and + # we need to check the permission boundaries + return Continue(scopes) + + +def check_identity_based_policies( + request_context: IamRequestContext, resource: AwsResource, action: str +) -> List[PermissionScope]: + + scopes: List[PermissionScope] = [] + + for source, policy in request_context.identity_policies: + for statement, resource_constraints in collect_matching_statements( + policy=policy, effect="Allow", action=action, resource=resource, principal=None, source_arn=source.uri + ): + conditions = None + if statement.condition: + conditions = PermissionCondition(allow=(to_json_str(statement.condition),)) + + scopes.append(PermissionScope(source, tuple(resource_constraints), conditions=conditions)) + + return scopes + + +def check_permission_boundaries( + request_context: IamRequestContext, resource: AwsResource, action: str +) -> Union[Literal["Denied", "NextStep"], List[Json]]: + + conditions: List[Json] = [] + + # ignore policy sources and resource constraints because permission boundaries + # can never allow access to a resource, only restrict it + for policy in request_context.permission_boundaries: + for statement, _ in collect_matching_statements( + policy=policy, effect="Allow", action=action, resource=resource, principal=None + ): + if statement.condition: + assert isinstance(statement.condition, dict) + conditions.append(statement.condition) + else: # if there is an allow statement without a condition, the action is allowed + return "NextStep" + + if len(conditions) > 0: + return conditions + + # no matching permission boundaries that allow access + return "Denied" + + +def is_service_linked_role(principal: AwsResource) -> bool: + assert principal.arn + if ":role/" in principal.arn: + arn = ARN(principal.arn) + role_name = arn.resource_path + return role_name.startswith("AWSServiceRoleFor") + + return False + + +def get_action_level(action: str) -> PermissionLevel: + service, action_name = action.split(":") + level = "" + action_data = get_action_data(service, action_name) + if not action_data: + return PermissionLevel.unknown + if len(action_data[service]) > 0: + for info in action_data[service]: + if action == info["action"]: + level = info["access_level"] + break + if level == "List": + return PermissionLevel.list + elif level == "Read": + return PermissionLevel.read + elif level == "Tagging": + return PermissionLevel.tagging + elif level == "Write": + return PermissionLevel.write + elif level == "Permissions management": + return PermissionLevel.permission_management + else: + return PermissionLevel.unknown + + +# logic according to https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_evaluation-logic.html +def check_policies( + request_context: IamRequestContext, + resource: AwsResource, + action: str, + resource_based_policies: List[Tuple[PolicySource, PolicyDocument]], +) -> Optional[AccessPermission]: + + # when any of the conditions evaluate to true, the action is explicitly denied + # comes from any explicit deny statements in all policies + deny_conditions: List[Json] = [] + + # when any of the conditions evaluate to false, the action is implicitly denied + # comes from the permission boundaries + restricting_conditions: List[Json] = [] + + # when any of the scopes evaluate to true, the action is allowed + # comes from the resource based policies and identity based policies + allowed_scopes: List[PermissionScope] = [] + + # 1. check for explicit deny. If denied, we can abort immediately + result = check_explicit_deny(request_context, resource, action, resource_based_policies) + if result == "Denied": + return None + elif result == "NextStep": + pass + else: + for c in result: + # satisfying any of the conditions above will deny the action + deny_conditions.append(c) + + # 2. check for organization SCPs + if len(request_context.service_control_policy_levels) > 0 and not is_service_linked_role(request_context.principal): + org_scp_allowed = scp_allowed(request_context, action, resource) + if not org_scp_allowed: + return None + + # 3. check resource based policies + if len(resource_based_policies) > 0: + resource_result = check_resource_based_policies( + request_context.principal, action, resource, resource_based_policies + ) + if isinstance(resource_result, FinalAllow): + scopes = resource_result.scopes + final_resource_scopes: Set[PermissionScope] = set() + for scope in scopes: + final_resource_scopes.add(scope.with_deny_conditions(deny_conditions)) + + return AccessPermission(action=action, level=get_action_level(action), scopes=tuple(final_resource_scopes)) + if isinstance(resource_result, Continue): + scopes = resource_result.scopes + allowed_scopes.extend(scopes) + + if isinstance(resource_result, Deny): + return None + + # 4. to make it a bit simpler, we check the permission boundaries before checking identity based policies + if len(request_context.permission_boundaries) > 0: + permission_boundary_result = check_permission_boundaries(request_context, resource, action) + if permission_boundary_result == "Denied": + return None + elif permission_boundary_result == "NextStep": + pass + else: + restricting_conditions.extend(permission_boundary_result) + + # 5. check identity based policies + if len(request_context.identity_policies) == 0: + if len(allowed_scopes) == 0: + # resource policy did no allow any actions and we have zero identity based policies -> implicit deny + return None + # otherwise continue with the resource based policies + else: + identity_based_allowed = check_identity_based_policies(request_context, resource, action) + if not identity_based_allowed: + return None + allowed_scopes.extend(identity_based_allowed) + + # 6. check for session policies + # we don't collect session principals and session policies, so this step is skipped + + # 7. if we reached here, the action is allowed + level = get_action_level(action) + + final_scopes: Set[PermissionScope] = set() + for scope in allowed_scopes: + if deny_conditions: + scope = scope.with_deny_conditions(deny_conditions) + final_scopes.add(scope) + + # if there is a scope with no conditions, we can ignore everything else + for scope in final_scopes: + if scope.has_no_condititons(): + final_scopes = {scope} + break + + log.debug( + f"Found access permission, {action} is allowed for {resource} by {request_context.principal}, level: {level}. Scopes: {len(final_scopes)}" + ) + + # return the result + return AccessPermission( + action=action, + level=level, + scopes=tuple(final_scopes), + ) + + +def compute_permissions( + resource: AwsResource, + iam_context: IamRequestContext, + resource_based_policies: List[Tuple[PolicySource, PolicyDocument]], +) -> List[AccessPermission]: + + assert resource.arn + # step 1: find the relevant action to check + relevant_actions = find_all_allowed_actions(iam_context.all_policies(resource_based_policies), resource.arn) + + all_permissions: List[AccessPermission] = [] + + # step 2: for every action, check if it is allowed + for action in relevant_actions: + if p := check_policies(iam_context, resource, action, resource_based_policies): + all_permissions.append(p) + + return all_permissions + + +class AccessEdgeCreator: + + def __init__(self, builder: GraphBuilder): + self.builder = builder + self.principals: List[IamRequestContext] = [] + self._init_principals() + + def _init_principals(self) -> None: + for node in self.builder.nodes(clazz=AwsResource): + if isinstance(node, AwsIamUser): + + identity_based_policies = self._get_user_based_policies(node) + + permission_boundaries: List[PolicyDocument] = [] + if (pb := node.user_permissions_boundary) and (pb_arn := pb.permissions_boundary_arn): + for pb_policy in self.builder.nodes(clazz=AwsIamPolicy, filter=lambda p: p.arn == pb_arn): + if pdj := pb_policy.policy_document_json(): + permission_boundaries.append(PolicyDocument(pdj)) + + # todo: collect these resources + service_control_policy_levels: List[List[PolicyDocument]] = [] + + request_context = IamRequestContext( + principal=node, + identity_policies=identity_based_policies, + permission_boundaries=permission_boundaries, + service_control_policy_levels=service_control_policy_levels, + ) + + self.principals.append(request_context) + + if isinstance(node, AwsIamGroup): + identity_based_policies = self._get_group_based_policies(node) + # todo: collect these resources + service_control_policy_levels = [] + + request_context = IamRequestContext( + principal=node, + identity_policies=identity_based_policies, + permission_boundaries=[], # permission boundaries are not applicable to groups + service_control_policy_levels=service_control_policy_levels, + ) + + self.principals.append(request_context) + + if isinstance(node, AwsIamRole): + identity_based_policies = self._get_role_based_policies(node) + # todo: colect these resources + permission_boundaries = [] + if (pb := node.role_permissions_boundary) and (pb_arn := pb.permissions_boundary_arn): + for pb_policy in self.builder.nodes(clazz=AwsIamPolicy, filter=lambda p: p.arn == pb_arn): + if pdj := pb_policy.policy_document_json(): + permission_boundaries.append(PolicyDocument(pdj)) + # todo: collect these resources + service_control_policy_levels = [] + + request_context = IamRequestContext( + principal=node, + identity_policies=identity_based_policies, + permission_boundaries=permission_boundaries, + service_control_policy_levels=service_control_policy_levels, + ) + + self.principals.append(request_context) + + def _get_user_based_policies(self, principal: AwsIamUser) -> List[Tuple[PolicySource, PolicyDocument]]: + inline_policies = [ + ( + PolicySource(kind=PolicySourceKind.principal, uri=principal.arn or ""), + PolicyDocument(policy.policy_document), + ) + for policy in principal.user_policies + if policy.policy_document + ] + attached_policies = [] + group_policies = [] + for _, to_node in self.builder.graph.edges(principal): + if isinstance(to_node, AwsIamPolicy): + if doc := to_node.policy_document_json(): + attached_policies.append( + ( + PolicySource(kind=PolicySourceKind.principal, uri=to_node.arn or ""), + PolicyDocument(doc), + ) + ) + + if isinstance(to_node, AwsIamGroup): + group = to_node + # inline group policies + for policy in group.group_policies: + if policy.policy_document: + group_policies.append( + ( + PolicySource(kind=PolicySourceKind.group, uri=group.arn or ""), + PolicyDocument(policy.policy_document), + ) + ) + # attached group policies + for _, group_successor in self.builder.graph.edges(group): + if isinstance(group_successor, AwsIamPolicy): + if doc := group_successor.policy_document_json(): + group_policies.append( + ( + PolicySource(kind=PolicySourceKind.group, uri=group_successor.arn or ""), + PolicyDocument(doc), + ) + ) + + return inline_policies + attached_policies + group_policies + + def _get_group_based_policies(self, principal: AwsIamGroup) -> List[Tuple[PolicySource, PolicyDocument]]: + # not really a principal, but could be useful to have access edges for groups + inline_policies = [ + ( + PolicySource(kind=PolicySourceKind.group, uri=principal.arn or ""), + PolicyDocument(policy.policy_document), + ) + for policy in principal.group_policies + if policy.policy_document + ] + + attached_policies = [] + for _, to_node in self.builder.graph.edges(principal): + if isinstance(to_node, AwsIamPolicy): + if doc := to_node.policy_document_json(): + attached_policies.append( + ( + PolicySource(kind=PolicySourceKind.group, uri=to_node.arn or ""), + PolicyDocument(doc), + ) + ) + + return inline_policies + attached_policies + + def _get_role_based_policies(self, principal: AwsIamRole) -> List[Tuple[PolicySource, PolicyDocument]]: + inline_policies = [] + for doc in [p.policy_document for p in principal.role_policies if p.policy_document]: + inline_policies.append( + ( + PolicySource(kind=PolicySourceKind.principal, uri=principal.arn or ""), + PolicyDocument(doc), + ) + ) + + attached_policies = [] + for _, to_node in self.builder.graph.edges(principal): + if isinstance(to_node, AwsIamPolicy): + if policy_doc := to_node.policy_document_json(): + attached_policies.append( + ( + PolicySource(kind=PolicySourceKind.principal, uri=to_node.arn or ""), + PolicyDocument(policy_doc), + ) + ) + + return inline_policies + attached_policies + + def add_access_edges(self) -> None: + + principal_arns = set([p.principal.arn for p in self.principals]) + + for node in self.builder.nodes(clazz=AwsResource, filter=lambda r: r.arn is not None): + + if node.arn in principal_arns: + # do not create cycles + continue + + for context in self.principals: + + resource_policies: List[Tuple[PolicySource, PolicyDocument]] = [] + if isinstance(node, HasResourcePolicy): + for source, json_policy in node.resource_policy(self.builder): + resource_policies.append((source, PolicyDocument(json_policy))) + + permissions = compute_permissions(node, context, resource_policies) + + if not permissions: + continue + + reported = to_json({"permissions": permissions}, strip_nulls=True) + + self.builder.add_edge(from_node=context.principal, edge_type=EdgeType.iam, reported=reported, node=node) diff --git a/plugins/aws/fix_plugin_aws/collector.py b/plugins/aws/fix_plugin_aws/collector.py index 1950561b66..3993e2cc48 100644 --- a/plugins/aws/fix_plugin_aws/collector.py +++ b/plugins/aws/fix_plugin_aws/collector.py @@ -2,10 +2,11 @@ from collections import defaultdict from concurrent.futures import Future, ThreadPoolExecutor from datetime import datetime, timedelta, timezone -from typing import List, Type, Optional, ClassVar, Union, cast, Dict, Any +from typing import Any, Dict, List, Type, Optional, ClassVar, Union, cast, Dict, Any from attrs import define +from fix_plugin_aws.access_edges import AccessEdgeCreator from fix_plugin_aws.aws_client import AwsClient from fix_plugin_aws.configuration import AwsConfig from fix_plugin_aws.resource import ( @@ -254,6 +255,13 @@ def get_last_run() -> Optional[datetime]: log.warning(f"Unexpected node type {node} in graph") raise Exception("Only AWS resources expected") + access_edge_collection_enabled = False + if access_edge_collection_enabled and global_builder.config.collect_access_edges: + # add access edges + log.info(f"[Aws:{self.account.id}] Create access edges.") + access_edge_creator = AccessEdgeCreator(global_builder) + access_edge_creator.add_access_edges() + # final hook when the graph is complete for node, data in list(self.graph.nodes(data=True)): if isinstance(node, AwsResource): diff --git a/plugins/aws/fix_plugin_aws/configuration.py b/plugins/aws/fix_plugin_aws/configuration.py index e088c79f5f..6d8f7a544f 100644 --- a/plugins/aws/fix_plugin_aws/configuration.py +++ b/plugins/aws/fix_plugin_aws/configuration.py @@ -268,6 +268,10 @@ class AwsConfig: default=True, metadata={"description": "Collect resource usage metrics via CloudWatch, enabled by default"}, ) + collect_access_edges: Optional[bool] = field( + default=True, + metadata={"description": "Collect IAM access edges, enabled by default"}, + ) @staticmethod def from_json(json: Json) -> "AwsConfig": diff --git a/plugins/aws/fix_plugin_aws/resource/backup.py b/plugins/aws/fix_plugin_aws/resource/backup.py index 5a1bd24c6d..cbb18e9834 100644 --- a/plugins/aws/fix_plugin_aws/resource/backup.py +++ b/plugins/aws/fix_plugin_aws/resource/backup.py @@ -1,6 +1,6 @@ import logging from datetime import datetime -from typing import Any, ClassVar, Dict, Optional, List, Type +from typing import Any, ClassVar, Dict, Optional, List, Tuple, Type from json import loads as json_loads from attrs import define, field @@ -15,7 +15,7 @@ from fix_plugin_aws.resource.redshift import AwsRedshiftCluster from fix_plugin_aws.resource.s3 import AwsS3Bucket from fix_plugin_aws.utils import TagsValue -from fixlib.baseresources import ModelReference +from fixlib.baseresources import HasResourcePolicy, ModelReference, PolicySource, PolicySourceKind from fixlib.graph import Graph from fixlib.json_bender import F, Bender, S, ForallBend, Bend from fixlib.types import Json @@ -304,7 +304,7 @@ def add_tags(backup_plan: AwsBackupPlan) -> None: @define(eq=False, slots=False) -class AwsBackupVault(BackupResourceTaggable, AwsResource): +class AwsBackupVault(BackupResourceTaggable, AwsResource, HasResourcePolicy): kind: ClassVar[str] = "aws_backup_vault" _kind_display: ClassVar[str] = "AWS Backup Vault" _kind_description: ClassVar[str] = "AWS Backup Vault is a secure storage container for backup data in AWS Backup. It stores and organizes backup copies, providing encryption and access policies to protect backups. Users can create multiple vaults to separate backups by application, environment, or compliance requirements. AWS Backup Vault supports retention policies and lifecycle management for stored backups." # fmt: skip @@ -341,6 +341,12 @@ class AwsBackupVault(BackupResourceTaggable, AwsResource): lock_date: Optional[datetime] = field(default=None, metadata={"description": "The date and time when Backup Vault Lock configuration becomes immutable, meaning it cannot be changed or deleted. If you applied Vault Lock to your vault without specifying a lock date, you can change your Vault Lock settings, or delete Vault Lock from the vault entirely, at any time. This value is in Unix format, Coordinated Universal Time (UTC), and accurate to milliseconds. For example, the value 1516925490.087 represents Friday, January 26, 2018 12:11:30.087 AM."}) # fmt: skip vault_policy: Optional[Json] = field(default=None) + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.vault_policy or not self.arn: + return [] + + return [(PolicySource(PolicySourceKind.resource, uri=self.arn), self.vault_policy)] + @classmethod def called_collect_apis(cls) -> List[AwsApiSpec]: return [ diff --git a/plugins/aws/fix_plugin_aws/resource/base.py b/plugins/aws/fix_plugin_aws/resource/base.py index e1f896d867..71f4859444 100644 --- a/plugins/aws/fix_plugin_aws/resource/base.py +++ b/plugins/aws/fix_plugin_aws/resource/base.py @@ -580,13 +580,21 @@ def add_node( return node def add_edge( - self, from_node: BaseResource, edge_type: EdgeType = EdgeType.default, reverse: bool = False, **to_node: Any + self, + from_node: BaseResource, + edge_type: EdgeType = EdgeType.default, + reverse: bool = False, + reported: Optional[Json] = None, + **to_node: Any, ) -> None: to_n = self.node(**to_node) if isinstance(from_node, AwsResource) and isinstance(to_n, AwsResource): start, end = (to_n, from_node) if reverse else (from_node, to_n) with self.graph_edges_access.write_access: - self.graph.add_edge(start, end, edge_type=edge_type) + kwargs: Dict[str, Any] = {} + if reported: + kwargs["reported"] = reported + self.graph.add_edge(start, end, edge_type=edge_type, **kwargs) def add_deferred_edge( self, from_node: BaseResource, edge_type: EdgeType, to_node: str, reverse: bool = False diff --git a/plugins/aws/fix_plugin_aws/resource/dynamodb.py b/plugins/aws/fix_plugin_aws/resource/dynamodb.py index a5547954b8..c7bbfbf4fb 100644 --- a/plugins/aws/fix_plugin_aws/resource/dynamodb.py +++ b/plugins/aws/fix_plugin_aws/resource/dynamodb.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import ClassVar, Dict, List, Optional, Type, Any +from typing import ClassVar, Dict, List, Optional, Tuple, Type, Any from attrs import define, field from json import loads as json_loads @@ -8,7 +8,7 @@ from fix_plugin_aws.resource.kinesis import AwsKinesisStream from fix_plugin_aws.resource.kms import AwsKmsKey from fix_plugin_aws.utils import ToDict -from fixlib.baseresources import ModelReference +from fixlib.baseresources import HasResourcePolicy, ModelReference, PolicySource, PolicySourceKind from fixlib.graph import Graph from fixlib.json_bender import S, Bend, Bender, ForallBend, bend from fixlib.types import Json @@ -356,7 +356,7 @@ class AwsDynamoDbContinuousBackup: @define(eq=False, slots=False) -class AwsDynamoDbTable(DynamoDbTaggable, AwsResource): +class AwsDynamoDbTable(DynamoDbTaggable, AwsResource, HasResourcePolicy): kind: ClassVar[str] = "aws_dynamodb_table" _kind_display: ClassVar[str] = "AWS DynamoDB Table" _kind_description: ClassVar[str] = "AWS DynamoDB Table is a fully managed NoSQL database service that stores and retrieves data. It supports key-value and document data models, offering automatic scaling and low-latency performance. DynamoDB Tables handle data storage, indexing, and querying, providing consistent read and write throughput. They offer data encryption, backup, and recovery features for secure and reliable data management." # fmt: skip @@ -419,6 +419,11 @@ class AwsDynamoDbTable(DynamoDbTaggable, AwsResource): dynamodb_continuous_backup: Optional[AwsDynamoDbContinuousBackup] = field(default=None) dynamodb_policy: Optional[Json] = field(default=None) + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.dynamodb_policy or not self.arn: + return [] + return [(PolicySource(PolicySourceKind.resource, self.arn), self.dynamodb_policy)] + @classmethod def called_collect_apis(cls) -> List[AwsApiSpec]: return [ @@ -498,7 +503,7 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]: @define(eq=False, slots=False) -class AwsDynamoDbGlobalTable(DynamoDbTaggable, AwsResource): +class AwsDynamoDbGlobalTable(DynamoDbTaggable, AwsResource, HasResourcePolicy): kind: ClassVar[str] = "aws_dynamodb_global_table" _kind_display: ClassVar[str] = "AWS DynamoDB Global Table" _kind_description: ClassVar[str] = "AWS DynamoDB Global Table is a feature that replicates DynamoDB tables across multiple AWS regions. It provides multi-region read and write access to data, ensuring low-latency access for globally distributed applications. Global Table maintains consistency across regions, handles conflict resolution, and offers automatic failover, improving availability and disaster recovery capabilities for applications with global user bases." # fmt: skip @@ -525,6 +530,12 @@ class AwsDynamoDbGlobalTable(DynamoDbTaggable, AwsResource): dynamodb_global_table_status: Optional[str] = field(default=None) dynamodb_policy: Optional[Json] = field(default=None) + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.dynamodb_policy or not self.arn: + return [] + + return [(PolicySource(PolicySourceKind.resource, self.arn), self.dynamodb_policy)] + @classmethod def called_collect_apis(cls) -> List[AwsApiSpec]: return [ diff --git a/plugins/aws/fix_plugin_aws/resource/ecr.py b/plugins/aws/fix_plugin_aws/resource/ecr.py index b8ad1f7a16..584d41601b 100644 --- a/plugins/aws/fix_plugin_aws/resource/ecr.py +++ b/plugins/aws/fix_plugin_aws/resource/ecr.py @@ -1,6 +1,6 @@ import json import logging -from typing import ClassVar, Dict, Optional, List, Type, Any +from typing import ClassVar, Dict, Optional, List, Tuple, Type, Any from json import loads as json_loads from attrs import define, field @@ -8,6 +8,7 @@ from fix_plugin_aws.resource.base import AwsResource, AwsApiSpec, GraphBuilder from fix_plugin_aws.utils import ToDict +from fixlib.baseresources import HasResourcePolicy, PolicySource, PolicySourceKind from fixlib.json import sort_json from fixlib.json_bender import Bender, S, Bend from fixlib.types import Json @@ -25,7 +26,7 @@ class AwsEcrEncryptionConfiguration: @define(eq=False, slots=False) -class AwsEcrRepository(AwsResource): +class AwsEcrRepository(AwsResource, HasResourcePolicy): kind: ClassVar[str] = "aws_ecr_repository" _kind_display: ClassVar[str] = "AWS ECR Repository" _kind_description: ClassVar[str] = "AWS ECR (Elastic Container Registry) is a managed Docker container registry service. It stores, manages, and deploys container images for applications. ECR integrates with other AWS services, provides secure access control, and supports image scanning for vulnerabilities. Users can push, pull, and share Docker images within their AWS environment or with external parties." # fmt: skip @@ -57,6 +58,11 @@ class AwsEcrRepository(AwsResource): lifecycle_policy: Optional[Json] = field(default=None, metadata={"description": "The repository lifecycle policy."}) # fmt: skip repository_policy: Optional[Json] = field(default=None, metadata={"description": "The repository policy."}) # fmt: skip + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.repository_policy: + return [] + return [(PolicySource(PolicySourceKind.resource, self.repository_arn or ""), self.repository_policy or {})] + @classmethod def collect_resources(cls, builder: GraphBuilder) -> None: def add_repository_policy(repository: AwsEcrRepository) -> None: diff --git a/plugins/aws/fix_plugin_aws/resource/efs.py b/plugins/aws/fix_plugin_aws/resource/efs.py index 7cd16c17f6..50c7fc10b5 100644 --- a/plugins/aws/fix_plugin_aws/resource/efs.py +++ b/plugins/aws/fix_plugin_aws/resource/efs.py @@ -1,5 +1,5 @@ import json -from typing import Optional, ClassVar, Dict, List, Type, Any +from typing import Optional, ClassVar, Dict, List, Tuple, Type, Any import math @@ -9,7 +9,7 @@ from fix_plugin_aws.resource.base import AwsApiSpec, GraphBuilder, AwsResource from fix_plugin_aws.resource.kms import AwsKmsKey from fix_plugin_aws.utils import ToDict -from fixlib.baseresources import ModelReference, BaseNetworkShare +from fixlib.baseresources import HasResourcePolicy, ModelReference, BaseNetworkShare, PolicySource, PolicySourceKind from fixlib.graph import Graph from fixlib.json import sort_json from fixlib.json_bender import Bender, S, F, Bend @@ -75,7 +75,7 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: @define(eq=False, slots=False) -class AwsEfsFileSystem(EfsTaggable, AwsResource, BaseNetworkShare): +class AwsEfsFileSystem(EfsTaggable, AwsResource, BaseNetworkShare, HasResourcePolicy): kind: ClassVar[str] = "aws_efs_file_system" _kind_display: ClassVar[str] = "AWS EFS File System" _aws_metadata: ClassVar[Dict[str, Any]] = {"provider_link_tpl": "https://{region_id}.console.aws.amazon.com/efs/home?region={region}#/file-systems/{FileSystemId}", "arn_tpl": "arn:{partition}:efs:{region}:{account}:file-system/{id}"} # fmt: skip @@ -116,6 +116,12 @@ class AwsEfsFileSystem(EfsTaggable, AwsResource, BaseNetworkShare): availability_zone_name: Optional[str] = field(default=None) file_system_policy: Optional[Json] = field(default=None) + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.file_system_policy or not self.arn: + return [] + + return [(PolicySource(PolicySourceKind.resource, self.arn), self.file_system_policy)] + @classmethod def called_collect_apis(cls) -> List[AwsApiSpec]: return [ diff --git a/plugins/aws/fix_plugin_aws/resource/iam.py b/plugins/aws/fix_plugin_aws/resource/iam.py index ceb2eeaed9..a946872d94 100644 --- a/plugins/aws/fix_plugin_aws/resource/iam.py +++ b/plugins/aws/fix_plugin_aws/resource/iam.py @@ -377,6 +377,9 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]: def service_name(cls) -> str: return service_name + def policy_document_json(self) -> Optional[Json]: + return self.policy_document.document if self.policy_document else None + @define(eq=False, slots=False) class AwsIamGroup(AwsResource, BaseGroup): diff --git a/plugins/aws/fix_plugin_aws/resource/kinesis.py b/plugins/aws/fix_plugin_aws/resource/kinesis.py index 9d01c9bdb3..4c600834f8 100644 --- a/plugins/aws/fix_plugin_aws/resource/kinesis.py +++ b/plugins/aws/fix_plugin_aws/resource/kinesis.py @@ -8,7 +8,7 @@ from fix_plugin_aws.resource.kms import AwsKmsKey from fix_plugin_aws.aws_client import AwsClient from fix_plugin_aws.utils import ToDict -from fixlib.baseresources import MetricName, ModelReference +from fixlib.baseresources import HasResourcePolicy, MetricName, ModelReference from fixlib.graph import Graph from fixlib.json_bender import Bender, S, Bend, bend, ForallBend from fixlib.types import Json @@ -89,7 +89,7 @@ class AwsKinesisEnhancedMetrics: @define(eq=False, slots=False) -class AwsKinesisStream(AwsResource): +class AwsKinesisStream(AwsResource, HasResourcePolicy): kind: ClassVar[str] = "aws_kinesis_stream" _kind_display: ClassVar[str] = "AWS Kinesis Stream" _kind_description: ClassVar[str] = "" # fmt: skip diff --git a/plugins/aws/fix_plugin_aws/resource/kms.py b/plugins/aws/fix_plugin_aws/resource/kms.py index 097429dfce..34db963bd9 100644 --- a/plugins/aws/fix_plugin_aws/resource/kms.py +++ b/plugins/aws/fix_plugin_aws/resource/kms.py @@ -1,12 +1,12 @@ import json -from typing import ClassVar, Dict, List, Optional, Type, Any +from typing import ClassVar, Dict, List, Optional, Tuple, Type, Any from attrs import define, field from fix_plugin_aws.aws_client import AwsClient from fix_plugin_aws.resource.base import AwsResource, AwsApiSpec, GraphBuilder from fix_plugin_aws.utils import ToDict -from fixlib.baseresources import BaseAccessKey +from fixlib.baseresources import BaseAccessKey, HasResourcePolicy, PolicySource, PolicySourceKind from fixlib.graph import Graph from fixlib.json import sort_json from fixlib.json_bender import Bend, Bender, S, ForallBend, bend @@ -63,7 +63,7 @@ class AwsKmsMultiRegionConfig: @define(eq=False, slots=False) -class AwsKmsKey(AwsResource, BaseAccessKey): +class AwsKmsKey(AwsResource, BaseAccessKey, HasResourcePolicy): kind: ClassVar[str] = "aws_kms_key" _kind_display: ClassVar[str] = "AWS KMS Key" _kind_description: ClassVar[str] = "AWS KMS Key is a managed service that creates and controls cryptographic keys used to protect data in AWS services and applications. It generates, stores, and manages keys for encryption and decryption operations. KMS Keys integrate with other AWS services, providing a centralized system for key management and helping users meet compliance requirements for data security and access control." # fmt: skip @@ -121,6 +121,11 @@ class AwsKmsKey(AwsResource, BaseAccessKey): kms_key_rotation_enabled: Optional[bool] = field(default=None) kms_key_policy: Optional[Json] = field(default=None) + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.kms_key_policy or not self.arn: + return [] + return [(PolicySource(PolicySourceKind.resource, self.arn), self.kms_key_policy)] + @classmethod def called_collect_apis(cls) -> List[AwsApiSpec]: return [ diff --git a/plugins/aws/fix_plugin_aws/resource/lambda_.py b/plugins/aws/fix_plugin_aws/resource/lambda_.py index aaacd1482b..caa2da3f6b 100644 --- a/plugins/aws/fix_plugin_aws/resource/lambda_.py +++ b/plugins/aws/fix_plugin_aws/resource/lambda_.py @@ -2,72 +2,32 @@ import json as json_p import logging import re -from typing import ClassVar, Dict, Optional, List, Type, Any +from typing import ClassVar, Dict, Optional, List, Tuple, Type, Any from attrs import define, field from fix_plugin_aws.aws_client import AwsClient -from fix_plugin_aws.resource.apigateway import AwsApiGatewayRestApi, AwsApiGatewayResource from fix_plugin_aws.resource.base import AwsResource, GraphBuilder, AwsApiSpec, parse_json from fix_plugin_aws.resource.cloudwatch import AwsCloudwatchQuery, normalizer_factory from fix_plugin_aws.resource.ec2 import AwsEc2Subnet, AwsEc2SecurityGroup, AwsEc2Vpc from fix_plugin_aws.resource.kms import AwsKmsKey from fixlib.baseresources import ( BaseServerlessFunction, + HasResourcePolicy, MetricName, ModelReference, + PolicySource, + PolicySourceKind, ) from fixlib.graph import Graph from fixlib.json_bender import Bender, S, Bend, ForallBend, F, bend from fixlib.types import Json +from fixlib.json import sort_json log = logging.getLogger("fix.plugins.aws") service_name = "lambda" -@define(eq=False, slots=False) -class AwsLambdaPolicyStatement: - kind: ClassVar[str] = "aws_lambda_policy_statement" - kind_display: ClassVar[str] = "AWS Lambda Policy Statement" - kind_description: ClassVar[str] = ( - "Lambda Policy Statements are used to define permissions for AWS Lambda" - " functions, specifying what actions can be performed and by whom." - ) - mapping: ClassVar[Dict[str, Bender]] = { - "sid": S("Sid"), - "effect": S("Effect"), - "principal": S("Principal"), - "action": S("Action"), - "resource": S("Resource"), - "condition": S("Condition"), - } - sid: Optional[str] = field(default=None) - effect: Optional[str] = field(default=None) - principal: Optional[Any] = field(default=None) - action: Optional[Any] = field(default=None) - resource: Optional[Any] = field(default=None) - condition: Optional[Any] = field(default=None) - - -@define(eq=False, slots=False) -class AwsLambdaPolicy: - kind: ClassVar[str] = "aws_lambda_policy" - kind_display: ClassVar[str] = "AWS Lambda Policy" - kind_description: ClassVar[str] = ( - "AWS Lambda Policies are permissions policies that determine what actions a" - " Lambda function can take and what resources it can access within the AWS" - " environment." - ) - mapping: ClassVar[Dict[str, Bender]] = { - "id": S("Id"), - "version": S("Version"), - "statement": S("Statement") >> ForallBend(AwsLambdaPolicyStatement.mapping), - } - id: Optional[str] = field(default=None) - version: Optional[str] = field(default=None) - statement: Optional[List[AwsLambdaPolicyStatement]] = field(default=None) - - @define(eq=False, slots=False) class AwsLambdaEnvironmentError: kind: ClassVar[str] = "aws_lambda_environment_error" @@ -230,7 +190,7 @@ class AwsLambdaFunctionUrlConfig: @define(eq=False, slots=False) -class AwsLambdaFunction(AwsResource, BaseServerlessFunction): +class AwsLambdaFunction(AwsResource, BaseServerlessFunction, HasResourcePolicy): kind: ClassVar[str] = "aws_lambda_function" _kind_display: ClassVar[str] = "AWS Lambda Function" _aws_metadata: ClassVar[Dict[str, Any]] = {"provider_link_tpl": "https://{region_id}.console.aws.amazon.com/lambda/home?region={region}#/functions/{FunctionName}", "arn_tpl": "arn:{partition}:lambda:{region}:{account}:function/{name}"} # fmt: skip @@ -320,9 +280,15 @@ class AwsLambdaFunction(AwsResource, BaseServerlessFunction): function_signing_job_arn: Optional[str] = field(default=None) function_architectures: List[str] = field(factory=list) function_ephemeral_storage: Optional[int] = field(default=None) - function_policy: Optional[AwsLambdaPolicy] = field(default=None) + function_policy: Optional[Json] = field(default=None) function_url_config: Optional[AwsLambdaFunctionUrlConfig] = field(default=None) + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.function_policy or not self.arn: + return [] + + return [(PolicySource(PolicySourceKind.resource, self.arn), self.function_policy)] + @classmethod def called_collect_apis(cls) -> List[AwsApiSpec]: return [ @@ -342,7 +308,7 @@ def add_tags(function: AwsLambdaFunction) -> None: function.tags = tags def get_policy(function: AwsLambdaFunction) -> None: - if policy := builder.client.get( + if raw_policy := builder.client.get( service_name, "get-policy", expected_errors=["ResourceNotFoundException"], # policy is optional @@ -350,33 +316,8 @@ def get_policy(function: AwsLambdaFunction) -> None: result_name="Policy", ): # policy is defined as string, but it is actually a json object - mapped = bend(AwsLambdaPolicy.mapping, json_p.loads(policy)) # type: ignore - if policy_instance := parse_json(mapped, AwsLambdaPolicy, builder): - function.function_policy = policy_instance - for statement in policy_instance.statement or []: - if ( - statement.principal - and statement.condition - and isinstance(statement.principal, dict) - and statement.principal.get("Service") == "apigateway.amazonaws.com" - and (arn_like := statement.condition.get("ArnLike")) is not None - and (source := arn_like.get("AWS:SourceArn")) is not None - ): - source_arn = source.rsplit(":")[-1] - rest_api_id = source_arn.split("/")[0] - builder.dependant_node( - function, - reverse=True, - clazz=AwsApiGatewayRestApi, - id=rest_api_id, - ) - builder.dependant_node( - function, - reverse=True, - clazz=AwsApiGatewayResource, - api_link=rest_api_id, - resource_path="/" + source_arn.split("/")[-1], - ) + json_policy = json_p.loads(raw_policy) # type: ignore + function.function_policy = sort_json(json_policy, sort_list=True) def get_url_config(function: AwsLambdaFunction) -> None: if config := builder.client.get( diff --git a/plugins/aws/fix_plugin_aws/resource/s3.py b/plugins/aws/fix_plugin_aws/resource/s3.py index 580adb8320..bb52c3b8f9 100644 --- a/plugins/aws/fix_plugin_aws/resource/s3.py +++ b/plugins/aws/fix_plugin_aws/resource/s3.py @@ -2,16 +2,25 @@ from collections import defaultdict from datetime import timedelta from json import loads as json_loads -from typing import ClassVar, Dict, List, Type, Optional, cast, Any +from typing import ClassVar, Dict, List, Tuple, Type, Optional, cast, Any from attr import field from attrs import define + from fix_plugin_aws.aws_client import AwsClient from fix_plugin_aws.resource.base import AwsResource, AwsApiSpec, GraphBuilder, parse_json from fix_plugin_aws.resource.cloudwatch import AwsCloudwatchQuery, normalizer_factory from fix_plugin_aws.utils import tags_as_dict -from fixlib.baseresources import BaseBucket, MetricName, PhantomBaseResource, ModelReference +from fixlib.baseresources import ( + BaseBucket, + MetricName, + PhantomBaseResource, + ModelReference, + PolicySourceKind, + PolicySource, + HasResourcePolicy, +) from fixlib.graph import Graph from fixlib.json import is_empty, sort_json from fixlib.json_bender import Bender, S, bend, Bend, ForallBend @@ -163,7 +172,7 @@ class AwsS3Logging: @define(eq=False, slots=False) -class AwsS3Bucket(AwsResource, BaseBucket): +class AwsS3Bucket(AwsResource, BaseBucket, HasResourcePolicy): kind: ClassVar[str] = "aws_s3_bucket" _kind_display: ClassVar[str] = "AWS S3 Bucket" _aws_metadata: ClassVar[Dict[str, Any]] = {"provider_link_tpl": "https://s3.console.aws.amazon.com/s3/buckets/{name}?region={region_id}&bucketType=general&tab=objects", "arn_tpl": "arn:{partition}:s3:{region}:{account}:bucket/{name}"} # fmt: skip @@ -184,6 +193,10 @@ class AwsS3Bucket(AwsResource, BaseBucket): bucket_location: Optional[str] = field(default=None) bucket_lifecycle_policy: Optional[Json] = field(default=None, metadata={"description": "The bucket lifecycle policy."}) # fmt: skip + def resource_policy(self, builder: GraphBuilder) -> List[Tuple[PolicySource, Dict[str, Any]]]: + assert self.arn + return [(PolicySource(PolicySourceKind.resource, self.arn), self.bucket_policy)] if self.bucket_policy else [] + @classmethod def called_collect_apis(cls) -> List[AwsApiSpec]: return [ diff --git a/plugins/aws/fix_plugin_aws/resource/secretsmanager.py b/plugins/aws/fix_plugin_aws/resource/secretsmanager.py index 7303962d3d..e79f52a172 100644 --- a/plugins/aws/fix_plugin_aws/resource/secretsmanager.py +++ b/plugins/aws/fix_plugin_aws/resource/secretsmanager.py @@ -1,14 +1,16 @@ from datetime import datetime -from typing import ClassVar, Dict, Optional, List, Type, Any +from typing import ClassVar, Dict, Optional, List, Tuple, Type, Any from attrs import define, field from fix_plugin_aws.resource.base import AwsResource, AwsApiSpec, GraphBuilder from fix_plugin_aws.resource.kms import AwsKmsKey from fix_plugin_aws.utils import ToDict -from fixlib.baseresources import ModelReference +from fixlib.baseresources import HasResourcePolicy, ModelReference, PolicySource, PolicySourceKind from fixlib.json_bender import Bender, S, Bend from fixlib.types import Json +from json import loads as json_loads +from fixlib.json import sort_json service_name = "secretsmanager" @@ -27,7 +29,7 @@ class AwsSecretsManagerRotationRulesType: @define(eq=False, slots=False) -class AwsSecretsManagerSecret(AwsResource): +class AwsSecretsManagerSecret(HasResourcePolicy, AwsResource): kind: ClassVar[str] = "aws_secretsmanager_secret" api_spec: ClassVar[AwsApiSpec] = AwsApiSpec(service_name, "list-secrets", "SecretList") _kind_display: ClassVar[str] = "AWS Secrets Manager Secret" @@ -72,6 +74,38 @@ class AwsSecretsManagerSecret(AwsResource): owning_service: Optional[str] = field(default=None, metadata={"description": "Returns the name of the service that created the secret."}) # fmt: skip created_date: Optional[datetime] = field(default=None, metadata={"description": "The date and time when a secret was created."}) # fmt: skip primary_region: Optional[str] = field(default=None, metadata={"description": "The Region where Secrets Manager originated the secret."}) # fmt: skip + policy: Optional[Json] = field(default=None) + + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.policy or not self.arn: + return [] + + return [(PolicySource(PolicySourceKind.resource, self.arn), self.policy)] + + @classmethod + def called_collect_apis(cls) -> List[AwsApiSpec]: + return [ + cls.api_spec, + AwsApiSpec(service_name, "get-resource-policy"), + ] + + @classmethod + def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> None: + + def get_policy(secret: AwsSecretsManagerSecret) -> None: + if raw_policy := builder.client.get( + service_name, + "get-resource-policy", + expected_errors=["ResourceNotFoundException"], # policy is optional + SecretId=secret.id, + result_name="ResourcePolicy", + ): + secret.policy = sort_json(json_loads(raw_policy), sort_list=True) # type: ignore + + for js in json: + if instance := cls.from_api(js, builder): + builder.add_node(instance, js) + builder.submit_work(service_name, get_policy, instance) def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: if kms_key_id := source.get("KmsKeyId"): diff --git a/plugins/aws/fix_plugin_aws/resource/sns.py b/plugins/aws/fix_plugin_aws/resource/sns.py index 141b1547c6..6a22292149 100644 --- a/plugins/aws/fix_plugin_aws/resource/sns.py +++ b/plugins/aws/fix_plugin_aws/resource/sns.py @@ -1,5 +1,5 @@ from datetime import timedelta -from typing import ClassVar, Dict, List, Optional, Type, Any +from typing import ClassVar, Dict, List, Optional, Tuple, Type, Any from attrs import define, field from fix_plugin_aws.aws_client import AwsClient @@ -8,7 +8,7 @@ from fix_plugin_aws.resource.iam import AwsIamRole from fix_plugin_aws.resource.kms import AwsKmsKey from fix_plugin_aws.utils import ToDict -from fixlib.baseresources import EdgeType, MetricName, ModelReference +from fixlib.baseresources import EdgeType, HasResourcePolicy, MetricName, ModelReference, PolicySource, PolicySourceKind from fixlib.graph import Graph from fixlib.json_bender import F, Bender, S, bend, ParseJson, Sorted from fixlib.types import Json @@ -17,7 +17,7 @@ @define(eq=False, slots=False) -class AwsSnsTopic(AwsResource): +class AwsSnsTopic(AwsResource, HasResourcePolicy): kind: ClassVar[str] = "aws_sns_topic" _kind_display: ClassVar[str] = "AWS SNS Topic" _kind_description: ClassVar[str] = "AWS SNS Topic is a messaging service that facilitates communication between distributed systems, applications, and microservices. It implements a publish-subscribe model, where publishers send messages to topics and subscribers receive those messages. SNS supports multiple protocols for message delivery, including HTTP, email, SMS, and mobile push notifications, making it useful for various notification scenarios." # fmt: skip @@ -58,6 +58,12 @@ class AwsSnsTopic(AwsResource): topic_fifo_topic: Optional[bool] = field(default=None) topic_content_based_deduplication: Optional[bool] = field(default=None) + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.topic_policy or not self.arn: + return [] + + return [(PolicySource(PolicySourceKind.resource, self.arn), self.topic_policy)] + @classmethod def called_collect_apis(cls) -> List[AwsApiSpec]: return [ diff --git a/plugins/aws/fix_plugin_aws/resource/sqs.py b/plugins/aws/fix_plugin_aws/resource/sqs.py index ff0af25832..ebaa8923be 100644 --- a/plugins/aws/fix_plugin_aws/resource/sqs.py +++ b/plugins/aws/fix_plugin_aws/resource/sqs.py @@ -1,5 +1,5 @@ from datetime import datetime, timezone -from typing import ClassVar, Dict, List, Optional, Type, Any +from typing import ClassVar, Dict, List, Optional, Tuple, Type, Any from attrs import define, field @@ -8,7 +8,14 @@ from fix_plugin_aws.resource.base import AwsApiSpec, AwsResource, GraphBuilder from fix_plugin_aws.resource.cloudwatch import AwsCloudwatchQuery, normalizer_factory from fix_plugin_aws.resource.kms import AwsKmsKey -from fixlib.baseresources import BaseQueue, MetricName, ModelReference +from fixlib.baseresources import ( + BaseQueue, + HasResourcePolicy, + MetricName, + ModelReference, + PolicySource, + PolicySourceKind, +) from fixlib.graph import Graph from fixlib.json_bender import F, Bender, S, AsInt, AsBool, Bend, ParseJson, Sorted from fixlib.types import Json @@ -35,7 +42,7 @@ class AwsSqsRedrivePolicy: @define(eq=False, slots=False) -class AwsSqsQueue(AwsResource, BaseQueue): +class AwsSqsQueue(AwsResource, BaseQueue, HasResourcePolicy): kind: ClassVar[str] = "aws_sqs_queue" _kind_display: ClassVar[str] = "AWS SQS Queue" _kind_description: ClassVar[str] = "AWS SQS Queue is a managed message queuing service that facilitates communication between distributed system components. It stores messages from producers and delivers them to consumers, ensuring reliable data transfer. SQS supports multiple messaging patterns, including point-to-point and publish-subscribe, and handles message retention, delivery, and deletion. It integrates with other AWS services for building decoupled applications." # fmt: skip @@ -96,6 +103,12 @@ class AwsSqsQueue(AwsResource, BaseQueue): sqs_receive_message_wait_time_seconds: Optional[int] = field(default=None) sqs_managed_sse_enabled: Optional[bool] = field(default=None) + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: + if not self.sqs_policy or not self.arn: + return [] + + return [(PolicySource(PolicySourceKind.resource, self.arn), self.sqs_policy)] + @classmethod def called_collect_apis(cls) -> List[AwsApiSpec]: return [ diff --git a/plugins/aws/pyproject.toml b/plugins/aws/pyproject.toml index 647453a05d..41291c4876 100644 --- a/plugins/aws/pyproject.toml +++ b/plugins/aws/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "retrying", "boto3", "botocore", + "cloudsplaining", ] [project.entry-points."fix.plugins"] diff --git a/plugins/aws/test/__init__.py b/plugins/aws/test/__init__.py index 00018ec2e8..4b96212e7b 100644 --- a/plugins/aws/test/__init__.py +++ b/plugins/aws/test/__init__.py @@ -35,7 +35,7 @@ def builder(aws_client: AwsClient, no_feedback: CoreFeedback) -> Iterator[GraphB yield GraphBuilder( Graph(), Cloud(id="aws"), - AwsAccount(id="test"), + AwsAccount(id="test", arn="arn:aws:organizations::123456789012:account/o-exampleorgid/123456789012"), regions[0], {r.id: r for r in regions}, aws_client, @@ -51,5 +51,5 @@ def no_feedback() -> CoreFeedback: @fixture def account_collector(aws_config: AwsConfig, no_feedback: CoreFeedback) -> AwsAccountCollector: - account = AwsAccount(id="test") + account = AwsAccount(id="test", arn="arn:aws:organizations::123456789012:account/o-exampleorgid/123456789012") return AwsAccountCollector(aws_config, Cloud(id="aws"), account, ["us-east-1"], no_feedback, {}) diff --git a/plugins/aws/test/acccess_edges_test.py b/plugins/aws/test/acccess_edges_test.py new file mode 100644 index 0000000000..f3a2c5c375 --- /dev/null +++ b/plugins/aws/test/acccess_edges_test.py @@ -0,0 +1,919 @@ +from cloudsplaining.scan.policy_document import PolicyDocument +from cloudsplaining.scan.statement_detail import StatementDetail + +from fix_plugin_aws.resource.base import AwsResource +from fix_plugin_aws.resource.iam import AwsIamUser, AwsIamGroup, AwsIamRole +from typing import Any, Dict, List + +import re +from fix_plugin_aws.access_edges import ( + find_allowed_action, + make_resoruce_regex, + check_statement_match, + check_principal_match, + IamRequestContext, + check_explicit_deny, + compute_permissions, +) + +from fixlib.baseresources import PolicySourceKind, PolicySource, PermissionLevel +from fixlib.json import to_json_str + + +def test_find_allowed_action() -> None: + policy_document = { + "Version": "2012-10-17", + "Statement": [ + {"Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": ["arn:aws:s3:::bucket/*"]}, + {"Effect": "Allow", "Action": ["s3:ListBuckets"], "Resource": ["*"]}, + {"Effect": "Allow", "Action": ["ec2:DescribeInstances"], "Resource": ["*"]}, + {"Effect": "Deny", "Action": ["s3:DeleteObject"], "Resource": ["arn:aws:s3:::bucket/*"]}, + ], + } + + allowed_actions = find_allowed_action(PolicyDocument(policy_document), "s3") + + assert allowed_actions == {"s3:GetObject", "s3:PutObject", "s3:ListBuckets"} + + +def test_make_resoruce_regex() -> None: + # Test case 1: Wildcard with * + wildcard = "arn:aws:s3:::my-bucket/*" + regex = make_resoruce_regex(wildcard) + assert isinstance(regex, re.Pattern) + assert regex.match("arn:aws:s3:::my-bucket/my-object") + assert not regex.match("arn:aws:s3:::other-bucket/my-object") + + # Test case 2: Wildcard with ? + wildcard = "arn:aws:s3:::my-bucket/?" + regex = make_resoruce_regex(wildcard) + assert isinstance(regex, re.Pattern) + assert regex.match("arn:aws:s3:::my-bucket/a") + assert not regex.match("arn:aws:s3:::my-bucket/ab") + + # Test case 3: Wildcard with multiple * + wildcard = "arn:aws:s3:::*/*" + regex = make_resoruce_regex(wildcard) + assert isinstance(regex, re.Pattern) + assert regex.match("arn:aws:s3:::my-bucket/my-object") + assert regex.match("arn:aws:s3:::other-bucket/another-object") + + # Test case 4: Wildcard with multiple ? + wildcard = "arn:aws:s3:::my-bucket/??" + regex = make_resoruce_regex(wildcard) + assert isinstance(regex, re.Pattern) + assert regex.match("arn:aws:s3:::my-bucket/ab") + assert not regex.match("arn:aws:s3:::my-bucket/abc") + + +def test_check_statement_match1() -> None: + allow_statement = { + "Effect": "Allow", + "Action": "s3:GetObject", + "Resource": "arn:aws:s3:::example-bucket/*", + "Principal": {"AWS": ["arn:aws:iam::123456789012:user/example-user"]}, + } + statement = StatementDetail(allow_statement) + resource = AwsResource(id="bucket", arn="arn:aws:s3:::example-bucket/object.txt") + principal = AwsResource(id="principal", arn="arn:aws:iam::123456789012:user/example-user") + + # Test matching statement + result, constraints = check_statement_match(statement, "Allow", "s3:GetObject", resource, principal) + assert result is True + assert constraints == ["arn:aws:s3:::example-bucket/*"] + + # Test wrong effect + result, constraints = check_statement_match(statement, "Deny", "s3:GetObject", resource, principal) + assert result is False + assert constraints == [] + + # wrong principal does not match + result, constraints = check_statement_match(statement, "Allow", "s3:GetObject", resource, resource) + assert result is False + + # Test statement with condition + allow_statement["Condition"] = {"StringEquals": {"s3:prefix": "private/"}} + statement = StatementDetail(allow_statement) + result, constraints = check_statement_match(statement, "Allow", "s3:GetObject", resource, principal) + assert result is True + + # not providing principaal works + result, constraints = check_statement_match(statement, "Allow", "s3:GetObject", resource, principal=None) + assert result is True + + # not providing effect works + result, constraints = check_statement_match( + statement, effect=None, action="s3:GetObject", resource=resource, principal=None + ) + assert result is True + + result, constraints = check_statement_match(statement, "Allow", "s3:GetObject", resource, principal) + assert result is True + assert constraints == ["arn:aws:s3:::example-bucket/*"] + + deny_statement = { + "Effect": "Deny", + "Action": "s3:GetObject", + "Resource": "arn:aws:s3:::example-bucket/*", + "Principal": {"AWS": ["arn:aws:iam::123456789012:user/example-user"]}, + } + + statement = StatementDetail(deny_statement) + result, constraints = check_statement_match(statement, "Deny", "s3:GetObject", resource, principal) + assert result is True + assert constraints == ["arn:aws:s3:::example-bucket/*"] + + # test not resource + not_resource_statement = dict(allow_statement) + del not_resource_statement["Resource"] + not_resource_statement["NotResource"] = "arn:aws:s3:::example-bucket/private/*" + statement = StatementDetail(not_resource_statement) + result, constraints = check_statement_match(statement, "Allow", "s3:GetObject", resource, principal) + assert result is True + assert constraints == ["not arn:aws:s3:::example-bucket/private/*"] + + +def test_check_principal_match() -> None: + principal = AwsIamUser(id="user-id", arn="arn:aws:iam::123456789012:user/user-name") + aws_principal_list = ["*", "arn:aws:iam::123456789012:user/user-name", "user-id", "123456789012"] + + assert check_principal_match(principal, aws_principal_list) is True + + principal = AwsIamUser(id="user-id", arn="arn:aws:iam::123456789012:user/user-name") + aws_principal_list = ["another-arn", "another-id"] + + assert check_principal_match(principal, aws_principal_list) is False + + principal = AwsIamUser(id="user-id", arn="arn:aws:iam::123456789012:user/user-name") + aws_principal_list = ["*"] + + assert check_principal_match(principal, aws_principal_list) is True + + +def test_no_explicit_deny() -> None: + """Test when there is no explicit deny in any policies, expect 'NextStep'.""" + principal = AwsIamUser(id="AID1234567890", arn="arn:aws:iam::123456789012:user/test-user") + + request_context = IamRequestContext( + principal=principal, + identity_policies=[], + permission_boundaries=[], + service_control_policy_levels=[], + ) + + resource = AwsResource(id="some-resource", arn="arn:aws:s3:::example-bucket") + action = "s3:GetObject" + + result = check_explicit_deny(request_context, resource, action, resource_based_policies=[]) + assert result == "NextStep" + + +def test_explicit_deny_in_identity_policy() -> None: + """Test when there is an explicit deny without condition in identity policy, expect 'Denied'.""" + principal = AwsIamUser(id="AID1234567890", arn="arn:aws:iam::123456789012:user/test-user") + assert principal.arn + + policy_json: Dict[str, Any] = { + "Version": "2012-10-17", + "Statement": [{"Effect": "Deny", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::example-bucket/*"}], + } + policy_document = PolicyDocument(policy_json) + identity_policies = [(PolicySource(kind=PolicySourceKind.principal, uri=principal.arn), policy_document)] + permission_boundaries: List[PolicyDocument] = [] + service_control_policy_levels: List[List[PolicyDocument]] = [] + + request_context = IamRequestContext( + principal=principal, + identity_policies=identity_policies, + permission_boundaries=permission_boundaries, + service_control_policy_levels=service_control_policy_levels, + ) + + resource = AwsResource(id="some-resource", arn="arn:aws:s3:::example-bucket/object.txt") + action = "s3:GetObject" + + result = check_explicit_deny(request_context, resource, action, resource_based_policies=[]) + assert result == "Denied" + + +def test_explicit_deny_with_condition_in_identity_policy() -> None: + """Test when there is an explicit deny with condition in identity policy, expect list of conditions.""" + principal = AwsIamUser(id="AID1234567890", arn="arn:aws:iam::123456789012:user/test-user") + assert principal.arn + + policy_json: Dict[str, Any] = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Deny", + "Action": "s3:GetObject", + "Resource": "arn:aws:s3:::example-bucket/*", + "Condition": {"StringNotEquals": {"aws:username": "test-user"}}, + } + ], + } + policy_document = PolicyDocument(policy_json) + identity_policies = [(PolicySource(kind=PolicySourceKind.principal, uri=principal.arn), policy_document)] + + request_context = IamRequestContext( + principal=principal, + identity_policies=identity_policies, + permission_boundaries=[], + service_control_policy_levels=[], + ) + + resource = AwsResource(id="some-resource", arn="arn:aws:s3:::example-bucket/object.txt") + action = "s3:GetObject" + + result = check_explicit_deny(request_context, resource, action, resource_based_policies=[]) + expected_conditions = [policy_json["Statement"][0]["Condition"]] + assert result == expected_conditions + + +def test_explicit_deny_in_scp() -> None: + """Test when there is an explicit deny without condition in SCP, expect 'Denied'.""" + principal = AwsIamUser(id="AID1234567890", arn="arn:aws:iam::123456789012:user/test-user") + + scp_policy_json: Dict[str, Any] = { + "Version": "2012-10-17", + "Statement": [{"Effect": "Deny", "Action": "s3:GetObject", "Resource": "*"}], + } + scp_policy_document = PolicyDocument(scp_policy_json) + service_control_policy_levels = [[scp_policy_document]] + + request_context = IamRequestContext( + principal=principal, + identity_policies=[], + permission_boundaries=[], + service_control_policy_levels=service_control_policy_levels, + ) + + resource = AwsResource(id="some-resource", arn="arn:aws:s3:::example-bucket/object.txt") + action = "s3:GetObject" + + result = check_explicit_deny(request_context, resource, action, resource_based_policies=[]) + assert result == "Denied" + + +def test_explicit_deny_with_condition_in_scp() -> None: + """Test when there is an explicit deny with condition in SCP, expect list of conditions.""" + principal = AwsIamUser(id="AID1234567890", arn="arn:aws:iam::123456789012:user/test-user") + + scp_policy_json: Dict[str, Any] = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Deny", + "Action": "s3:GetObject", + "Resource": "*", + "Condition": {"Bool": {"aws:SecureTransport": "false"}}, + } + ], + } + scp_policy_document = PolicyDocument(scp_policy_json) + service_control_policy_levels = [ + [ + scp_policy_document, + ] + ] + + request_context = IamRequestContext( + principal=principal, + identity_policies=[], + permission_boundaries=[], + service_control_policy_levels=service_control_policy_levels, + ) + + resource = AwsResource(id="some-resource", arn="arn:aws:s3:::example-bucket/object.txt") + action = "s3:GetObject" + + result = check_explicit_deny(request_context, resource, action, resource_based_policies=[]) + expected_conditions = [scp_policy_json["Statement"][0]["Condition"]] + assert result == expected_conditions + + +def test_explicit_deny_in_resource_policy() -> None: + """Test when there is an explicit deny without condition in resource-based policy, expect 'Denied'.""" + principal = AwsIamUser(id="AID1234567890", arn="arn:aws:iam::123456789012:user/test-user") + + request_context = IamRequestContext( + principal=principal, + identity_policies=[], + permission_boundaries=[], + service_control_policy_levels=[], + ) + + policy_json: Dict[str, Any] = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Deny", + "Principal": {"AWS": "*"}, + "Action": "s3:GetObject", + "Resource": "arn:aws:s3:::example-bucket/*", + } + ], + } + policy_document = PolicyDocument(policy_json) + resource_based_policies = [ + (PolicySource(kind=PolicySourceKind.resource, uri="arn:aws:s3:::example-bucket"), policy_document) + ] + + resource = AwsResource(id="some-resource", arn="arn:aws:s3:::example-bucket/object.txt") + action = "s3:GetObject" + + result = check_explicit_deny(request_context, resource, action, resource_based_policies) + assert result == "Denied" + + +def test_explicit_deny_with_condition_in_resource_policy() -> None: + """Test when there is an explicit deny with condition in resource-based policy, expect list of conditions.""" + principal = AwsIamUser(id="AID1234567890", arn="arn:aws:iam::123456789012:user/test-user") + + request_context = IamRequestContext( + principal=principal, + identity_policies=[], + permission_boundaries=[], + service_control_policy_levels=[], + ) + + policy_json: Dict[str, Any] = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Deny", + "Principal": {"AWS": "*"}, + "Action": "s3:GetObject", + "Resource": "arn:aws:s3:::example-bucket/*", + "Condition": {"IpAddress": {"aws:SourceIp": "192.0.2.0/24"}}, + } + ], + } + policy_document = PolicyDocument(policy_json) + resource_based_policies = [ + (PolicySource(kind=PolicySourceKind.resource, uri="arn:aws:s3:::example-bucket"), policy_document) + ] + + resource = AwsResource(id="some-resource", arn="arn:aws:s3:::example-bucket/object.txt") + action = "s3:GetObject" + + result = check_explicit_deny(request_context, resource, action, resource_based_policies) + expected_conditions = [policy_json["Statement"][0]["Condition"]] + assert result == expected_conditions + + +def test_compute_permissions_user_inline_policy_allow() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + assert user.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowS3GetObject", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + } + ], + } + policy_document = PolicyDocument(policy_json) + + identity_policies = [(PolicySource(kind=PolicySourceKind.principal, uri=user.arn), policy_document)] + + request_context = IamRequestContext( + principal=user, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + assert len(permissions) == 1 + assert permissions[0].action == "s3:ListBucket" + assert permissions[0].level == PermissionLevel.list + assert len(permissions[0].scopes) == 1 + s = permissions[0].scopes[0] + assert s.source.kind == PolicySourceKind.principal + assert s.source.uri == user.arn + assert s.constraints == ("arn:aws:s3:::my-test-bucket",) + + +def test_compute_permissions_user_inline_policy_allow_with_conditions() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + assert user.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + condition = {"IpAddress": {"aws:SourceIp": "1.1.1.1"}} + + policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowS3GetObject", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + "Condition": condition, + } + ], + } + policy_document = PolicyDocument(policy_json) + + identity_policies = [(PolicySource(kind=PolicySourceKind.principal, uri=user.arn), policy_document)] + + request_context = IamRequestContext( + principal=user, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + assert len(permissions) == 1 + assert permissions[0].action == "s3:ListBucket" + assert permissions[0].level == PermissionLevel.list + assert len(permissions[0].scopes) == 1 + s = permissions[0].scopes[0] + assert s.source.kind == PolicySourceKind.principal + assert s.source.uri == user.arn + assert s.constraints == ("arn:aws:s3:::my-test-bucket",) + assert s.conditions + assert s.conditions.allow == (to_json_str(condition),) + + +def test_compute_permissions_user_inline_policy_deny() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + assert user.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "DenyS3PutObject", + "Effect": "Deny", + "Action": "s3:PutObject", + "Resource": "arn:aws:s3:::my-test-bucket/*", + } + ], + } + policy_document = PolicyDocument(policy_json) + + identity_policies = [(PolicySource(kind=PolicySourceKind.principal, uri=user.arn), policy_document)] + + request_context = IamRequestContext( + principal=user, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + + assert len(permissions) == 0 + + +def test_compute_permissions_user_inline_policy_deny_with_condition() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + assert user.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + condition = {"IpAddress": {"aws:SourceIp": "1.1.1.1"}} + + policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "DenyS3PutObject", + "Effect": "Deny", + "Action": "s3:PutObject", + "Resource": "arn:aws:s3:::my-test-bucket/*", + "Condition": condition, + } + ], + } + policy_document = PolicyDocument(policy_json) + + identity_policies = [(PolicySource(kind=PolicySourceKind.principal, uri=user.arn), policy_document)] + + request_context = IamRequestContext( + principal=user, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + + # deny does not grant any permissions by itself, even if the condition is met + assert len(permissions) == 0 + + +def test_deny_overrides_allow() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + assert user.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + deny_policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "DenyS3PutObject", + "Effect": "Deny", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + } + ], + } + deny_policy_document = PolicyDocument(deny_policy_json) + + allow_policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowS3PutObject", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + } + ], + } + allow_policy_document = PolicyDocument(allow_policy_json) + + identity_policies = [ + (PolicySource(kind=PolicySourceKind.principal, uri=user.arn), deny_policy_document), + (PolicySource(kind=PolicySourceKind.principal, uri=user.arn), allow_policy_document), + ] + + request_context = IamRequestContext( + principal=user, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + + assert len(permissions) == 0 + + +def test_deny_different_action_does_not_override_allow() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + assert user.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + deny_policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "DenyS3PutObject", + "Effect": "Deny", + "Action": "s3:PutObject", + "Resource": "arn:aws:s3:::my-test-bucket", + } + ], + } + deny_policy_document = PolicyDocument(deny_policy_json) + + allow_policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowS3PutObject", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + } + ], + } + allow_policy_document = PolicyDocument(allow_policy_json) + + identity_policies = [ + (PolicySource(kind=PolicySourceKind.principal, uri=user.arn), deny_policy_document), + (PolicySource(kind=PolicySourceKind.principal, uri=user.arn), allow_policy_document), + ] + + request_context = IamRequestContext( + principal=user, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + + assert len(permissions) == 1 + + +def test_deny_overrides_allow_with_condition() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + assert user.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + condition = {"IpAddress": {"aws:SourceIp": "1.1.1.1"}} + + deny_policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "DenyS3PutObject", + "Effect": "Deny", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + "Condition": condition, + } + ], + } + deny_policy_document = PolicyDocument(deny_policy_json) + + allow_policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowS3PutObject", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + } + ], + } + allow_policy_document = PolicyDocument(allow_policy_json) + + identity_policies = [ + (PolicySource(kind=PolicySourceKind.principal, uri=user.arn), deny_policy_document), + (PolicySource(kind=PolicySourceKind.principal, uri=user.arn), allow_policy_document), + ] + + request_context = IamRequestContext( + principal=user, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + + assert len(permissions) == 1 + p = permissions[0] + assert p.action == "s3:ListBucket" + assert p.level == PermissionLevel.list + assert len(p.scopes) == 1 + s = p.scopes[0] + assert s.source.kind == PolicySourceKind.principal + assert s.source.uri == user.arn + assert s.constraints == ("arn:aws:s3:::my-test-bucket",) + assert s.conditions + assert s.conditions.deny == (to_json_str(condition),) + + +def test_compute_permissions_resource_based_policy_allow() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::111122223333:user/test-user") + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + assert bucket.arn + + policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowCrossAccountAccess", + "Effect": "Allow", + "Principal": {"AWS": "arn:aws:iam::111122223333:user/test-user"}, + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + } + ], + } + policy_document = PolicyDocument(policy_json) + + request_context = IamRequestContext( + principal=user, identity_policies=[], permission_boundaries=[], service_control_policy_levels=[] + ) + + resource_based_policies = [(PolicySource(kind=PolicySourceKind.resource, uri=bucket.arn), policy_document)] + + permissions = compute_permissions( + resource=bucket, iam_context=request_context, resource_based_policies=resource_based_policies + ) + + assert len(permissions) == 1 + p = permissions[0] + assert p.action == "s3:ListBucket" + assert p.level == PermissionLevel.list + assert len(p.scopes) == 1 + s = p.scopes[0] + assert s.source.kind == PolicySourceKind.resource + assert s.source.uri == bucket.arn + assert s.constraints == ("arn:aws:s3:::my-test-bucket",) + + +def test_compute_permissions_permission_boundary_restrict() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + assert user.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + identity_policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowS3DeleteObject", + "Effect": "Allow", + "Action": "s3:DeleteBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + }, + { + "Sid": "AllowS3GetObject", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + }, + ], + } + identity_policy_document = PolicyDocument(identity_policy_json) + + permission_boundary_json = { + "Version": "2012-10-17", + "Statement": [ + {"Sid": "Boundary", "Effect": "Allow", "Action": ["s3:ListBucket", "s3:PutObject"], "Resource": "*"} + ], + } + permission_boundary_document = PolicyDocument(permission_boundary_json) + + identity_policies = [(PolicySource(kind=PolicySourceKind.principal, uri=user.arn), identity_policy_document)] + + permission_boundaries = [permission_boundary_document] + + request_context = IamRequestContext( + principal=user, + identity_policies=identity_policies, + permission_boundaries=permission_boundaries, + service_control_policy_levels=[], + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + + assert len(permissions) == 1 + p = permissions[0] + assert p.action == "s3:ListBucket" + assert p.level == PermissionLevel.list + assert len(p.scopes) == 1 + s = p.scopes[0] + assert s.source.kind == PolicySourceKind.principal + assert s.source.uri == user.arn + assert s.constraints == ("arn:aws:s3:::my-test-bucket",) + + +def test_compute_permissions_scp_deny() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + assert user.arn + + ec2_instance = AwsResource(id="instance123", arn="arn:aws:ec2:us-east-1:123456789012:instance/i-1234567890abcdef0") + + identity_policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowTerminateInstances", + "Effect": "Allow", + "Action": "ec2:TerminateInstances", + "Resource": ec2_instance.arn, + } + ], + } + identity_policy_document = PolicyDocument(identity_policy_json) + + scp_policy_json = { + "Version": "2012-10-17", + "Statement": [ + {"Sid": "DenyTerminateInstances", "Effect": "Deny", "Action": "ec2:TerminateInstances", "Resource": "*"} + ], + } + scp_policy_document = PolicyDocument(scp_policy_json) + + identity_policies = [(PolicySource(kind=PolicySourceKind.principal, uri=user.arn), identity_policy_document)] + + service_control_policy_levels = [[scp_policy_document]] + + request_context = IamRequestContext( + principal=user, + identity_policies=identity_policies, + permission_boundaries=[], + service_control_policy_levels=service_control_policy_levels, + ) + + permissions = compute_permissions(resource=ec2_instance, iam_context=request_context, resource_based_policies=[]) + + assert len(permissions) == 0 + + +def test_compute_permissions_user_with_group_policies() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + group = AwsResource(id="group123", arn="arn:aws:iam::123456789012:group/test-group") + assert group.arn + + group_policy_json = { + "Version": "2012-10-17", + "Statement": [ + {"Sid": "AllowS3ListBucket", "Effect": "Allow", "Action": "s3:ListBucket", "Resource": bucket.arn} + ], + } + group_policy_document = PolicyDocument(group_policy_json) + + identity_policies = [] + + identity_policies.append((PolicySource(kind=PolicySourceKind.group, uri=group.arn), group_policy_document)) + + request_context = IamRequestContext( + principal=user, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + + assert len(permissions) == 1 + p = permissions[0] + assert p.action == "s3:ListBucket" + assert p.level == PermissionLevel.list + assert len(p.scopes) == 1 + s = p.scopes[0] + assert s.source.kind == PolicySourceKind.group + assert s.source.uri == group.arn + assert s.constraints == (bucket.arn,) + + +def test_compute_permissions_implicit_deny() -> None: + user = AwsIamUser(id="user123", arn="arn:aws:iam::123456789012:user/test-user") + table = AwsResource(id="table123", arn="arn:aws:dynamodb:us-east-1:123456789012:table/my-table") + + request_context = IamRequestContext( + principal=user, identity_policies=[], permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=table, iam_context=request_context, resource_based_policies=[]) + + # Assert that permissions do not include any actions (implicit deny) + assert len(permissions) == 0 + + +def test_compute_permissions_group_inline_policy_allow() -> None: + group = AwsIamGroup(id="group123", arn="arn:aws:iam::123456789012:group/test-group") + assert group.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowS3ListBucket", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + } + ], + } + policy_document = PolicyDocument(policy_json) + + identity_policies = [(PolicySource(kind=PolicySourceKind.group, uri=group.arn), policy_document)] + + request_context = IamRequestContext( + principal=group, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + + assert len(permissions) == 1 + assert permissions[0].action == "s3:ListBucket" + assert permissions[0].level == PermissionLevel.list + assert len(permissions[0].scopes) == 1 + s = permissions[0].scopes[0] + assert s.source.kind == PolicySourceKind.group + assert s.source.uri == group.arn + assert s.constraints == ("arn:aws:s3:::my-test-bucket",) + + +def test_compute_permissions_role_inline_policy_allow() -> None: + role = AwsIamRole(id="role123", arn="arn:aws:iam::123456789012:role/test-role") + assert role.arn + + bucket = AwsResource(id="bucket123", arn="arn:aws:s3:::my-test-bucket") + + policy_json = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "AllowS3PutObject", + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws:s3:::my-test-bucket", + } + ], + } + policy_document = PolicyDocument(policy_json) + + identity_policies = [(PolicySource(kind=PolicySourceKind.principal, uri=role.arn), policy_document)] + + request_context = IamRequestContext( + principal=role, identity_policies=identity_policies, permission_boundaries=[], service_control_policy_levels=[] + ) + + permissions = compute_permissions(resource=bucket, iam_context=request_context, resource_based_policies=[]) + + assert len(permissions) == 1 + assert permissions[0].action == "s3:ListBucket" + assert permissions[0].level == PermissionLevel.list + assert len(permissions[0].scopes) == 1 + s = permissions[0].scopes[0] + assert s.source.kind == PolicySourceKind.principal + assert s.source.uri == role.arn + assert s.constraints == ("arn:aws:s3:::my-test-bucket",) diff --git a/plugins/aws/test/collector_test.py b/plugins/aws/test/collector_test.py index 9c2b747013..5d19ae5c2c 100644 --- a/plugins/aws/test/collector_test.py +++ b/plugins/aws/test/collector_test.py @@ -35,7 +35,7 @@ def count_kind(clazz: Type[AwsResource]) -> int: assert len(threading.enumerate()) == 1 # ensure the correct number of nodes and edges assert count_kind(AwsResource) == 261 - assert len(account_collector.graph.edges) == 579 + assert len(account_collector.graph.edges) == 575 assert len(account_collector.graph.deferred_edges) == 2 for node in account_collector.graph.nodes: if isinstance(node, AwsRegion): diff --git a/plugins/aws/test/resources/files/secretsmanager/get-resource-policy__foo.json b/plugins/aws/test/resources/files/secretsmanager/get-resource-policy__foo.json new file mode 100644 index 0000000000..71b4d4d733 --- /dev/null +++ b/plugins/aws/test/resources/files/secretsmanager/get-resource-policy__foo.json @@ -0,0 +1,5 @@ +{ + "ARN": "foo", + "Name": "bar", + "ResourcePolicy": "{\"Version\":\"2012-10-17\",\"Statement\":[]}" +} \ No newline at end of file diff --git a/plugins/aws/test/resources/lambda_test.py b/plugins/aws/test/resources/lambda_test.py index 780ccbabf5..496d8bd681 100644 --- a/plugins/aws/test/resources/lambda_test.py +++ b/plugins/aws/test/resources/lambda_test.py @@ -1,33 +1,11 @@ -from fix_plugin_aws.resource.lambda_ import AwsLambdaFunction, AwsLambdaPolicy +from fix_plugin_aws.resource.lambda_ import AwsLambdaFunction from fixlib.graph import Graph -from fixlib.json import from_json from test.resources import round_trip_for from typing import Any, cast from types import SimpleNamespace from fix_plugin_aws.aws_client import AwsClient -def test_regression_lamda_get_policy() -> None: - value_to_read = { - "policy": { - "id": "default", - "version": "2012-10-17", - "statement": [ - { - "sid": "StackSet-AWSControlTower-ALCD-LZ-resource-owner-tag", - "effect": "Allow", - "principal": {"Service": "events.amazonaws.com"}, - "action": "lambda:InvokeFunction", - "resource": "arn:aws:lambda:eu-central-1:test:function:aws-controltower-owner-tagging-func", - "condition": None, - } - ], - }, - "policy_revision_id": "b3f179eb-569b-4ea2-8ec4-4324609b0694", - } - from_json(value_to_read, AwsLambdaPolicy) - - def test_lambda() -> None: first, graph = round_trip_for(AwsLambdaFunction) assert len(graph.resources_of(AwsLambdaFunction)) == 2 diff --git a/plugins/aws/tox.ini b/plugins/aws/tox.ini index eb2b934b13..0f6055cb16 100644 --- a/plugins/aws/tox.ini +++ b/plugins/aws/tox.ini @@ -27,7 +27,7 @@ commands = flake8 commands= pytest [testenv:black] -commands = black --line-length 120 --check --diff --target-version py39 . +commands = black --line-length 120 --check --diff --target-version py312 . [testenv:mypy] commands= python -m mypy --install-types --non-interactive --python-version 3.12 --strict fix_plugin_aws test