From 9b934889fba595e1c6e121344f60d2971bd1f85a Mon Sep 17 00:00:00 2001 From: Nikita Melkozerov Date: Thu, 28 Nov 2024 12:44:12 +0100 Subject: [PATCH] use segmented arn matching for resources --- plugins/aws/fix_plugin_aws/access_edges.py | 123 +++++++++++++++++---- 1 file changed, 101 insertions(+), 22 deletions(-) diff --git a/plugins/aws/fix_plugin_aws/access_edges.py b/plugins/aws/fix_plugin_aws/access_edges.py index 5f1a08258..d348779a3 100644 --- a/plugins/aws/fix_plugin_aws/access_edges.py +++ b/plugins/aws/fix_plugin_aws/access_edges.py @@ -72,6 +72,8 @@ def pattern_from_action(action: str) -> ActionWildcardPattern: self.actions_patterns = [pattern_from_action(action) for action in self.actions] self.not_action_patterns = [pattern_from_action(action) for action in self.not_action] + self.resource_patterns = [ResourceWildcardPattern.from_str(resource) for resource in self.resources] + self.not_resource_patterns = [ResourceWildcardPattern.from_str(resource) for resource in self.not_resource] class FixPolicyDocument(PolicyDocument): @@ -94,6 +96,14 @@ class ArnResourceValueKind(enum.Enum): Pattern = 2 # the segment is a pattern, e.g. "my_corporate_bucket/*", Any = 3 # the segment is missing, e.g. "::" or it is a wildcard, e.g. "*" + @staticmethod + def from_str(value: str) -> "ArnResourceValueKind": + if value == "*": + return ArnResourceValueKind.Any + if "*" in value: + return ArnResourceValueKind.Pattern + return ArnResourceValueKind.Static + @frozen(slots=True) class ArnResource: value: str @@ -310,6 +320,55 @@ def list_principals(self, resource_arn: ARN) -> Set[str]: +@frozen(slots=True) +class ResourceWildcardPattern: + raw_value: str + partition: str | None # None in case the whole string is "*" + service: str + region: str + region_value_kind: ArnResourceValueKind + account: str + account_value_kind: ArnResourceValueKind + resource: str + resource_value_kind: ArnResourceValueKind + + + @staticmethod + def from_str(value: str) -> "ResourceWildcardPattern": + if value == "*": + return ResourceWildcardPattern( + raw_value=value, + partition=None, + service="*", + region="*", + region_value_kind=ArnResourceValueKind.Any, + account="*", + account_value_kind=ArnResourceValueKind.Any, + resource="*", + resource_value_kind=ArnResourceValueKind.Any + ) + + try: + splitted = value.split(":", 5) + if len(splitted) != 6: + raise ValueError(f"Invalid resource pattern: {value}") + _, partition, service, region, account, resource = splitted + + return ResourceWildcardPattern( + raw_value=value, + partition=partition, + service=service, + region=region, + region_value_kind=ArnResourceValueKind.from_str(region), + account=account, + account_value_kind=ArnResourceValueKind.from_str(account), + resource=resource, + resource_value_kind=ArnResourceValueKind.from_str(resource) + ) + except Exception as e: + log.error(f"Error parsing resource pattern {value}: {e}") + raise e + @frozen(slots=True) class IamRequestContext: principal: AwsResource @@ -472,18 +531,6 @@ def make_resoruce_regex(aws_resorce_wildcard: str) -> Pattern[str]: return re.compile(f"^{python_regex}$", re.IGNORECASE) -def _expand_wildcards_and_match(*, identifier: str, wildcard_string: str) -> bool: - """ - helper function to expand wildcards and match the identifier - - use case: - match the resource constraint (wildcard) with the ARN - match the wildcard action with the specific action - """ - pattern = make_resoruce_regex(wildcard_string) - return pattern.match(identifier) is not None - - @lru_cache(maxsize=1024) def _compile_action_pattern(wildcard_pattern: str) -> tuple[str, re.Pattern[str] | None]: """ @@ -535,8 +582,40 @@ def expand_action_wildcards_and_match(action: ActionToCheck, wildcard_pattern: A return False -def expand_arn_wildcards_and_match(identifier: str, wildcard_string: str) -> bool: - return _expand_wildcards_and_match(identifier=identifier, wildcard_string=wildcard_string) +def match_pattern(resource_segment: str, wildcard_segment: str, wildcard_segment_kind: ArnResourceValueKind) -> bool: + match wildcard_segment_kind: + case ArnResourceValueKind.Any: + return True + case ArnResourceValueKind.Pattern: + return fnmatch.fnmatch(resource_segment, wildcard_segment) + case ArnResourceValueKind.Static: + return resource_segment == wildcard_segment + + + +def expand_arn_wildcards_and_match(identifier: ARN, wildcard_string: ResourceWildcardPattern) -> bool: + + # if wildard is *, we can shortcut here + if wildcard_string.partition is None: + return True + + # go through the ARN segments and match them + if not wildcard_string.partition == identifier.partition: + return False + + if not wildcard_string.service == identifier.service_prefix: + return False + + if not match_pattern(identifier.region, wildcard_string.region, wildcard_string.region_value_kind): + return False + + if not match_pattern(identifier.account, wildcard_string.account, wildcard_string.account_value_kind): + return False + + if not match_pattern(identifier.resource_string, wildcard_string.resource, wildcard_string.resource_value_kind): + return False + + return True @lru_cache(maxsize=4096) @@ -621,19 +700,19 @@ def check_resource_match(arn: ARN) -> Optional[List[ResourceConstraint]]: # step 4: check if the resource matches matched_resource_constraints: List[ResourceConstraint] = [] resource_matches = False - if len(statement.resources) > 0: - for resource_constraint in statement.resources: - if expand_arn_wildcards_and_match(identifier=arn.arn, wildcard_string=resource_constraint): - matched_resource_constraints.append(resource_constraint) + if len(statement.resource_patterns) > 0: + for resource_constraint in statement.resource_patterns: + if expand_arn_wildcards_and_match(identifier=arn, wildcard_string=resource_constraint): + matched_resource_constraints.append(resource_constraint.raw_value) resource_matches = True break - elif len(statement.not_resource) > 0: + elif len(statement.not_resource_patterns) > 0: resource_matches = True - for not_resource_constraint in statement.not_resource: - if expand_arn_wildcards_and_match(identifier=arn.arn, wildcard_string=not_resource_constraint): + for not_resource_constraint in statement.not_resource_patterns: + if expand_arn_wildcards_and_match(identifier=arn, wildcard_string=not_resource_constraint): resource_matches = False break - matched_resource_constraints.append("not " + not_resource_constraint) + matched_resource_constraints.append("not " + not_resource_constraint.raw_value) else: # no Resource/NotResource specified, consider allowed resource_matches = True