diff --git a/checkov/common/checks_infra/checks_parser.py b/checkov/common/checks_infra/checks_parser.py index 33f296a1812..6560253bcad 100644 --- a/checkov/common/checks_infra/checks_parser.py +++ b/checkov/common/checks_infra/checks_parser.py @@ -58,6 +58,8 @@ ) from checkov.common.checks_infra.solvers.connections_solvers.connection_one_exists_solver import \ ConnectionOneExistsSolver +from checkov.common.checks_infra.solvers.resource_solvers import ExistsResourcerSolver, NotExistsResourcerSolver +from checkov.common.checks_infra.solvers.resource_solvers.base_resource_solver import BaseResourceSolver from checkov.common.graph.checks_infra.base_check import BaseGraphCheck from checkov.common.graph.checks_infra.base_parser import BaseGraphCheckParser from checkov.common.graph.checks_infra.enums import SolverType @@ -144,6 +146,12 @@ "attribute": SolverType.ATTRIBUTE, "connection": SolverType.CONNECTION, "filter": SolverType.FILTER, + "resource": SolverType.RESOURCE, +} + +operator_to_resource_solver_classes: dict[str, Type[BaseResourceSolver]] = { + "exists": ExistsResourcerSolver, + "not_exists": NotExistsResourcerSolver, } JSONPATH_PREFIX = "jsonpath_" @@ -298,6 +306,9 @@ def get_check_solver(self, check: BaseGraphCheck) -> BaseSolver: SolverType.FILTER: operator_to_filter_solver_classes.get(check.operator, lambda *args: None)( check.resource_types, check.attribute, check.attribute_value ), + SolverType.RESOURCE: operator_to_resource_solver_classes.get(check.operator, lambda *args: None)( + check.resource_types + ), } solver = type_to_solver.get(check.type) # type:ignore[arg-type] # if not str will return None diff --git a/checkov/common/checks_infra/solvers/resource_solvers/__init__.py b/checkov/common/checks_infra/solvers/resource_solvers/__init__.py new file mode 100644 index 00000000000..cc33f5a072d --- /dev/null +++ b/checkov/common/checks_infra/solvers/resource_solvers/__init__.py @@ -0,0 +1,2 @@ +from checkov.common.checks_infra.solvers.resource_solvers.not_exists_resource_solver import ExistsResourcerSolver # noqa +from checkov.common.checks_infra.solvers.resource_solvers.not_exists_resource_solver import NotExistsResourcerSolver # noqa diff --git a/checkov/common/checks_infra/solvers/resource_solvers/base_resource_solver.py b/checkov/common/checks_infra/solvers/resource_solvers/base_resource_solver.py new file mode 100644 index 00000000000..5d6f061f4ee --- /dev/null +++ b/checkov/common/checks_infra/solvers/resource_solvers/base_resource_solver.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from abc import abstractmethod +from typing import Any, Callable, TYPE_CHECKING + +from networkx import DiGraph + +import concurrent.futures + +from concurrent.futures import ThreadPoolExecutor + +from checkov.common.graph.checks_infra.enums import SolverType +from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver +from checkov.common.graph.graph_builder import CustomAttributes + +if TYPE_CHECKING: + from checkov.common.typing import LibraryGraph + + +class BaseResourceSolver(BaseSolver): + def __init__(self, resource_types: list[str]) -> None: + super().__init__(SolverType.RESOURCE) + self.resource_types = resource_types + self.vertices: list[dict[str, Any]] = [] + + @abstractmethod + def get_operation(self, resource_type: str) -> bool: + raise NotImplementedError() + + def _get_operation(self, *args: Any, **kwargs: Any) -> Callable[..., bool]: + # not needed + return lambda: True + + def run( + self, graph_connector: LibraryGraph + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: + executer = ThreadPoolExecutor() + jobs = [] + passed_vertices: list[dict[str, Any]] = [] + failed_vertices: list[dict[str, Any]] = [] + unknown_vertices: list[dict[str, Any]] = [] + + if isinstance(graph_connector, DiGraph): + for _, data in graph_connector.nodes(data=True): + jobs.append(executer.submit(self._process_node, data, passed_vertices, failed_vertices, unknown_vertices)) + + concurrent.futures.wait(jobs) + return passed_vertices, failed_vertices, unknown_vertices + + for _, data in graph_connector.nodes(): + result = self.get_operation(resource_type=data.get(CustomAttributes.RESOURCE_TYPE)) + if result: + passed_vertices.append(data) + else: + failed_vertices.append(data) + + return passed_vertices, failed_vertices, [] + + def _process_node(self, data: dict[str, str], passed_vartices: list[dict[str, Any]], + failed_vertices: list[dict[str, Any]], unknown_vertices: list[dict[str, Any]]) -> None: + result = self.get_operation(data.get(CustomAttributes.RESOURCE_TYPE)) # type:ignore[arg-type] + # A None indicate for UNKNOWN result - the vertex shouldn't be added to the passed or the failed vertices + if result is None: + unknown_vertices.append(data) + elif result: + passed_vartices.append(data) + else: + failed_vertices.append(data) diff --git a/checkov/common/checks_infra/solvers/resource_solvers/exists_resource_solver.py b/checkov/common/checks_infra/solvers/resource_solvers/exists_resource_solver.py new file mode 100644 index 00000000000..b54189c0935 --- /dev/null +++ b/checkov/common/checks_infra/solvers/resource_solvers/exists_resource_solver.py @@ -0,0 +1,12 @@ +from __future__ import annotations + + +from checkov.common.graph.checks_infra.enums import Operators +from checkov.common.checks_infra.solvers.resource_solvers.base_resource_solver import BaseResourceSolver + + +class ExistsResourcerSolver(BaseResourceSolver): + operator = Operators.EXISTS # noqa: CCE003 # a static attribute + + def get_operation(self, resource_type: str | None) -> bool: + return resource_type in self.resource_types diff --git a/checkov/common/checks_infra/solvers/resource_solvers/not_exists_resource_solver.py b/checkov/common/checks_infra/solvers/resource_solvers/not_exists_resource_solver.py new file mode 100644 index 00000000000..d8d34a801af --- /dev/null +++ b/checkov/common/checks_infra/solvers/resource_solvers/not_exists_resource_solver.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from typing import Any + +from checkov.common.checks_infra.solvers.resource_solvers.exists_resource_solver import ExistsResourcerSolver +from checkov.common.graph.checks_infra.enums import Operators + + +class NotExistsResourcerSolver(ExistsResourcerSolver): + operator = Operators.NOT_EXISTS # noqa: CCE003 # a static attribute + + def get_operation(self, *args: Any, **kwargs: Any) -> bool: + return not super().get_operation(*args, **kwargs) diff --git a/checkov/common/graph/checks_infra/enums.py b/checkov/common/graph/checks_infra/enums.py index 388a86d1e4a..9858bb6641d 100644 --- a/checkov/common/graph/checks_infra/enums.py +++ b/checkov/common/graph/checks_infra/enums.py @@ -20,6 +20,9 @@ class SolverType(str, Enum): FILTER = "FILTER" # Filters results according to specific value / type, i.e. resource type is aws_s3_bucket + RESOURCE = "RESOURCE" + # Used to define allow/deny lists of resource types + class Operators: ANY = 'any' diff --git a/docs/3.Custom Policies/Examples.md b/docs/3.Custom Policies/Examples.md index 342d0ffc04e..b479663e2d4 100644 --- a/docs/3.Custom Policies/Examples.md +++ b/docs/3.Custom Policies/Examples.md @@ -479,3 +479,16 @@ definition: - prod - prod-eu ``` + +## Creating an allow list of resource types + +The following policy only allows resources of type `aws_instance` and `aws_db_instance` to be provisioned. + +```yaml +definition: + cond_type: "resource" + resource_types: + - "aws_instance" + - "aws_db_instance" + operator: "exists" +``` diff --git a/docs/3.Custom Policies/YAML Custom Policies.md b/docs/3.Custom Policies/YAML Custom Policies.md index e774c60e02f..3652f89765d 100644 --- a/docs/3.Custom Policies/YAML Custom Policies.md +++ b/docs/3.Custom Policies/YAML Custom Policies.md @@ -58,7 +58,8 @@ The top level object under `definition` must be a single object (not a list). It ## Types of Definition Blocks * **Attribute Blocks:** The policy describes resources with a certain configuration as defined by a configuration **attribute** and its value (per Terraform), or by the presence/absence of an attribute. -* **Connection State Blocks** - The policy describes resources in a particular **Connection state**; either connected or not connected to another type of resource (for example, a security group). +* **Connection State Blocks:** The policy describes resources in a particular **Connection state**; either connected or not connected to another type of resource (for example, a security group). +* **Resource Type Blocks:** The policy describes resource types that are either allowed or forbidden to use, commonly referred to as allow/deny lists. ### Using AND/OR Logic A policy definition may include multiple blocks (**Attribute**, **Connection state** or both), associated by **AND/OR** logic. @@ -253,6 +254,22 @@ definition: *Note: The condition above uses AND logic. See [additional examples](https://www.checkov.io/3.Custom%20Policies/Examples.html) for complex logic in policy definitions.* +## Resource Type Blocks + +A **Resource Type Block** in a policy's definition indicates that a resource will be compliant/non-complaint depending on the resource type, which is allowed/forbidden. Use the `exist` operator to define an allowlist and the `not_exist` operator to define a blocklist. + +### Resource Type Block Example + +The Resource Type Block in the `definition` in the example below is used to ensure CloudHSM cluster won't be provisioned: + +```yaml +definition: + cond_type: "resource" + resource_types: + - "aws_cloudhsm_v2_cluster" + operator: "not_exists" +``` + ## Using AND/OR Logic The Prisma Cloud platform allows you to combine definition blocks using AND/OR operators. diff --git a/tests/terraform/graph/checks_infra/resource_solvers/__init__.py b/tests/terraform/graph/checks_infra/resource_solvers/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/terraform/graph/checks_infra/resource_solvers/exists_solver/ResourceAllowList.yaml b/tests/terraform/graph/checks_infra/resource_solvers/exists_solver/ResourceAllowList.yaml new file mode 100644 index 00000000000..774200c0c52 --- /dev/null +++ b/tests/terraform/graph/checks_infra/resource_solvers/exists_solver/ResourceAllowList.yaml @@ -0,0 +1,11 @@ +metadata: + name: "example" + category: "GENERAL_SECURITY" + id: "ResourceAllowList" +scope: + provider: "AWS" +definition: + cond_type: "resource" + resource_types: + - "aws_s3_bucket" + operator: "exists" diff --git a/tests/terraform/graph/checks_infra/resource_solvers/exists_solver/__init__.py b/tests/terraform/graph/checks_infra/resource_solvers/exists_solver/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/terraform/graph/checks_infra/resource_solvers/exists_solver/test_solver.py b/tests/terraform/graph/checks_infra/resource_solvers/exists_solver/test_solver.py new file mode 100644 index 00000000000..34f637e9314 --- /dev/null +++ b/tests/terraform/graph/checks_infra/resource_solvers/exists_solver/test_solver.py @@ -0,0 +1,33 @@ +from pathlib import Path + +from parameterized import parameterized_class + +from tests.terraform.graph.checks_infra.test_base import TestBaseSolver + +TEST_DIRNAME = Path(__file__).parent + + +@parameterized_class([{"graph_framework": "NETWORKX"}, {"graph_framework": "IGRAPH"}]) +class ExistsSolver(TestBaseSolver): + def setUp(self): + self.checks_dir = str(TEST_DIRNAME) + super().setUp() + + def test_allow_list(self): + # given + root_folder = TEST_DIRNAME.parents[2] / "resources/encryption_test" + check_id = "ResourceAllowList" + should_pass = [ + "aws_s3_bucket.encrypted_bucket", + "aws_s3_bucket.unencrypted_bucket", + ] + should_fail = [ + "aws_rds_cluster.rds_cluster_encrypted", + "aws_rds_cluster.rds_cluster_unencrypted", + "aws_neptune_cluster.encrypted_neptune", + "aws_neptune_cluster.unencrypted_neptune", + ] + expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}} + + # when/then + self.run_test(root_folder=str(root_folder), expected_results=expected_results, check_id=check_id) diff --git a/tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/ResourceDenyList.yaml b/tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/ResourceDenyList.yaml new file mode 100644 index 00000000000..fa5101eae62 --- /dev/null +++ b/tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/ResourceDenyList.yaml @@ -0,0 +1,11 @@ +metadata: + name: "example" + category: "GENERAL_SECURITY" + id: "ResourceDenyList" +scope: + provider: "AWS" +definition: + cond_type: "resource" + resource_types: + - "aws_s3_bucket" + operator: "not_exists" diff --git a/tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/__init__.py b/tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/test_solver.py b/tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/test_solver.py new file mode 100644 index 00000000000..565cc1a1a03 --- /dev/null +++ b/tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/test_solver.py @@ -0,0 +1,33 @@ +from pathlib import Path + +from parameterized import parameterized_class + +from tests.terraform.graph.checks_infra.test_base import TestBaseSolver + +TEST_DIRNAME = Path(__file__).parent + + +@parameterized_class([{"graph_framework": "NETWORKX"}, {"graph_framework": "IGRAPH"}]) +class NotExistsSolver(TestBaseSolver): + def setUp(self): + self.checks_dir = str(TEST_DIRNAME) + super().setUp() + + def test_deny_list(self): + # given + root_folder = TEST_DIRNAME.parents[2] / "resources/encryption_test" + check_id = "ResourceDenyList" + should_pass = [ + "aws_rds_cluster.rds_cluster_encrypted", + "aws_rds_cluster.rds_cluster_unencrypted", + "aws_neptune_cluster.encrypted_neptune", + "aws_neptune_cluster.unencrypted_neptune", + ] + should_fail = [ + "aws_s3_bucket.encrypted_bucket", + "aws_s3_bucket.unencrypted_bucket", + ] + expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}} + + # when/then + self.run_test(root_folder=str(root_folder), expected_results=expected_results, check_id=check_id)