Skip to content

Commit

Permalink
feat: compute stats if we do not use align in gcp
Browse files Browse the repository at this point in the history
  • Loading branch information
1101-1 committed Nov 26, 2024
1 parent 8aebcb7 commit 6c1e5e0
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions plugins/gcp/fix_plugin_gcp/resources/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from fixlib.baseresources import MetricName, MetricUnit, BaseResource, StatName
from fixlib.durations import duration_str
from fixlib.json import from_json
from fixlib.json_bender import S, Bender, ForallBend, bend
from fixlib.json_bender import S, Bender, ForallBend, bend, K
from fixlib.utils import utc_str

service_name = "monitoring"
Expand All @@ -27,6 +27,10 @@ def identity(x: T) -> T:
return x


def compute_stats(values: List[float]) -> List[Tuple[float, Optional[StatName]]]:
return [(sum(values) / len(values), None)]


@frozen(kw_only=True)
class MetricNormalization:
unit: MetricUnit
Expand All @@ -38,6 +42,8 @@ class MetricNormalization:
)
normalize_value: Callable[[float], float] = identity

compute_stats: Callable[[List[float]], List[Tuple[float, Optional[StatName]]]] = compute_stats

def get_stat_value(self, key: str) -> Optional[StatName]:
"""
Get the value from stat_map based on the given key.
Expand All @@ -61,7 +67,7 @@ class GcpMonitoringQuery:
period: timedelta # period of the metric
ref_id: str # A unique identifier for the resource, formatted as `{resource_kind}/{resource_id}/{resource_region}`.
# Example: "gcp_instance/12345/us-central1". This is used to uniquely reference resources across kinds and regions.
metric_id: str # unique metric identifier (metric_name + instance_id)
metric_id: str # unique metric identifier
stat: str # aggregation type, supports ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN
project_id: str # GCP project name
normalization: Optional[MetricNormalization] = None # normalization info
Expand Down Expand Up @@ -101,12 +107,14 @@ class GcpMonitoringMetricData:
kind: ClassVar[str] = "gcp_monitoring_metric_data"
mapping: ClassVar[Dict[str, Bender]] = {
"metric_values": S("points")
>> ForallBend(S("value", "doubleValue").or_else(S("value", "int64Value", default=0.0))),
>> ForallBend(
S("value", "doubleValue").or_else(S("value", "int64Value")).or_else(S("value", "distributionValue", "mean"))
).or_else(K([])),
"metric_kind": S("metricKind"),
"value_type": S("valueType"),
"metric_type": S("metric", "type"),
}
metric_values: Optional[List[float]] = field(factory=list)
metric_values: List[float] = field(factory=list)
metric_kind: Optional[str] = field(default=None)
value_type: Optional[str] = field(default=None)
metric_type: Optional[str] = field(default=None)
Expand Down Expand Up @@ -229,25 +237,25 @@ def update_resource_metrics(
resource = resources_map.get(query.ref_id)
if resource is None:
continue
if not metric.metric_values or len(metric.metric_values) == 0:
if len(metric.metric_values) == 0:
continue
normalizer = query.normalization
if not normalizer:
continue

average_value = sum(metric.metric_values) / len(metric.metric_values)

try:
metric_name = query.metric_name
if not metric_name:
continue
name = metric_name + "_" + normalizer.unit
value = normalizer.normalize_value(average_value)
stat_name = normalizer.get_stat_value(query.stat) if normalizer.get_stat_value(query.stat) else "avg"
resource._resource_usage[name][str(stat_name)] = value
except KeyError as e:
log.warning(f"An error occured while setting metric values: {e}")
raise
for metric_value, maybe_stat_name in normalizer.compute_stats(metric.metric_values):
try:
metric_name = query.metric_name
if not metric_name:
continue
name = metric_name + "_" + normalizer.unit
value = normalizer.normalize_value(metric_value)
stat_name = maybe_stat_name or normalizer.get_stat_value(query.stat)
if stat_name:
resource._resource_usage[name][str(stat_name)] = value
except KeyError as e:
log.warning(f"An error occured while setting metric values: {e}")
raise


class NormalizerFactory:
Expand Down Expand Up @@ -290,6 +298,7 @@ def seconds(self) -> MetricNormalization:
def milliseconds(self) -> MetricNormalization:
return MetricNormalization(
unit=MetricUnit.Milliseconds,
compute_stats=calculate_min_max_avg,
normalize_value=lambda x: round(x, ndigits=4),
)

Expand All @@ -301,4 +310,12 @@ def percent(self) -> MetricNormalization:
)


def calculate_min_max_avg(values: List[float]) -> List[Tuple[float, Optional[StatName]]]:
return [
(min(values), StatName.min),
(max(values), StatName.max),
(sum(values) / len(values), StatName.avg),
]


normalizer_factory = NormalizerFactory()

0 comments on commit 6c1e5e0

Please sign in to comment.