diff --git a/plugins/gcp/fix_plugin_gcp/resources/monitoring.py b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py index 41afb7d36..7053ccba4 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/monitoring.py +++ b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py @@ -12,7 +12,7 @@ from fixlib.baseresources import MetricName, MetricUnit, BaseResource, StatName from fixlib.durations import duration_str from fixlib.json import from_json -from fixlib.json_bender import S, Bender, ForallBend, bend +from fixlib.json_bender import S, Bender, ForallBend, bend, K from fixlib.utils import utc_str service_name = "monitoring" @@ -27,6 +27,10 @@ def identity(x: T) -> T: return x +def compute_stats(values: List[float]) -> List[Tuple[float, Optional[StatName]]]: + return [(sum(values) / len(values), None)] + + @frozen(kw_only=True) class MetricNormalization: unit: MetricUnit @@ -38,6 +42,8 @@ class MetricNormalization: ) normalize_value: Callable[[float], float] = identity + compute_stats: Callable[[List[float]], List[Tuple[float, Optional[StatName]]]] = compute_stats + def get_stat_value(self, key: str) -> Optional[StatName]: """ Get the value from stat_map based on the given key. @@ -61,7 +67,7 @@ class GcpMonitoringQuery: period: timedelta # period of the metric ref_id: str # A unique identifier for the resource, formatted as `{resource_kind}/{resource_id}/{resource_region}`. # Example: "gcp_instance/12345/us-central1". This is used to uniquely reference resources across kinds and regions. - metric_id: str # unique metric identifier (metric_name + instance_id) + metric_id: str # unique metric identifier stat: str # aggregation type, supports ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN project_id: str # GCP project name normalization: Optional[MetricNormalization] = None # normalization info @@ -101,12 +107,14 @@ class GcpMonitoringMetricData: kind: ClassVar[str] = "gcp_monitoring_metric_data" mapping: ClassVar[Dict[str, Bender]] = { "metric_values": S("points") - >> ForallBend(S("value", "doubleValue").or_else(S("value", "int64Value", default=0.0))), + >> ForallBend( + S("value", "doubleValue").or_else(S("value", "int64Value")).or_else(S("value", "distributionValue", "mean")) + ).or_else(K([])), "metric_kind": S("metricKind"), "value_type": S("valueType"), "metric_type": S("metric", "type"), } - metric_values: Optional[List[float]] = field(factory=list) + metric_values: List[float] = field(factory=list) metric_kind: Optional[str] = field(default=None) value_type: Optional[str] = field(default=None) metric_type: Optional[str] = field(default=None) @@ -229,25 +237,25 @@ def update_resource_metrics( resource = resources_map.get(query.ref_id) if resource is None: continue - if not metric.metric_values or len(metric.metric_values) == 0: + if len(metric.metric_values) == 0: continue normalizer = query.normalization if not normalizer: continue - average_value = sum(metric.metric_values) / len(metric.metric_values) - - try: - metric_name = query.metric_name - if not metric_name: - continue - name = metric_name + "_" + normalizer.unit - value = normalizer.normalize_value(average_value) - stat_name = normalizer.get_stat_value(query.stat) if normalizer.get_stat_value(query.stat) else "avg" - resource._resource_usage[name][str(stat_name)] = value - except KeyError as e: - log.warning(f"An error occured while setting metric values: {e}") - raise + for metric_value, maybe_stat_name in normalizer.compute_stats(metric.metric_values): + try: + metric_name = query.metric_name + if not metric_name: + continue + name = metric_name + "_" + normalizer.unit + value = normalizer.normalize_value(metric_value) + stat_name = maybe_stat_name or normalizer.get_stat_value(query.stat) + if stat_name: + resource._resource_usage[name][str(stat_name)] = value + except KeyError as e: + log.warning(f"An error occured while setting metric values: {e}") + raise class NormalizerFactory: @@ -290,6 +298,7 @@ def seconds(self) -> MetricNormalization: def milliseconds(self) -> MetricNormalization: return MetricNormalization( unit=MetricUnit.Milliseconds, + compute_stats=calculate_min_max_avg, normalize_value=lambda x: round(x, ndigits=4), ) @@ -301,4 +310,12 @@ def percent(self) -> MetricNormalization: ) +def calculate_min_max_avg(values: List[float]) -> List[Tuple[float, Optional[StatName]]]: + return [ + (min(values), StatName.min), + (max(values), StatName.max), + (sum(values) / len(values), StatName.avg), + ] + + normalizer_factory = NormalizerFactory()