Skip to content

Commit

Permalink
feat: reimplement collection and increase default max workers
Browse files Browse the repository at this point in the history
  • Loading branch information
1101-1 committed Nov 26, 2024
1 parent 6c1e5e0 commit fc527e2
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 38 deletions.
43 changes: 17 additions & 26 deletions plugins/gcp/fix_plugin_gcp/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ def get_last_run() -> Optional[datetime]:
try:
log.info(f"[GCP:{self.project.id}] Collect usage metrics.")
self.collect_usage_metrics(global_builder)
global_builder.executor.wait_for_submitted_work()
except Exception as e:
log.warning(f"Failed to collect usage metrics in project {self.project.id}: {e}")
shared_queue.wait_for_submitted_work()
log.info(f"[GCP:{self.project.id}] Connect resources and create edges.")
# connect nodes
for node, data in list(self.graph.nodes(data=True)):
Expand All @@ -161,40 +161,31 @@ def get_last_run() -> Optional[datetime]:
log.info(f"[GCP:{self.project.id}] Collecting resources done.")

def collect_usage_metrics(self, builder: GraphBuilder) -> None:
metrics_queries = defaultdict(list)
two_hours = timedelta(hours=2)
thirty_minutes = timedelta(minutes=30)
lookup_map = {}
for resource in builder.graph.nodes:
if not isinstance(resource, GcpResource):
continue
# region can be overridden in the query
if region := cast(GcpRegion, resource.region()):
resource_queries: List[monitoring.GcpMonitoringQuery] = resource.collect_usage_metrics(builder)
if resource_queries:
# set unique GcpMonitoringQuery.ref_id
lookup_map[f"{resource.kind}/{resource.id}/{region.id}"] = resource
for query in resource_queries:
query_region = region
start = builder.metrics_delta
if query.period and query.period < thirty_minutes:
start = min(start, two_hours)
metrics_queries[(query_region, start)].append(query)
for (region, start), queries in metrics_queries.items():

def collect_and_set_metrics(
start: timedelta, region: GcpRegion, queries: List[monitoring.GcpMonitoringQuery]
) -> None:
start_at = builder.created_at - start
try:
result = monitoring.GcpMonitoringMetricData.query_for(
builder.for_region(region), queries, start_at, builder.created_at
)
monitoring.update_resource_metrics(lookup_map, result)
except Exception as e:
log.warning(f"Error occurred in region {region}: {e}")

builder.submit_work(collect_and_set_metrics, start, region, queries)
def collect_and_set_metrics(
start: timedelta,
region: GcpRegion,
queries: List[monitoring.GcpMonitoringQuery],
resource: GcpResource,
) -> None:
start_at = builder.created_at - start
try:
result = monitoring.GcpMonitoringMetricData.query_for(
builder.for_region(region), queries, start_at, builder.created_at
)
monitoring.update_resource_metrics(resource, result)
except Exception as e:
log.warning(f"Error occurred in region {region}: {e}")

builder.submit_work(collect_and_set_metrics, start, region, resource_queries, resource)

def remove_unconnected_nodes(self, builder: GraphBuilder) -> None:
def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None:
Expand Down
3 changes: 1 addition & 2 deletions plugins/gcp/fix_plugin_gcp/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from fixlib.proc import num_default_threads
from attrs import define, field
from typing import List, ClassVar, Optional

Expand All @@ -17,7 +16,7 @@ class GcpConfig:
metadata={"description": "GCP services to exclude (default: none)"},
)
project_pool_size: int = field(
factory=num_default_threads,
default=64,
metadata={"description": "GCP project thread/process pool size"},
)
fork_process: bool = field(
Expand Down
7 changes: 2 additions & 5 deletions plugins/gcp/fix_plugin_gcp/resources/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from attr import define, field, frozen

from fix_plugin_gcp.resources.base import GraphBuilder
from fix_plugin_gcp.resources.base import GraphBuilder, GcpResource
from fix_plugin_gcp.gcp_client import GcpApiSpec
from fixlib.baseresources import MetricName, MetricUnit, BaseResource, StatName
from fixlib.durations import duration_str
Expand Down Expand Up @@ -230,13 +230,10 @@ def _query_for_chunk(


def update_resource_metrics(
resources_map: Dict[str, V],
resource: GcpResource,
monitoring_metric_result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData],
) -> None:
for query, metric in monitoring_metric_result.items():
resource = resources_map.get(query.ref_id)
if resource is None:
continue
if len(metric.metric_values) == 0:
continue
normalizer = query.normalization
Expand Down
6 changes: 2 additions & 4 deletions plugins/gcp/test/test_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,9 @@ def test_gcp_instance_usage_metrics(random_builder: GraphBuilder) -> None:

random_builder.region = GcpRegion(id="us-east1", name="us-east1")
queries = gcp_instance.collect_usage_metrics(random_builder)
lookup_map = {}
lookup_map[f"{gcp_instance.kind}/{gcp_instance.id}/{gcp_instance.region().id}"] = gcp_instance

# simulates the `collect_usage_metrics` method found in `GcpAccountCollector`.
def collect_and_set_metrics(start_at: datetime, region: GcpRegion, queries: List[GcpMonitoringQuery]) -> None:
def collect_and_set_metrics(start_at: datetime, _: GcpRegion, queries: List[GcpMonitoringQuery]) -> None:
with ThreadPoolExecutor(max_workers=1) as executor:
queue = ExecutorQueue(executor, tasks_per_key=lambda _: 1, name="test")
g_builder = GraphBuilder(
Expand All @@ -204,7 +202,7 @@ def collect_and_set_metrics(start_at: datetime, region: GcpRegion, queries: List
last_run_started_at=random_builder.last_run_started_at,
)
result = GcpMonitoringMetricData.query_for(g_builder, queries, start_at, start_at + timedelta(hours=2))
update_resource_metrics(lookup_map, result)
update_resource_metrics(gcp_instance, result)

start = datetime(2020, 5, 30, 15, 45, 30)

Expand Down
2 changes: 1 addition & 1 deletion plugins/gcp/test/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ def test_args() -> None:
assert len(Config.gcp.project) == 0
assert len(Config.gcp.collect) == 0
assert len(Config.gcp.no_collect) == 0
assert Config.gcp.project_pool_size == num_default_threads()
assert Config.gcp.project_pool_size == 64
assert Config.gcp.fork_process is True

0 comments on commit fc527e2

Please sign in to comment.