Skip to content

Commit

Permalink
[gcp][fix]: Deduplicate error messages in accumulator (#2223)
Browse files: browse the repository at this point in the history
Co-authored-by: Matthias Veit <[email protected]>
  • Loading branch information
1101-1 and aquamatthias authored Oct 7, 2024
1 parent e27feda commit e0f8435
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 27 deletions.
8 changes: 7 additions & 1 deletion plugins/gcp/fix_plugin_gcp/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fix_plugin_gcp.resources.base import GcpResource, GcpProject, ExecutorQueue, GraphBuilder, GcpRegion, GcpZone
from fix_plugin_gcp.utils import Credentials
from fixlib.baseresources import Cloud
from fixlib.core.actions import CoreFeedback
from fixlib.core.actions import CoreFeedback, ErrorAccumulator
from fixlib.graph import Graph

log = logging.getLogger("fix.plugins.gcp")
Expand Down Expand Up @@ -51,13 +51,15 @@ def __init__(
self.cloud = cloud
self.project = project
self.core_feedback = core_feedback
self.error_accumulator = ErrorAccumulator()
self.graph = Graph(root=self.project, max_nodes=max_resources_per_account)
self.credentials = Credentials.get(self.project.id)

def collect(self) -> None:
with ThreadPoolExecutor(
thread_name_prefix=f"gcp_{self.project.id}", max_workers=self.config.project_pool_size
) as executor:
self.core_feedback.progress_done(self.project.id, 0, 1, context=[self.cloud.id])
# The shared executor is used to parallelize the collection of resources "as fast as possible"
# It should only be used in scenarios, where it is safe to do so.
# This executor is shared between all regions.
Expand All @@ -70,6 +72,7 @@ def collect(self) -> None:
self.credentials,
shared_queue,
self.core_feedback,
self.error_accumulator,
project_global_region,
)
global_builder.add_node(project_global_region, {})
Expand All @@ -95,6 +98,8 @@ def collect(self) -> None:
global_builder.submit_work(self.collect_region, region, global_builder.for_region(region))
global_builder.executor.wait_for_submitted_work()

self.error_accumulator.report_all(global_builder.core_feedback)

log.info(f"[GCP:{self.project.id}] Connect resources and create edges.")
# connect nodes
for node, data in list(self.graph.nodes(data=True)):
Expand All @@ -110,6 +115,7 @@ def collect(self) -> None:
if isinstance(node, GcpResource):
node.post_process_instance(global_builder, data.get("source", {}))

self.core_feedback.progress_done(self.project.id, 1, 1, context=[self.cloud.id])
log.info(f"[GCP:{self.project.id}] Collecting resources done.")

def remove_unconnected_nodes(self):
Expand Down
11 changes: 6 additions & 5 deletions plugins/gcp/fix_plugin_gcp/resources/aiplatform.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ class AIPlatformRegionFilter:
def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
# Default behavior: in case the class has an ApiSpec, call the api and call collect.
if issubclass(cls, GcpResource):
if kwargs:
log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind} with ({kwargs})")
else:
log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind}")
region_name = "global" if not builder.region else builder.region.safe_name
log.info(f"[GCP:{builder.project.id}:{region_name}] Collecting {cls.kind}")
if spec := cls.api_spec:
expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set()) | {"HttpError:none:none"}
with GcpErrorHandler(
builder.core_feedback,
spec.action,
builder.error_accumulator,
spec.service,
builder.region.safe_name if builder.region else None,
expected_errors,
f" in {builder.project.id} kind {cls.kind}",
expected_message_substrings,
Expand Down
60 changes: 41 additions & 19 deletions plugins/gcp/fix_plugin_gcp/resources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
ModelReference,
)
from fixlib.config import Config
from fixlib.core.actions import CoreFeedback
from fixlib.core.actions import CoreFeedback, ErrorAccumulator
from fixlib.graph import Graph, EdgeKey
from fixlib.json import from_json as from_js, value_in_path
from fixlib.json_bender import bend, Bender, S, Bend, MapDict, F
Expand Down Expand Up @@ -78,6 +78,7 @@ def __init__(
credentials: GoogleAuthCredentials,
executor: ExecutorQueue,
core_feedback: CoreFeedback,
error_accumulator: ErrorAccumulator,
fallback_global_region: GcpRegion,
region: Optional[GcpRegion] = None,
graph_nodes_access: Optional[Lock] = None,
Expand All @@ -87,12 +88,11 @@ def __init__(
self.cloud = cloud
self.region = region
self.project = project
self.client = GcpClient(
credentials, project_id=project.id, region=region.name if region else None, core_feedback=core_feedback
)
self.client = GcpClient(credentials, project_id=project.id, region=region.name if region else None)
self.executor = executor
self.name = f"GCP:{project.name}"
self.core_feedback = core_feedback
self.error_accumulator = error_accumulator
self.fallback_global_region = fallback_global_region
self.region_by_name: Dict[str, GcpRegion] = {}
self.region_by_zone_name: Dict[str, GcpRegion] = {}
Expand Down Expand Up @@ -272,6 +272,7 @@ def for_region(self, region: GcpRegion) -> GraphBuilder:
self.client.credentials,
self.executor,
self.core_feedback,
self.error_accumulator,
self.fallback_global_region,
region,
self.graph_nodes_access,
Expand Down Expand Up @@ -381,7 +382,14 @@ def collect_resources(cls: Type[GcpResource], builder: GraphBuilder, **kwargs: A
log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind}")
if spec := cls.api_spec:
expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set())
with GcpErrorHandler(builder.core_feedback, expected_errors, f" in {builder.project.id} kind {cls.kind}"):
with GcpErrorHandler(
spec.action,
builder.error_accumulator,
spec.service,
builder.region.safe_name if builder.region else None,
expected_errors,
f" in {builder.project.id} kind {cls.kind}",
):
items = builder.client.list(spec, **kwargs)
resources = cls.collect(items, builder)
log.info(f"[GCP:{builder.project.id}] finished collecting: {cls.kind}")
Expand Down Expand Up @@ -610,12 +618,18 @@ class GcpZone(GcpResource, BaseZone):
class GcpErrorHandler:
def __init__(
self,
core_feedback: CoreFeedback,
action: str,
error_accumulator: ErrorAccumulator,
service: str,
region: Optional[str],
expected_errors: Set[str],
extra_info: str = "",
expected_message_substrings: Optional[Set[str]] = None,
) -> None:
self.core_feedback = core_feedback
self.action = action
self.error_accumulator = error_accumulator
self.service = service
self.region = region
self.extra_info = extra_info
self.expected_errors = expected_errors
self.expected_message_substrings = expected_message_substrings
Expand Down Expand Up @@ -649,31 +663,39 @@ def __exit__(
if self.expected_message_substrings:
for substring in self.expected_message_substrings:
if substring in error_details:
log.info(f"Ignoring expected HttpError in {self.extra_info}: {error_details}.")
return True # Suppress the exception
except Exception as ex:
errors = {f"ParseError:unknown:{ex}"}
error_summary = " Error Codes: " + (", ".join(errors)) if errors else ""

if errors and errors.issubset(self.expected_errors):
log.info(
log.debug(
f"Expected Exception while collecting{self.extra_info} ({exc_type.__name__}): "
f"{error_details}{error_summary}. Ignore."
)
return True

if not Config.gcp.discard_account_on_resource_error:
self.core_feedback.error(
f"Error while collecting{self.extra_info} ({exc_type.__name__}): " f"{error_details}{error_summary}",
log,
if exc_type is HttpError and isinstance(exc_value, HttpError):
if exc_value.resp.status == 403:
self.error_accumulator.add_error(
as_info=False,
error_kind="AccessDenied",
service=self.service,
action=self.action,
message=f"Access denied: {error_details}",
region=None,
)
return True

self.error_accumulator.add_error(
as_info=False,
error_kind=exc_type.__name__,
service=self.service,
action=self.action,
message=f"Error while collecting{self.extra_info}: {error_details}{error_summary}",
region=self.region,
)
return True

if exc_type is HttpError and isinstance(exc_value, HttpError):
if exc_value.resp.status == 403:
self.core_feedback.error(
f"Access denied{self.extra_info}: {error_details} Error Codes: {error_summary}", log
)
return True

return False
7 changes: 5 additions & 2 deletions plugins/gcp/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from fix_plugin_gcp.resources.base import GcpRegion, GraphBuilder, GcpProject
from fixlib.baseresources import Cloud
from fixlib.config import Config
from fixlib.core.actions import CoreFeedback
from fixlib.core.actions import CoreFeedback, ErrorAccumulator
from fixlib.graph import Graph
from fixlib.threading import ExecutorQueue
from .random_client import build_random_data_client, random_predefined
Expand All @@ -27,10 +27,13 @@ def random_builder() -> Iterator[GraphBuilder]:
gcp_client._discovery_function = build_random_data_client
queue = ExecutorQueue(executor, "dummy")
feedback = CoreFeedback("test", "test", "test", Queue())
accumulator = ErrorAccumulator()
project = GcpProject(id="test")
project_global_region = GcpRegion.fallback_global_region(project)
credentials = AnonymousCredentials() # type: ignore
builder = GraphBuilder(Graph(), Cloud(id="gcp"), project, credentials, queue, feedback, project_global_region)
builder = GraphBuilder(
Graph(), Cloud(id="gcp"), project, credentials, queue, feedback, accumulator, project_global_region
)
builder.add_node(project_global_region, {})
# add predefined regions and zones
for predefined in random_predefined:
Expand Down

0 comments on commit e0f8435

Please sign in to comment.