diff --git a/plugins/gcp/fix_plugin_gcp/collector.py b/plugins/gcp/fix_plugin_gcp/collector.py index 0bb4113dd5..0376a5afc9 100644 --- a/plugins/gcp/fix_plugin_gcp/collector.py +++ b/plugins/gcp/fix_plugin_gcp/collector.py @@ -8,7 +8,7 @@ from fix_plugin_gcp.resources.base import GcpResource, GcpProject, ExecutorQueue, GraphBuilder, GcpRegion, GcpZone from fix_plugin_gcp.utils import Credentials from fixlib.baseresources import Cloud -from fixlib.core.actions import CoreFeedback +from fixlib.core.actions import CoreFeedback, ErrorAccumulator from fixlib.graph import Graph log = logging.getLogger("fix.plugins.gcp") @@ -51,6 +51,7 @@ def __init__( self.cloud = cloud self.project = project self.core_feedback = core_feedback + self.error_accumulator = ErrorAccumulator() self.graph = Graph(root=self.project, max_nodes=max_resources_per_account) self.credentials = Credentials.get(self.project.id) @@ -58,6 +59,7 @@ def collect(self) -> None: with ThreadPoolExecutor( thread_name_prefix=f"gcp_{self.project.id}", max_workers=self.config.project_pool_size ) as executor: + self.core_feedback.progress_done(self.project.id, 0, 1, context=[self.cloud.id]) # The shared executor is used to parallelize the collection of resources "as fast as possible" # It should only be used in scenarios, where it is safe to do so. # This executor is shared between all regions. @@ -70,6 +72,7 @@ def collect(self) -> None: self.credentials, shared_queue, self.core_feedback, + self.error_accumulator, project_global_region, ) global_builder.add_node(project_global_region, {}) @@ -95,6 +98,8 @@ def collect(self) -> None: global_builder.submit_work(self.collect_region, region, global_builder.for_region(region)) global_builder.executor.wait_for_submitted_work() + self.error_accumulator.report_all(global_builder.core_feedback) + log.info(f"[GCP:{self.project.id}] Connect resources and create edges.") # connect nodes for node, data in list(self.graph.nodes(data=True)): @@ -110,6 +115,7 @@ def collect(self) -> None: if isinstance(node, GcpResource): node.post_process_instance(global_builder, data.get("source", {})) + self.core_feedback.progress_done(self.project.id, 1, 1, context=[self.cloud.id]) log.info(f"[GCP:{self.project.id}] Collecting resources done.") def remove_unconnected_nodes(self): diff --git a/plugins/gcp/fix_plugin_gcp/resources/aiplatform.py b/plugins/gcp/fix_plugin_gcp/resources/aiplatform.py index ff51ae763a..886f44264d 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/aiplatform.py +++ b/plugins/gcp/fix_plugin_gcp/resources/aiplatform.py @@ -35,14 +35,15 @@ class AIPlatformRegionFilter: def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]: # Default behavior: in case the class has an ApiSpec, call the api and call collect. if issubclass(cls, GcpResource): - if kwargs: - log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind} with ({kwargs})") - else: - log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind}") + region_name = "global" if not builder.region else builder.region.safe_name + log.info(f"[GCP:{builder.project.id}:{region_name}] Collecting {cls.kind}") if spec := cls.api_spec: expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set()) | {"HttpError:none:none"} with GcpErrorHandler( - builder.core_feedback, + spec.action, + builder.error_accumulator, + spec.service, + builder.region.safe_name if builder.region else None, expected_errors, f" in {builder.project.id} kind {cls.kind}", expected_message_substrings, diff --git a/plugins/gcp/fix_plugin_gcp/resources/base.py b/plugins/gcp/fix_plugin_gcp/resources/base.py index cd590b0a80..e2402d8c71 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/base.py +++ b/plugins/gcp/fix_plugin_gcp/resources/base.py @@ -23,7 +23,7 @@ ModelReference, ) from fixlib.config import Config -from fixlib.core.actions import CoreFeedback +from fixlib.core.actions import CoreFeedback, ErrorAccumulator from fixlib.graph import Graph, EdgeKey from fixlib.json import from_json as from_js, value_in_path from fixlib.json_bender import bend, Bender, S, Bend, MapDict, F @@ -78,6 +78,7 @@ def __init__( credentials: GoogleAuthCredentials, executor: ExecutorQueue, core_feedback: CoreFeedback, + error_accumulator: ErrorAccumulator, fallback_global_region: GcpRegion, region: Optional[GcpRegion] = None, graph_nodes_access: Optional[Lock] = None, @@ -87,12 +88,11 @@ def __init__( self.cloud = cloud self.region = region self.project = project - self.client = GcpClient( - credentials, project_id=project.id, region=region.name if region else None, core_feedback=core_feedback - ) + self.client = GcpClient(credentials, project_id=project.id, region=region.name if region else None) self.executor = executor self.name = f"GCP:{project.name}" self.core_feedback = core_feedback + self.error_accumulator = error_accumulator self.fallback_global_region = fallback_global_region self.region_by_name: Dict[str, GcpRegion] = {} self.region_by_zone_name: Dict[str, GcpRegion] = {} @@ -272,6 +272,7 @@ def for_region(self, region: GcpRegion) -> GraphBuilder: self.client.credentials, self.executor, self.core_feedback, + self.error_accumulator, self.fallback_global_region, region, self.graph_nodes_access, @@ -381,7 +382,14 @@ def collect_resources(cls: Type[GcpResource], builder: GraphBuilder, **kwargs: A log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind}") if spec := cls.api_spec: expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set()) - with GcpErrorHandler(builder.core_feedback, expected_errors, f" in {builder.project.id} kind {cls.kind}"): + with GcpErrorHandler( + spec.action, + builder.error_accumulator, + spec.service, + builder.region.safe_name if builder.region else None, + expected_errors, + f" in {builder.project.id} kind {cls.kind}", + ): items = builder.client.list(spec, **kwargs) resources = cls.collect(items, builder) log.info(f"[GCP:{builder.project.id}] finished collecting: {cls.kind}") @@ -610,12 +618,18 @@ class GcpZone(GcpResource, BaseZone): class GcpErrorHandler: def __init__( self, - core_feedback: CoreFeedback, + action: str, + error_accumulator: ErrorAccumulator, + service: str, + region: Optional[str], expected_errors: Set[str], extra_info: str = "", expected_message_substrings: Optional[Set[str]] = None, ) -> None: - self.core_feedback = core_feedback + self.action = action + self.error_accumulator = error_accumulator + self.service = service + self.region = region self.extra_info = extra_info self.expected_errors = expected_errors self.expected_message_substrings = expected_message_substrings @@ -649,31 +663,39 @@ def __exit__( if self.expected_message_substrings: for substring in self.expected_message_substrings: if substring in error_details: - log.info(f"Ignoring expected HttpError in {self.extra_info}: {error_details}.") return True # Suppress the exception except Exception as ex: errors = {f"ParseError:unknown:{ex}"} error_summary = " Error Codes: " + (", ".join(errors)) if errors else "" if errors and errors.issubset(self.expected_errors): - log.info( + log.debug( f"Expected Exception while collecting{self.extra_info} ({exc_type.__name__}): " f"{error_details}{error_summary}. Ignore." ) return True if not Config.gcp.discard_account_on_resource_error: - self.core_feedback.error( - f"Error while collecting{self.extra_info} ({exc_type.__name__}): " f"{error_details}{error_summary}", - log, + if exc_type is HttpError and isinstance(exc_value, HttpError): + if exc_value.resp.status == 403: + self.error_accumulator.add_error( + as_info=False, + error_kind="AccessDenied", + service=self.service, + action=self.action, + message=f"Access denied: {error_details}", + region=None, + ) + return True + + self.error_accumulator.add_error( + as_info=False, + error_kind=exc_type.__name__, + service=self.service, + action=self.action, + message=f"Error while collecting{self.extra_info}: {error_details}{error_summary}", + region=self.region, ) return True - if exc_type is HttpError and isinstance(exc_value, HttpError): - if exc_value.resp.status == 403: - self.core_feedback.error( - f"Access denied{self.extra_info}: {error_details} Error Codes: {error_summary}", log - ) - return True - return False diff --git a/plugins/gcp/test/conftest.py b/plugins/gcp/test/conftest.py index 15a0d333cc..ec6b291b83 100644 --- a/plugins/gcp/test/conftest.py +++ b/plugins/gcp/test/conftest.py @@ -11,7 +11,7 @@ from fix_plugin_gcp.resources.base import GcpRegion, GraphBuilder, GcpProject from fixlib.baseresources import Cloud from fixlib.config import Config -from fixlib.core.actions import CoreFeedback +from fixlib.core.actions import CoreFeedback, ErrorAccumulator from fixlib.graph import Graph from fixlib.threading import ExecutorQueue from .random_client import build_random_data_client, random_predefined @@ -27,10 +27,13 @@ def random_builder() -> Iterator[GraphBuilder]: gcp_client._discovery_function = build_random_data_client queue = ExecutorQueue(executor, "dummy") feedback = CoreFeedback("test", "test", "test", Queue()) + accumulator = ErrorAccumulator() project = GcpProject(id="test") project_global_region = GcpRegion.fallback_global_region(project) credentials = AnonymousCredentials() # type: ignore - builder = GraphBuilder(Graph(), Cloud(id="gcp"), project, credentials, queue, feedback, project_global_region) + builder = GraphBuilder( + Graph(), Cloud(id="gcp"), project, credentials, queue, feedback, accumulator, project_global_region + ) builder.add_node(project_global_region, {}) # add predefined regions and zones for predefined in random_predefined: