Skip to content

Commit

Permalink
simplify and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Nov 27, 2024
1 parent 7a21031 commit 6d50044
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 132 deletions.
39 changes: 14 additions & 25 deletions plugins/gcp/fix_plugin_gcp/collector.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime, timedelta, timezone
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from typing import Type, List, Any, Optional, cast

from fix_plugin_gcp.config import GcpConfig
Expand Down Expand Up @@ -161,30 +161,19 @@ def get_last_run() -> Optional[datetime]:

def collect_usage_metrics(self, builder: GraphBuilder) -> None:
for resource in builder.graph.nodes:
if not isinstance(resource, GcpResource):
continue
# region can be overridden in the query
if region := cast(GcpRegion, resource.region()):
resource_queries: List[monitoring.GcpMonitoringQuery] = resource.collect_usage_metrics(builder)
if resource_queries:
start = builder.metrics_delta

def collect_and_set_metrics(
start: timedelta,
region: GcpRegion,
queries: List[monitoring.GcpMonitoringQuery],
resource: GcpResource,
) -> None:
start_at = builder.created_at - start
try:
result = monitoring.GcpMonitoringMetricData.query_for(
builder.for_region(region), queries, start_at, builder.created_at
)
monitoring.update_resource_metrics(resource, result)
except Exception as e:
log.warning(f"Error occurred in region {region}: {e}")

builder.submit_work(collect_and_set_metrics, start, region, resource_queries, resource)
if isinstance(resource, GcpResource) and (mq := resource.collect_usage_metrics(builder)):

def collect_and_set_metrics() -> None:
start_at = builder.created_at - builder.metrics_delta
region = cast(GcpRegion, resource.region())
try:
rb = builder.for_region(region)
result = monitoring.GcpMonitoringMetricData.query_for(rb, mq, start_at, builder.created_at)
monitoring.update_resource_metrics(resource, result)
except Exception as e:
log.warning(f"Error occurred in region {region}: {e}")

builder.submit_work(collect_and_set_metrics)

def remove_unconnected_nodes(self, builder: GraphBuilder) -> None:
def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None:
Expand Down
63 changes: 57 additions & 6 deletions plugins/gcp/fix_plugin_gcp/resources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from typing import Callable, List, ClassVar, Optional, TypeVar, Type, Any, Dict, Set, Tuple

from attr import define, field
from attrs import frozen
from frozendict import frozendict
from google.auth.credentials import Credentials as GoogleAuthCredentials
from googleapiclient.errors import HttpError

Expand All @@ -24,6 +26,9 @@
BaseZone,
ModelReference,
PhantomBaseResource,
MetricName,
MetricUnit,
StatName,
)
from fixlib.config import Config
from fixlib.core.actions import CoreFeedback, ErrorAccumulator
Expand Down Expand Up @@ -204,22 +209,21 @@ def _standard_edges(self, node: GcpResourceType, source: Optional[Json] = None)
self.add_edge(node, node=node._region, reverse=True)
return True

parts = node.id.split("/")
parts = node.id.split("/", maxsplit=4)
if len(parts) > 3 and parts[0] == "projects":
location_types = ["locations", "zones", "regions"]
if parts[2] in location_types:
if parts[2] in ["locations", "zones", "regions"]:
location_name = parts[3]
# Check for zone first
if zone := self.zone_by_name.get(location_name):
node._zone = zone
node._region = self.region_by_zone_name.get(zone.id)
self.add_edge(node, node=zone, reverse=True)
self.add_edge(zone, node=node)
return True

# Then check for region
if region := self.region_by_name.get(location_name):
node._region = region
self.add_edge(node, node=region, reverse=True)
self.add_edge(region, node=node)
return True

if source is not None:
Expand Down Expand Up @@ -333,6 +337,53 @@ def for_region(self, region: GcpRegion) -> GraphBuilder:
)


@frozen(kw_only=True)
class MetricNormalization:
unit: MetricUnit
stat_map: frozendict[str, StatName] = frozendict(
{"ALIGN_MIN": StatName.min, "ALIGN_MEAN": StatName.avg, "ALIGN_MAX": StatName.max}
)
normalize_value: Callable[[float], float] = lambda x: x
compute_stats: Callable[[List[float]], List[Tuple[float, Optional[StatName]]]] = lambda x: [(sum(x) / len(x), None)]


@define(hash=True, frozen=True)
class GcpMonitoringQuery:
metric_name: MetricName # final name of the metric
query_name: str # name of the metric (e.g., GCP metric type)
period: timedelta # period of the metric
metric_id: str # unique metric identifier
stat: str # aggregation type, supports ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN
project_id: str # GCP project name
normalization: Optional[MetricNormalization] # normalization info
metric_filters: frozendict[str, str] # filters for the metric

