feat: finished collecting metrics for lb
1101-1 committed Nov 28, 2024
1 parent e669085 commit 86f6435
Showing 5 changed files with 92 additions and 39 deletions.
3 changes: 0 additions & 3 deletions plugins/gcp/fix_plugin_gcp/resources/base.py
@@ -355,7 +355,6 @@ class GcpMonitoringQuery:
project_id: str # GCP project name
normalization: MetricNormalization # normalization info
metric_filters: frozendict[str, str] # filters for the metric
cross_series_reducer: str # default REDUCE_NONE

@staticmethod
def create(
@@ -368,7 +367,6 @@ def create(
project_id: str,
metric_filters: Dict[str, str],
normalization: MetricNormalization,
cross_series_reducer: str = "REDUCE_NONE",
) -> "GcpMonitoringQuery":
filter_suffix = "/" + "/".join(f"{key}={value}" for key, value in sorted(metric_filters.items()))
metric_id = f"{query_name}/{ref_id}/{stat}{filter_suffix}"
@@ -382,7 +380,6 @@ def create(
normalization=normalization,
project_id=project_id,
metric_filters=frozendict(metric_filters),
cross_series_reducer=cross_series_reducer,
)


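For reference, a minimal sketch of how GcpMonitoringQuery.create derives the metric_id after this change; the construction mirrors the hunk above, and the query name, ref_id, stat, and filter values are hypothetical examples, not taken from this commit.

# Sketch only: metric_id construction as in GcpMonitoringQuery.create above.
metric_filters = {"resource.labels.zone": "us-central1-a", "resource.type": "gce_instance"}
query_name = "compute.googleapis.com/instance/cpu/utilization"
ref_id = "gcp_instance/i-1/us-central1-a"
stat = "ALIGN_MEAN"

filter_suffix = "/" + "/".join(f"{key}={value}" for key, value in sorted(metric_filters.items()))
metric_id = f"{query_name}/{ref_id}/{stat}{filter_suffix}"
# -> compute.googleapis.com/instance/cpu/utilization/gcp_instance/i-1/us-central1-a/ALIGN_MEAN/resource.labels.zone=us-central1-a/resource.type=gce_instance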
13 changes: 7 additions & 6 deletions plugins/gcp/fix_plugin_gcp/resources/cloudfunctions.py
@@ -5,7 +5,7 @@

from fix_plugin_gcp.gcp_client import GcpApiSpec
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder, GcpMonitoringQuery
from fix_plugin_gcp.resources.monitoring import normalizer_factory, STAT_MAP
from fix_plugin_gcp.resources.monitoring import normalizer_factory, STANDART_STAT_MAP, PERCENTILE_STAT_MAP
from fixlib.baseresources import BaseServerlessFunction, MetricName
from fixlib.json_bender import Bender, S, Bend, ForallBend

@@ -321,7 +321,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
"resource.type": "cloud_function",
},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
]
)
queries.extend(
@@ -341,7 +341,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
"resource.type": "cloud_function",
},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
]
)
queries.extend(
@@ -351,16 +351,17 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.Duration,
normalization=normalizer_factory.milliseconds,
stat="ALIGN_DELTA",
# convert nanoseconds to milliseconds
normalization=normalizer_factory.milliseconds(lambda x: round(x / 1_000_000, ndigits=4)),
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.function_name": self.resource_raw_name,
"resource.labels.region": self.region().id,
"resource.type": "cloud_function",
},
cross_series_reducer="REDUCE_PERCENTILE_50",
)
for stat in PERCENTILE_STAT_MAP
]
)
return queries
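Cloud Monitoring reports function execution times in nanoseconds, which is why the commit now passes a conversion callable into normalizer_factory.milliseconds. A quick sketch of what that callable does:

def convert(x: float) -> float:
    # nanoseconds -> milliseconds, rounded to four decimal places,
    # exactly as the lambda passed to normalizer_factory.milliseconds above
    return round(x / 1_000_000, ndigits=4)

assert convert(2_500_000) == 2.5      # 2.5 ms
assert convert(1_234_567) == 1.2346   # rounded to four digits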
67 changes: 54 additions & 13 deletions plugins/gcp/fix_plugin_gcp/resources/compute.py
@@ -9,7 +9,7 @@
from fix_plugin_gcp.gcp_client import GcpApiSpec, InternalZoneProp
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder, GcpMonitoringQuery
from fix_plugin_gcp.resources.billing import GcpSku
from fix_plugin_gcp.resources.monitoring import STAT_MAP, normalizer_factory
from fix_plugin_gcp.resources.monitoring import STANDART_STAT_MAP, PERCENTILE_STAT_MAP, normalizer_factory
from fixlib.baseresources import (
BaseAutoScalingGroup,
BaseBucket,
@@ -1207,7 +1207,6 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:

def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
queries: List[GcpMonitoringQuery] = []
queries = []
delta = builder.metrics_delta
queries.extend(
[
@@ -1221,7 +1220,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
project_id=builder.project.id,
metric_filters={"metric.labels.device_name": self.id, "resource.labels.zone": self.zone().id},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
]
)

@@ -1237,7 +1236,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
project_id=builder.project.id,
metric_filters={"metric.labels.device_name": self.id, "resource.labels.zone": self.zone().id},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
for name, metric_name in [
("compute.googleapis.com/instance/disk/read_ops_count", MetricName.DiskRead),
("compute.googleapis.com/instance/disk/write_ops_count", MetricName.DiskWrite),
@@ -1257,7 +1256,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
project_id=builder.project.id,
metric_filters={"metric.labels.device_name": self.id},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
for name, metric_name in [
("compute.googleapis.com/instance/disk/read_bytes_count", MetricName.DiskRead),
("compute.googleapis.com/instance/disk/write_bytes_count", MetricName.DiskWrite),
@@ -1732,9 +1731,52 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
GcpTargetPool,
)
builder.add_edge(self, clazz=target_classes, link=self.target)
self._collect_backends(builder)

def _collect_backends(self, graph_builder: GraphBuilder) -> None:
def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
queries: List[GcpMonitoringQuery] = []
delta = builder.metrics_delta
if not self.load_balancing_scheme:
return []
lb_type = "external/regional" if "EXTERNAL" in self.load_balancing_scheme else "internal"
queries.extend(
[
GcpMonitoringQuery.create(
query_name=f"loadbalancing.googleapis.com/https/{lb_type}/request_count",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.RequestCount,
normalization=normalizer_factory.count_with_compute,
stat="ALIGN_RATE",
project_id=builder.project.id,
metric_filters={
"resource.label.forwarding_rule_name": self.id,
"resource.labels.region": self.region().id,
},
)
]
)
queries.extend(
[
GcpMonitoringQuery.create(
query_name=f"loadbalancing.googleapis.com/https/{lb_type}/backend_latencies",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.Latency,
normalization=normalizer_factory.milliseconds(),
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.label.forwarding_rule_name": self.id,
"resource.labels.region": self.region().id,
},
)
for stat in PERCENTILE_STAT_MAP
]
)

return queries

def post_process(self, graph_builder: GraphBuilder, source: Json) -> None:
if not self.target:
return
backend_services = graph_builder.nodes(clazz=GcpBackendService)
@@ -1776,7 +1818,7 @@ def fetch_instances(group: str) -> None:
if vm_id := item.get("instance"):
self.backends.append(vm_id)
except Exception as e:
log.warning(f"An error occured while setting backends property: {e}")
log.warning(f"An error occurred while setting backends property: {e}")

graph_builder.submit_work(fetch_instances, backend.group)

@@ -3620,7 +3662,6 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
if self.instance_status != InstanceStatus.RUNNING:
return []
queries: List[GcpMonitoringQuery] = []
queries = []
delta = builder.metrics_delta
queries.extend(
[
@@ -3634,7 +3675,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
project_id=builder.project.id,
metric_filters={"metric.labels.instance_name": self.id, "resource.labels.zone": self.zone().id},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
]
)
queries.extend(
@@ -3649,7 +3690,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
project_id=builder.project.id,
metric_filters={"metric.labels.instance_name": self.id, "resource.labels.zone": self.zone().id},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
for name, metric_name in [
("compute.googleapis.com/instance/network/received_bytes_count", MetricName.NetworkIn),
("compute.googleapis.com/instance/network/sent_bytes_count", MetricName.NetworkOut),
@@ -3669,7 +3710,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
project_id=builder.project.id,
metric_filters={"metric.labels.instance_name": self.id, "resource.labels.zone": self.zone().id},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
for name, metric_name in [
("compute.googleapis.com/instance/disk/read_ops_count", MetricName.DiskRead),
("compute.googleapis.com/instance/disk/write_ops_count", MetricName.DiskWrite),
@@ -3689,7 +3730,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
project_id=builder.project.id,
metric_filters={"metric.labels.instance_name": self.id, "resource.labels.zone": self.zone().id},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
for name, metric_name in [
("compute.googleapis.com/instance/disk/read_bytes_count", MetricName.DiskRead),
("compute.googleapis.com/instance/disk/write_bytes_count", MetricName.DiskWrite),
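A standalone sketch of the scheme dispatch GcpForwardingRule.collect_usage_metrics uses to pick the load-balancer metric namespace; the logic is copied from the hunk above, and the scheme strings are examples of values GCP can report.

def lb_metric_prefix(load_balancing_scheme: str) -> str:
    # Any EXTERNAL* scheme maps to the external/regional namespace,
    # everything else to internal, as in collect_usage_metrics above.
    lb_type = "external/regional" if "EXTERNAL" in load_balancing_scheme else "internal"
    return f"loadbalancing.googleapis.com/https/{lb_type}"

assert lb_metric_prefix("EXTERNAL_MANAGED") == "loadbalancing.googleapis.com/https/external/regional"
assert lb_metric_prefix("INTERNAL_MANAGED") == "loadbalancing.googleapis.com/https/internal"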
39 changes: 27 additions & 12 deletions plugins/gcp/fix_plugin_gcp/resources/monitoring.py
@@ -1,8 +1,8 @@
import logging
from copy import deepcopy
from datetime import datetime
from functools import cached_property
from typing import ClassVar, Dict, List, Optional, Tuple, TypeVar
from functools import cached_property, lru_cache
from typing import ClassVar, Dict, List, Optional, Tuple, TypeVar, Callable

from attr import define, field

@@ -19,7 +19,16 @@
T = TypeVar("T")
V = TypeVar("V", bound=BaseResource)

STAT_MAP: Dict[str, StatName] = {"ALIGN_MIN": StatName.min, "ALIGN_MEAN": StatName.avg, "ALIGN_MAX": StatName.max}
STANDART_STAT_MAP: Dict[str, StatName] = {
"ALIGN_MIN": StatName.min,
"ALIGN_MEAN": StatName.avg,
"ALIGN_MAX": StatName.max,
}
PERCENTILE_STAT_MAP: Dict[str, StatName] = {
"ALIGN_PERCENTILE_05": StatName.min,
"ALIGN_PERCENTILE_50": StatName.avg,
"ALIGN_PERCENTILE_99": StatName.max,
}


@define(eq=False, slots=False)
@@ -76,11 +85,11 @@ def query_for(
"name": "projects/{project}",
"interval_endTime": utc_str(end_time),
"interval_startTime": utc_str(start_time),
"aggregation_crossSeriesReducer": "REDUCE_NONE",
"view": "FULL",
# Below parameters are intended to be set dynamically
# "aggregation_alignmentPeriod": None,
# "aggregation_perSeriesAligner": None,
# "aggregation_crossSeriesReducer": None,
# "filter": None,
},
request_parameter_in={"project"},
@@ -117,7 +126,6 @@ def _query_for_chunk(

# Join filters with " AND " to form the final filter string
local_api_spec.request_parameter["filter"] = " AND ".join(filters)
local_api_spec.request_parameter["aggregation_crossSeriesReducer"] = f"{query.cross_series_reducer}"
local_api_spec.request_parameter["aggregation_alignmentPeriod"] = f"{int(query.period.total_seconds())}s"
local_api_spec.request_parameter["aggregation_perSeriesAligner"] = query.stat

@@ -145,8 +153,9 @@ def update_resource_metrics(
continue
name = metric_name + "_" + normalizer.unit
value = normalizer.normalize_value(metric_value)
stat_name = maybe_stat_name or STAT_MAP[query.stat]
resource._resource_usage[name][str(stat_name)] = value
stat_name = maybe_stat_name or STANDART_STAT_MAP.get(query.stat) or PERCENTILE_STAT_MAP.get(query.stat)
if stat_name:
resource._resource_usage[name][str(stat_name)] = value
except KeyError as e:
log.warning(f"An error occurred while setting metric values: {e}")
raise
@@ -160,6 +169,14 @@ def count(self) -> MetricNormalization:
normalize_value=lambda x: round(x, ndigits=4),
)

@cached_property
def count_with_compute(self) -> MetricNormalization:
return MetricNormalization(
unit=MetricUnit.Count,
compute_stats=calculate_min_max_avg,
normalize_value=lambda x: round(x, ndigits=4),
)

@cached_property
def bytes(self) -> MetricNormalization:
return MetricNormalization(
@@ -188,13 +205,11 @@ def seconds(self) -> MetricNormalization:
normalize_value=lambda x: round(x, ndigits=4),
)

@cached_property
def milliseconds(self) -> MetricNormalization:
@lru_cache(maxsize=128)
def milliseconds(self, normalize_value: Optional[Callable[[float], float]] = None) -> MetricNormalization:
return MetricNormalization(
unit=MetricUnit.Milliseconds,
compute_stats=calculate_min_max_avg,
# convert nanoseconds to milliseconds
normalize_value=lambda x: round(x / 1_000_000, ndigits=4),
normalize_value=normalize_value or (lambda x: round(x, ndigits=4)),
)

@cached_property
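A self-contained sketch of the stat-name resolution added to update_resource_metrics; the two maps are inlined and StatName members are shown as plain strings so the example runs on its own.

from typing import Optional

STANDART_STAT_MAP = {"ALIGN_MIN": "min", "ALIGN_MEAN": "avg", "ALIGN_MAX": "max"}
PERCENTILE_STAT_MAP = {"ALIGN_PERCENTILE_05": "min", "ALIGN_PERCENTILE_50": "avg", "ALIGN_PERCENTILE_99": "max"}

def resolve_stat_name(stat: str) -> Optional[str]:
    # Standard aligners take precedence; unknown aligners yield None and are
    # skipped instead of raising a KeyError as the old STAT_MAP lookup did.
    return STANDART_STAT_MAP.get(stat) or PERCENTILE_STAT_MAP.get(stat)

assert resolve_stat_name("ALIGN_MEAN") == "avg"
assert resolve_stat_name("ALIGN_PERCENTILE_99") == "max"
assert resolve_stat_name("ALIGN_RATE") is None

One consequence of switching milliseconds from cached_property to lru_cache: the cache keys on the identity of normalize_value, so call sites passing a fresh lambda (as cloudfunctions.py does above) add a new entry per call, while bare milliseconds() calls share a single cached MetricNormalization.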
9 changes: 4 additions & 5 deletions plugins/gcp/fix_plugin_gcp/resources/sqladmin.py
@@ -7,7 +7,7 @@
from fix_plugin_gcp.gcp_client import GcpApiSpec
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder, GcpMonitoringQuery
from fix_plugin_gcp.resources.compute import GcpSslCertificate
from fix_plugin_gcp.resources.monitoring import normalizer_factory, STAT_MAP
from fix_plugin_gcp.resources.monitoring import normalizer_factory, STANDART_STAT_MAP
from fixlib.baseresources import BaseDatabase, DatabaseInstanceStatus, MetricName, ModelReference
from fixlib.json_bender import F, Bender, S, Bend, ForallBend, K, MapEnum, AsInt
from fixlib.types import Json
@@ -769,7 +769,6 @@ def collect_sql_resources(spec: GcpApiSpec, clazz: Type[GcpResource]) -> None:

def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
queries: List[GcpMonitoringQuery] = []
queries = []
delta = builder.metrics_delta
queries.extend(
[
@@ -786,7 +785,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
"resource.labels.region": self.region().id,
},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
]
)
queries.extend(
@@ -804,7 +803,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
"resource.labels.region": self.region().id,
},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
for name, metric_name in [
("cloudsql.googleapis.com/database/network/connections", MetricName.DatabaseConnections),
("cloudsql.googleapis.com/database/network/sent_bytes_count", MetricName.NetworkBytesSent),
@@ -827,7 +826,7 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
"resource.labels.region": self.region().id,
},
)
for stat in STAT_MAP
for stat in STANDART_STAT_MAP
for name, metric_name in [
("cloudsql.googleapis.com/database/disk/read_ops_count", MetricName.DiskRead),
("cloudsql.googleapis.com/database/disk/write_ops_count", MetricName.DiskWrite),
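Finally, the fan-out pattern repeated throughout this commit: each metric becomes one query per aligner. A self-contained sketch with stand-in values (metric names copied from the hunk above, the stat map from monitoring.py):

STANDART_STAT_MAP = {"ALIGN_MIN": "min", "ALIGN_MEAN": "avg", "ALIGN_MAX": "max"}
metrics = [
    ("cloudsql.googleapis.com/database/disk/read_ops_count", "DiskRead"),
    ("cloudsql.googleapis.com/database/disk/write_ops_count", "DiskWrite"),
]

# One (metric, aligner) pair per query, as in the list comprehensions above.
queries = [(name, metric_name, stat) for stat in STANDART_STAT_MAP for name, metric_name in metrics]
assert len(queries) == 6  # 2 metrics x 3 aligners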
