forked from bridgecrewio/checkov
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(graph): support creation of resource type allow/deny lists (brid…
…gecrewio#6451) * support creation of resource type allow/deny lists * update docs Co-authored-by: Taylor <[email protected]> * change igraph to networkx * fix test failing * remove forbidden import * fix mypy issues --------- Co-authored-by: gruebel <[email protected]> Co-authored-by: Taylor <[email protected]> Co-authored-by: Max Amelchenko <[email protected]>
- Loading branch information
1 parent
c3a8a04
commit ff615d6
Showing
15 changed files
with
228 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 2 additions & 0 deletions
2
checkov/common/checks_infra/solvers/resource_solvers/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from checkov.common.checks_infra.solvers.resource_solvers.not_exists_resource_solver import ExistsResourcerSolver # noqa | ||
from checkov.common.checks_infra.solvers.resource_solvers.not_exists_resource_solver import NotExistsResourcerSolver # noqa |
68 changes: 68 additions & 0 deletions
68
checkov/common/checks_infra/solvers/resource_solvers/base_resource_solver.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
from __future__ import annotations | ||
|
||
from abc import abstractmethod | ||
from typing import Any, Callable, TYPE_CHECKING | ||
|
||
from networkx import DiGraph | ||
|
||
import concurrent.futures | ||
|
||
from concurrent.futures import ThreadPoolExecutor | ||
|
||
from checkov.common.graph.checks_infra.enums import SolverType | ||
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver | ||
from checkov.common.graph.graph_builder import CustomAttributes | ||
|
||
if TYPE_CHECKING: | ||
from checkov.common.typing import LibraryGraph | ||
|
||
|
||
class BaseResourceSolver(BaseSolver): | ||
def __init__(self, resource_types: list[str]) -> None: | ||
super().__init__(SolverType.RESOURCE) | ||
self.resource_types = resource_types | ||
self.vertices: list[dict[str, Any]] = [] | ||
|
||
@abstractmethod | ||
def get_operation(self, resource_type: str) -> bool: | ||
raise NotImplementedError() | ||
|
||
def _get_operation(self, *args: Any, **kwargs: Any) -> Callable[..., bool]: | ||
# not needed | ||
return lambda: True | ||
|
||
def run( | ||
self, graph_connector: LibraryGraph | ||
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: | ||
executer = ThreadPoolExecutor() | ||
jobs = [] | ||
passed_vertices: list[dict[str, Any]] = [] | ||
failed_vertices: list[dict[str, Any]] = [] | ||
unknown_vertices: list[dict[str, Any]] = [] | ||
|
||
if isinstance(graph_connector, DiGraph): | ||
for _, data in graph_connector.nodes(data=True): | ||
jobs.append(executer.submit(self._process_node, data, passed_vertices, failed_vertices, unknown_vertices)) | ||
|
||
concurrent.futures.wait(jobs) | ||
return passed_vertices, failed_vertices, unknown_vertices | ||
|
||
for _, data in graph_connector.nodes(): | ||
result = self.get_operation(resource_type=data.get(CustomAttributes.RESOURCE_TYPE)) | ||
if result: | ||
passed_vertices.append(data) | ||
else: | ||
failed_vertices.append(data) | ||
|
||
return passed_vertices, failed_vertices, [] | ||
|
||
def _process_node(self, data: dict[str, str], passed_vartices: list[dict[str, Any]], | ||
failed_vertices: list[dict[str, Any]], unknown_vertices: list[dict[str, Any]]) -> None: | ||
result = self.get_operation(data.get(CustomAttributes.RESOURCE_TYPE)) # type:ignore[arg-type] | ||
# A None indicate for UNKNOWN result - the vertex shouldn't be added to the passed or the failed vertices | ||
if result is None: | ||
unknown_vertices.append(data) | ||
elif result: | ||
passed_vartices.append(data) | ||
else: | ||
failed_vertices.append(data) |
12 changes: 12 additions & 0 deletions
12
checkov/common/checks_infra/solvers/resource_solvers/exists_resource_solver.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from __future__ import annotations | ||
|
||
|
||
from checkov.common.graph.checks_infra.enums import Operators | ||
from checkov.common.checks_infra.solvers.resource_solvers.base_resource_solver import BaseResourceSolver | ||
|
||
|
||
class ExistsResourcerSolver(BaseResourceSolver): | ||
operator = Operators.EXISTS # noqa: CCE003 # a static attribute | ||
|
||
def get_operation(self, resource_type: str | None) -> bool: | ||
return resource_type in self.resource_types |
13 changes: 13 additions & 0 deletions
13
checkov/common/checks_infra/solvers/resource_solvers/not_exists_resource_solver.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Any | ||
|
||
from checkov.common.checks_infra.solvers.resource_solvers.exists_resource_solver import ExistsResourcerSolver | ||
from checkov.common.graph.checks_infra.enums import Operators | ||
|
||
|
||
class NotExistsResourcerSolver(ExistsResourcerSolver): | ||
operator = Operators.NOT_EXISTS # noqa: CCE003 # a static attribute | ||
|
||
def get_operation(self, *args: Any, **kwargs: Any) -> bool: | ||
return not super().get_operation(*args, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
11 changes: 11 additions & 0 deletions
11
tests/terraform/graph/checks_infra/resource_solvers/exists_solver/ResourceAllowList.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
metadata: | ||
name: "example" | ||
category: "GENERAL_SECURITY" | ||
id: "ResourceAllowList" | ||
scope: | ||
provider: "AWS" | ||
definition: | ||
cond_type: "resource" | ||
resource_types: | ||
- "aws_s3_bucket" | ||
operator: "exists" |
Empty file.
33 changes: 33 additions & 0 deletions
33
tests/terraform/graph/checks_infra/resource_solvers/exists_solver/test_solver.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
from pathlib import Path | ||
|
||
from parameterized import parameterized_class | ||
|
||
from tests.terraform.graph.checks_infra.test_base import TestBaseSolver | ||
|
||
TEST_DIRNAME = Path(__file__).parent | ||
|
||
|
||
@parameterized_class([{"graph_framework": "NETWORKX"}, {"graph_framework": "IGRAPH"}]) | ||
class ExistsSolver(TestBaseSolver): | ||
def setUp(self): | ||
self.checks_dir = str(TEST_DIRNAME) | ||
super().setUp() | ||
|
||
def test_allow_list(self): | ||
# given | ||
root_folder = TEST_DIRNAME.parents[2] / "resources/encryption_test" | ||
check_id = "ResourceAllowList" | ||
should_pass = [ | ||
"aws_s3_bucket.encrypted_bucket", | ||
"aws_s3_bucket.unencrypted_bucket", | ||
] | ||
should_fail = [ | ||
"aws_rds_cluster.rds_cluster_encrypted", | ||
"aws_rds_cluster.rds_cluster_unencrypted", | ||
"aws_neptune_cluster.encrypted_neptune", | ||
"aws_neptune_cluster.unencrypted_neptune", | ||
] | ||
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}} | ||
|
||
# when/then | ||
self.run_test(root_folder=str(root_folder), expected_results=expected_results, check_id=check_id) |
11 changes: 11 additions & 0 deletions
11
tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/ResourceDenyList.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
metadata: | ||
name: "example" | ||
category: "GENERAL_SECURITY" | ||
id: "ResourceDenyList" | ||
scope: | ||
provider: "AWS" | ||
definition: | ||
cond_type: "resource" | ||
resource_types: | ||
- "aws_s3_bucket" | ||
operator: "not_exists" |
Empty file.
33 changes: 33 additions & 0 deletions
33
tests/terraform/graph/checks_infra/resource_solvers/not_exists_solver/test_solver.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
from pathlib import Path | ||
|
||
from parameterized import parameterized_class | ||
|
||
from tests.terraform.graph.checks_infra.test_base import TestBaseSolver | ||
|
||
TEST_DIRNAME = Path(__file__).parent | ||
|
||
|
||
@parameterized_class([{"graph_framework": "NETWORKX"}, {"graph_framework": "IGRAPH"}]) | ||
class NotExistsSolver(TestBaseSolver): | ||
def setUp(self): | ||
self.checks_dir = str(TEST_DIRNAME) | ||
super().setUp() | ||
|
||
def test_deny_list(self): | ||
# given | ||
root_folder = TEST_DIRNAME.parents[2] / "resources/encryption_test" | ||
check_id = "ResourceDenyList" | ||
should_pass = [ | ||
"aws_rds_cluster.rds_cluster_encrypted", | ||
"aws_rds_cluster.rds_cluster_unencrypted", | ||
"aws_neptune_cluster.encrypted_neptune", | ||
"aws_neptune_cluster.unencrypted_neptune", | ||
] | ||
should_fail = [ | ||
"aws_s3_bucket.encrypted_bucket", | ||
"aws_s3_bucket.unencrypted_bucket", | ||
] | ||
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}} | ||
|
||
# when/then | ||
self.run_test(root_folder=str(root_folder), expected_results=expected_results, check_id=check_id) |