@staticmethod
def create(
*,
query_name: str,
period: timedelta,
ref_id: str,
metric_name: MetricName,
stat: str,
project_id: str,
metric_filters: Dict[str, str],
normalization: Optional[MetricNormalization] = None,
) -> "GcpMonitoringQuery":
filter_suffix = "/" + "/".join(f"{key}={value}" for key, value in sorted(metric_filters.items()))
metric_id = f"{query_name}/{ref_id}/{stat}{filter_suffix}"
return GcpMonitoringQuery(
metric_name=metric_name,
query_name=query_name,
period=period,
metric_id=metric_id,
stat=stat,
normalization=normalization,
project_id=project_id,
metric_filters=frozendict(metric_filters),
)


@define(eq=False, slots=False)
class GcpResource(BaseResource):
kind: ClassVar[str] = "gcp_resource"
Expand Down Expand Up @@ -436,7 +487,7 @@ def post_process_instance(self, builder: GraphBuilder, source: Json) -> None:
"""
pass

def collect_usage_metrics(self, builder: GraphBuilder) -> List: # type: ignore
def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
# Default behavior: do nothing
return []

Expand Down
5 changes: 2 additions & 3 deletions plugins/gcp/fix_plugin_gcp/resources/cloudfunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from attr import define, field

from fix_plugin_gcp.gcp_client import GcpApiSpec
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder
from fix_plugin_gcp.resources.monitoring import GcpMonitoringQuery, normalizer_factory, STAT_LIST
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder, GcpMonitoringQuery
from fix_plugin_gcp.resources.monitoring import normalizer_factory, STAT_LIST
from fixlib.baseresources import BaseServerlessFunction, MetricName
from fixlib.json_bender import Bender, S, Bend, ForallBend

Expand Down Expand Up @@ -303,7 +303,6 @@ class GcpCloudFunction(GcpResource, BaseServerlessFunction):

def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
queries: List[GcpMonitoringQuery] = []
queries = []
delta = builder.metrics_delta
queries.extend(
[
Expand Down
4 changes: 2 additions & 2 deletions plugins/gcp/fix_plugin_gcp/resources/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from attr import define, field

from fix_plugin_gcp.gcp_client import GcpApiSpec, InternalZoneProp
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder, GcpMonitoringQuery
from fix_plugin_gcp.resources.billing import GcpSku
from fix_plugin_gcp.resources.monitoring import GcpMonitoringQuery, STAT_LIST, normalizer_factory
from fix_plugin_gcp.resources.monitoring import STAT_LIST, normalizer_factory
from fixlib.baseresources import (
BaseAutoScalingGroup,
BaseBucket,
Expand Down
100 changes: 10 additions & 90 deletions plugins/gcp/fix_plugin_gcp/resources/monitoring.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from copy import deepcopy
from functools import cached_property
import logging
from datetime import datetime, timedelta
from typing import Callable, ClassVar, Dict, List, Optional, Tuple, TypeVar, Union
from concurrent.futures import as_completed
from copy import deepcopy
from datetime import datetime
from functools import cached_property
from typing import ClassVar, Dict, List, Optional, Tuple, TypeVar

from attr import define, field, frozen
from attr import define, field

from fix_plugin_gcp.resources.base import GraphBuilder, GcpResource
from fix_plugin_gcp.gcp_client import GcpApiSpec
from fixlib.baseresources import MetricName, MetricUnit, BaseResource, StatName
from fix_plugin_gcp.resources.base import GraphBuilder, GcpResource, GcpMonitoringQuery, MetricNormalization
from fixlib.baseresources import MetricUnit, BaseResource, StatName
from fixlib.durations import duration_str
from fixlib.json import from_json
from fixlib.json_bender import S, Bender, ForallBend, bend, K
Expand All @@ -23,85 +23,6 @@
STAT_LIST: List[str] = ["ALIGN_MIN", "ALIGN_MEAN", "ALIGN_MAX"]


def identity(x: T) -> T:
return x


def compute_stats(values: List[float]) -> List[Tuple[float, Optional[StatName]]]:
return [(sum(values) / len(values), None)]


@frozen(kw_only=True)
class MetricNormalization:
unit: MetricUnit
# Use Tuple instead of Dict for stat_map because it should be immutable
stat_map: Tuple[Tuple[str, StatName], Tuple[str, StatName], Tuple[str, StatName]] = (
("ALIGN_MIN", StatName.min),
("ALIGN_MEAN", StatName.avg),
("ALIGN_MAX", StatName.max),
)
normalize_value: Callable[[float], float] = identity

compute_stats: Callable[[List[float]], List[Tuple[float, Optional[StatName]]]] = compute_stats

def get_stat_value(self, key: str) -> Optional[StatName]:
"""
Get the value from stat_map based on the given key.
Args:
key: The key to search for in the stat_map.
Returns:
The corresponding value from stat_map.
"""
for stat_key, value in self.stat_map:
if stat_key == key:
return value
return None


@define(hash=True, frozen=True)
class GcpMonitoringQuery:
metric_name: Union[str, MetricName] # final name of the metric
query_name: str # name of the metric (e.g., GCP metric type)
period: timedelta # period of the metric
ref_id: str # A unique identifier for the resource, formatted as `{resource_kind}/{resource_id}/{resource_region}`.
# Example: "gcp_instance/12345/us-central1". This is used to uniquely reference resources across kinds and regions.
metric_id: str # unique metric identifier
stat: str # aggregation type, supports ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN
project_id: str # GCP project name
normalization: Optional[MetricNormalization] = None # normalization info
metric_filters: Optional[Tuple[Tuple[str, str], ...]] = None # Immutable structure

@staticmethod
def create(
*,
query_name: str,
period: timedelta,
ref_id: str,
metric_name: Union[str, MetricName],
stat: str,
project_id: str,
metric_filters: Dict[str, str],
normalization: Optional[MetricNormalization] = None,
) -> "GcpMonitoringQuery":
sorted_filters = sorted(metric_filters.items())
filter_suffix = "/" + "/".join(f"{key}={value}" for key, value in sorted_filters)
metric_id = f"{query_name}/{ref_id}/{stat}{filter_suffix}"
immutable_filters = tuple(sorted_filters)
return GcpMonitoringQuery(
metric_name=metric_name,
query_name=query_name,
period=period,
ref_id=ref_id,
metric_id=metric_id,
stat=stat,
normalization=normalization,
project_id=project_id,
metric_filters=immutable_filters,
)


@define(eq=False, slots=False)
class GcpMonitoringMetricData:
kind: ClassVar[str] = "gcp_monitoring_metric_data"
Expand Down Expand Up @@ -209,7 +130,7 @@ def _query_for_chunk(

# Add additional filters
if query.metric_filters:
filters.extend(f'{key} = "{value}"' for key, value in query.metric_filters)
filters.extend(f'{key} = "{value}"' for key, value in query.metric_filters.items())

# Join filters with " AND " to form the final filter string
local_api_spec.request_parameter["filter"] = " AND ".join(filters)
Expand Down Expand Up @@ -247,9 +168,8 @@ def update_resource_metrics(
continue
name = metric_name + "_" + normalizer.unit
value = normalizer.normalize_value(metric_value)
stat_name = maybe_stat_name or normalizer.get_stat_value(query.stat)
if stat_name:
resource._resource_usage[name][str(stat_name)] = value
stat_name = maybe_stat_name or normalizer.stat_map[query.stat]
resource._resource_usage[name][str(stat_name)] = value
except KeyError as e:
log.warning(f"An error occured while setting metric values: {e}")
raise
Expand Down
4 changes: 2 additions & 2 deletions plugins/gcp/fix_plugin_gcp/resources/sqladmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from attr import define, field

from fix_plugin_gcp.gcp_client import GcpApiSpec
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder, GcpMonitoringQuery
from fix_plugin_gcp.resources.compute import GcpSslCertificate
from fix_plugin_gcp.resources.monitoring import GcpMonitoringQuery, normalizer_factory, STAT_LIST
from fix_plugin_gcp.resources.monitoring import normalizer_factory, STAT_LIST
from fixlib.baseresources import BaseDatabase, DatabaseInstanceStatus, MetricName, ModelReference
from fixlib.json_bender import F, Bender, S, Bend, ForallBend, K, MapEnum, AsInt
from fixlib.types import Json
Expand Down
4 changes: 2 additions & 2 deletions plugins/gcp/test/test_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import os
from typing import List

from fix_plugin_gcp.resources.base import GraphBuilder, GcpRegion
from fix_plugin_gcp.resources.base import GraphBuilder, GcpRegion, GcpMonitoringQuery
from fix_plugin_gcp.resources.compute import *
from fix_plugin_gcp.resources.billing import GcpSku
from fix_plugin_gcp.resources.monitoring import GcpMonitoringQuery, GcpMonitoringMetricData, update_resource_metrics
from fix_plugin_gcp.resources.monitoring import GcpMonitoringMetricData, update_resource_metrics
from fixlib.threading import ExecutorQueue
from fixlib.baseresources import InstanceStatus

Expand Down
4 changes: 2 additions & 2 deletions plugins/gcp/test/test_monitoring.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import timedelta, datetime, timezone

from fix_plugin_gcp.resources.base import GraphBuilder
from fix_plugin_gcp.resources.monitoring import GcpMonitoringQuery, GcpMonitoringMetricData, normalizer_factory
from fix_plugin_gcp.resources.base import GraphBuilder, GcpMonitoringQuery
from fix_plugin_gcp.resources.monitoring import GcpMonitoringMetricData, normalizer_factory
from fixlib.baseresources import MetricName


Expand Down

0 comments on commit 6d50044

Please sign in to comment.