diff --git a/plugins/gcp/fix_plugin_gcp/collector.py b/plugins/gcp/fix_plugin_gcp/collector.py index 2635b3210..7e03b1c58 100644 --- a/plugins/gcp/fix_plugin_gcp/collector.py +++ b/plugins/gcp/fix_plugin_gcp/collector.py @@ -139,9 +139,9 @@ def get_last_run() -> Optional[datetime]: try: log.info(f"[GCP:{self.project.id}] Collect usage metrics.") self.collect_usage_metrics(global_builder) + global_builder.executor.wait_for_submitted_work() except Exception as e: log.warning(f"Failed to collect usage metrics in project {self.project.id}: {e}") - shared_queue.wait_for_submitted_work() log.info(f"[GCP:{self.project.id}] Connect resources and create edges.") # connect nodes for node, data in list(self.graph.nodes(data=True)): @@ -161,10 +161,6 @@ def get_last_run() -> Optional[datetime]: log.info(f"[GCP:{self.project.id}] Collecting resources done.") def collect_usage_metrics(self, builder: GraphBuilder) -> None: - metrics_queries = defaultdict(list) - two_hours = timedelta(hours=2) - thirty_minutes = timedelta(minutes=30) - lookup_map = {} for resource in builder.graph.nodes: if not isinstance(resource, GcpResource): continue @@ -172,29 +168,24 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> None: if region := cast(GcpRegion, resource.region()): resource_queries: List[monitoring.GcpMonitoringQuery] = resource.collect_usage_metrics(builder) if resource_queries: - # set unique GcpMonitoringQuery.ref_id - lookup_map[f"{resource.kind}/{resource.id}/{region.id}"] = resource - for query in resource_queries: - query_region = region start = builder.metrics_delta - if query.period and query.period < thirty_minutes: - start = min(start, two_hours) - metrics_queries[(query_region, start)].append(query) - for (region, start), queries in metrics_queries.items(): - - def collect_and_set_metrics( - start: timedelta, region: GcpRegion, queries: List[monitoring.GcpMonitoringQuery] - ) -> None: - start_at = builder.created_at - start - try: - result = monitoring.GcpMonitoringMetricData.query_for( - builder.for_region(region), queries, start_at, builder.created_at - ) - monitoring.update_resource_metrics(lookup_map, result) - except Exception as e: - log.warning(f"Error occurred in region {region}: {e}") - builder.submit_work(collect_and_set_metrics, start, region, queries) + def collect_and_set_metrics( + start: timedelta, + region: GcpRegion, + queries: List[monitoring.GcpMonitoringQuery], + resource: GcpResource, + ) -> None: + start_at = builder.created_at - start + try: + result = monitoring.GcpMonitoringMetricData.query_for( + builder.for_region(region), queries, start_at, builder.created_at + ) + monitoring.update_resource_metrics(resource, result) + except Exception as e: + log.warning(f"Error occurred in region {region}: {e}") + + builder.submit_work(collect_and_set_metrics, start, region, resource_queries, resource) def remove_unconnected_nodes(self, builder: GraphBuilder) -> None: def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None: diff --git a/plugins/gcp/fix_plugin_gcp/config.py b/plugins/gcp/fix_plugin_gcp/config.py index afb046399..e7373ccc7 100644 --- a/plugins/gcp/fix_plugin_gcp/config.py +++ b/plugins/gcp/fix_plugin_gcp/config.py @@ -1,4 +1,3 @@ -from fixlib.proc import num_default_threads from attrs import define, field from typing import List, ClassVar, Optional @@ -17,7 +16,7 @@ class GcpConfig: metadata={"description": "GCP services to exclude (default: none)"}, ) project_pool_size: int = field( - factory=num_default_threads, + default=64, metadata={"description": "GCP project thread/process pool size"}, ) fork_process: bool = field( diff --git a/plugins/gcp/fix_plugin_gcp/resources/monitoring.py b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py index 7053ccba4..e38f86956 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/monitoring.py +++ b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py @@ -7,7 +7,7 @@ from attr import define, field, frozen -from fix_plugin_gcp.resources.base import GraphBuilder +from fix_plugin_gcp.resources.base import GraphBuilder, GcpResource from fix_plugin_gcp.gcp_client import GcpApiSpec from fixlib.baseresources import MetricName, MetricUnit, BaseResource, StatName from fixlib.durations import duration_str @@ -230,13 +230,10 @@ def _query_for_chunk( def update_resource_metrics( - resources_map: Dict[str, V], + resource: GcpResource, monitoring_metric_result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData], ) -> None: for query, metric in monitoring_metric_result.items(): - resource = resources_map.get(query.ref_id) - if resource is None: - continue if len(metric.metric_values) == 0: continue normalizer = query.normalization diff --git a/plugins/gcp/test/test_compute.py b/plugins/gcp/test/test_compute.py index c306f8119..ce18bb3ad 100644 --- a/plugins/gcp/test/test_compute.py +++ b/plugins/gcp/test/test_compute.py @@ -184,11 +184,9 @@ def test_gcp_instance_usage_metrics(random_builder: GraphBuilder) -> None: random_builder.region = GcpRegion(id="us-east1", name="us-east1") queries = gcp_instance.collect_usage_metrics(random_builder) - lookup_map = {} - lookup_map[f"{gcp_instance.kind}/{gcp_instance.id}/{gcp_instance.region().id}"] = gcp_instance # simulates the `collect_usage_metrics` method found in `GcpAccountCollector`. - def collect_and_set_metrics(start_at: datetime, region: GcpRegion, queries: List[GcpMonitoringQuery]) -> None: + def collect_and_set_metrics(start_at: datetime, _: GcpRegion, queries: List[GcpMonitoringQuery]) -> None: with ThreadPoolExecutor(max_workers=1) as executor: queue = ExecutorQueue(executor, tasks_per_key=lambda _: 1, name="test") g_builder = GraphBuilder( @@ -204,7 +202,7 @@ def collect_and_set_metrics(start_at: datetime, region: GcpRegion, queries: List last_run_started_at=random_builder.last_run_started_at, ) result = GcpMonitoringMetricData.query_for(g_builder, queries, start_at, start_at + timedelta(hours=2)) - update_resource_metrics(lookup_map, result) + update_resource_metrics(gcp_instance, result) start = datetime(2020, 5, 30, 15, 45, 30) diff --git a/plugins/gcp/test/test_config.py b/plugins/gcp/test/test_config.py index 5fc9cae0b..7ebac1252 100644 --- a/plugins/gcp/test/test_config.py +++ b/plugins/gcp/test/test_config.py @@ -11,5 +11,5 @@ def test_args() -> None: assert len(Config.gcp.project) == 0 assert len(Config.gcp.collect) == 0 assert len(Config.gcp.no_collect) == 0 - assert Config.gcp.project_pool_size == num_default_threads() + assert Config.gcp.project_pool_size == 64 assert Config.gcp.fork_process is True