Commit

feat: created tests for new implementation
1101-1 committed Nov 27, 2024
1 parent 21d5751 commit 6095f8e
Showing 5 changed files with 96 additions and 55 deletions.
31 changes: 23 additions & 8 deletions plugins/gcp/fix_plugin_gcp/collector.py
@@ -1,7 +1,7 @@
 import logging
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime, timezone
-from typing import Type, List, Any, Optional, cast
+from typing import Tuple, Type, List, Any, Optional, cast, Dict
 
 from fix_plugin_gcp.config import GcpConfig
 from fix_plugin_gcp.gcp_client import GcpApiSpec
@@ -160,16 +160,31 @@ def get_last_run() -> Optional[datetime]:
         log.info(f"[GCP:{self.project.id}] Collecting resources done.")
 
     def collect_usage_metrics(self, builder: GraphBuilder) -> None:
+        metric_data_futures = []
+        mq_lookup = {}
+        resources_map = {}
+        result: Dict[monitoring.GcpMonitoringQuery, monitoring.GcpMonitoringMetricData] = {}
         for resource in builder.graph.nodes:
             if isinstance(resource, GcpResource) and (mq := resource.collect_usage_metrics(builder)):
+                mq_lookup.update({q.metric_id: q for q in mq})
                 start_at = builder.created_at - builder.metrics_delta
                 region = cast(GcpRegion, resource.region())
-                try:
-                    rb = builder.for_region(region)
-                    result = monitoring.GcpMonitoringMetricData.query_for(rb, mq, start_at, builder.created_at)
-                    monitoring.update_resource_metrics(resource, result)
-                except Exception as e:
-                    log.warning(f"Error occurred in region {region}: {e}")
+                rb = builder.for_region(region)
+                resources_map[f"{resource.kind}/{resource.id}/{resource.region().id}"] = resource
+                metric_data_futures.extend(
+                    monitoring.GcpMonitoringMetricData.query_for(rb, mq, start_at, builder.created_at)
+                )
+
+        for metric_data in as_completed(metric_data_futures):
+            try:
+                metric_query_result: List[Tuple[str, monitoring.GcpMonitoringMetricData]] = metric_data.result()
+                for metric_id, metric in metric_query_result:
+                    if metric is not None and metric_id is not None:
+                        result[mq_lookup[metric_id]] = metric
+            except Exception as e:
+                log.warning(f"An error occurred while processing a metric query: {e}")
+
+        monitoring.update_resource_metrics(resources_map, result)
 
     def remove_unconnected_nodes(self, builder: GraphBuilder) -> None:
         def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None:
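The reshaped collect_usage_metrics is a fan-out/fan-in: all metric queries are submitted up front as futures, the completions are drained with as_completed, and each result is mapped back to its originating query through a metric_id lookup before a single update_resource_metrics call at the end. Below is a minimal, self-contained sketch of that pattern; Query and fetch are hypothetical stand-ins for the plugin's GcpMonitoringQuery and the Cloud Monitoring call, not the plugin's actual API.

from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Query:
    metric_id: str  # unique id, used to map results back to their query


def fetch(query: Query) -> List[Tuple[str, float]]:
    # Stand-in for one monitoring API call; returns (metric_id, value) pairs.
    return [(query.metric_id, 42.0)]


def collect(queries: List[Query]) -> Dict[Query, float]:
    lookup = {q.metric_id: q for q in queries}  # metric_id -> originating query
    result: Dict[Query, float] = {}
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Fan out: submit every query and keep only the futures.
        futures: List[Future] = [executor.submit(fetch, q) for q in queries]
        # Fan in: drain completions in arrival order, mapping values back by metric_id.
        for future in as_completed(futures):
            try:
                for metric_id, value in future.result():
                    result[lookup[metric_id]] = value
            except Exception as e:
                print(f"metric query failed: {e}")  # one failure must not abort the batch
    return result


print(collect([Query("cpu"), Query("net")]))

Batching every future before draining them lets queries for different resources and regions overlap, instead of blocking on one region's results at a time as the removed per-resource query_for call did.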
2 changes: 2 additions & 0 deletions plugins/gcp/fix_plugin_gcp/resources/base.py
@@ -349,6 +349,7 @@ class GcpMonitoringQuery:
     metric_name: MetricName  # final name of the metric
     query_name: str  # name of the metric (e.g., GCP metric type)
     period: timedelta  # period of the metric
+    ref_id: str  # unique id of the resource
     metric_id: str  # unique metric identifier
     stat: str  # aggregation type, supports ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN
     project_id: str  # GCP project name
@@ -375,6 +376,7 @@ def create(
             metric_name=metric_name,
             query_name=query_name,
             period=period,
+            ref_id=ref_id,
             metric_id=metric_id,
             stat=stat,
             normalization=normalization,
39 changes: 21 additions & 18 deletions plugins/gcp/fix_plugin_gcp/resources/monitoring.py
@@ -1,5 +1,5 @@
 import logging
-from concurrent.futures import as_completed
+from concurrent.futures import Future
 from copy import deepcopy
 from datetime import datetime
 from functools import cached_property
@@ -8,8 +8,8 @@
 from attr import define, field
 
 from fix_plugin_gcp.gcp_client import GcpApiSpec
-from fix_plugin_gcp.resources.base import GraphBuilder, GcpResource, GcpMonitoringQuery, MetricNormalization
-from fixlib.baseresources import MetricUnit, StatName
+from fix_plugin_gcp.resources.base import GraphBuilder, GcpMonitoringQuery, MetricNormalization
+from fixlib.baseresources import MetricUnit, StatName, BaseResource
 from fixlib.durations import duration_str
 from fixlib.json import from_json
 from fixlib.json_bender import S, Bender, ForallBend, bend, K
@@ -18,6 +18,7 @@
 service_name = "monitoring"
 log = logging.getLogger("fix.plugins.gcp")
 T = TypeVar("T")
+V = TypeVar("V", bound=BaseResource)
 
 STAT_MAP: Dict[str, StatName] = {"ALIGN_MIN": StatName.min, "ALIGN_MEAN": StatName.avg, "ALIGN_MAX": StatName.max}
 
@@ -58,15 +59,15 @@ def query_for(
         queries: List[GcpMonitoringQuery],
         start_time: datetime,
         end_time: datetime,
-    ) -> "Dict[GcpMonitoringQuery, GcpMonitoringMetricData]":
+    ) -> List[Future[List[Tuple[str, "GcpMonitoringMetricData"]]]]:
         if builder.region:
             log.info(
                 f"[{builder.region.safe_name}|{start_time}|{duration_str(end_time - start_time)}] Query for {len(queries)} metrics."
             )
         else:
             log.info(f"[global|{start_time}|{duration_str(end_time - start_time)}] Query for {len(queries)} metrics.")
-        lookup = {q.metric_id: q for q in queries}
-        result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData] = {}
+        # lookup = {q.metric_id: q for q in queries}
+        # result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData] = {}
         futures = []
 
         api_spec = GcpApiSpec(
@@ -76,7 +77,6 @@
             action="list",
             request_parameter={
                 "name": "projects/{project}",
-                "aggregation_groupByFields": "",
                 "interval_endTime": utc_str(end_time),
                 "interval_startTime": utc_str(start_time),
                 "view": "FULL",
@@ -99,16 +99,16 @@
             )
             futures.append(future)
         # Retrieve results from submitted queries and populate the result dictionary
-        for future in as_completed(futures):
-            try:
-                metric_query_result: List[Tuple[str, GcpMonitoringMetricData]] = future.result()
-                for metric_id, metric in metric_query_result:
-                    if metric is not None and metric_id is not None:
-                        result[lookup[metric_id]] = metric
-            except Exception as e:
-                log.warning(f"An error occurred while processing a metric query: {e}")
-                raise e
-        return result
+        # for future in as_completed(futures):
+        #     try:
+        #         metric_query_result: List[Tuple[str, GcpMonitoringMetricData]] = future.result()
+        #         for metric_id, metric in metric_query_result:
+        #             if metric is not None and metric_id is not None:
+        #                 result[lookup[metric_id]] = metric
+        #     except Exception as e:
+        #         log.warning(f"An error occurred while processing a metric query: {e}")
+        #         raise e
+        return futures
 
     @staticmethod
     def _query_for_chunk(
@@ -146,10 +146,13 @@ def _query_for_chunk(
 
 
 def update_resource_metrics(
-    resource: GcpResource,
+    resources_map: Dict[str, V],
    monitoring_metric_result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData],
 ) -> None:
     for query, metric in monitoring_metric_result.items():
+        resource = resources_map.get(query.ref_id)
+        if resource is None:
+            continue
         if len(metric.metric_values) == 0:
             continue
         normalizer = query.normalization
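update_resource_metrics no longer receives a single GcpResource: it resolves each query's target out of a keyed map via the new ref_id field and silently skips queries whose resource is absent. A small sketch of that lookup, under the assumption (implied by the collector change above) that ref_id carries the same {kind}/{id}/{region} key the collector uses when it populates resources_map; Resource and Query here are simplified hypothetical stand-ins.

from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Resource:
    kind: str
    id: str
    region: str
    usage: Dict[str, float] = field(default_factory=dict)

    def key(self) -> str:
        # Must match the key format the collector stored the resource under.
        return f"{self.kind}/{self.id}/{self.region}"


@dataclass(frozen=True)
class Query:
    ref_id: str  # composed resource key, set when the query is created
    name: str


def update_resource_metrics(resources_map: Dict[str, Resource], results: Dict[Query, float]) -> None:
    for query, value in results.items():
        resource = resources_map.get(query.ref_id)
        if resource is None:  # the query's resource was never collected; skip it
            continue
        resource.usage[query.name] = value


vm = Resource("gcp_instance", "i-1", "us-east1")
update_resource_metrics({vm.key(): vm}, {Query(vm.key(), "cpu_utilization_percent"): 0.37})
print(vm.usage)  # {'cpu_utilization_percent': 0.37}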
68 changes: 40 additions & 28 deletions plugins/gcp/test/test_compute.py
@@ -1,4 +1,4 @@
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import timedelta, datetime
 import json
 import os
@@ -183,34 +183,46 @@ def test_gcp_instance_usage_metrics(random_builder: GraphBuilder) -> None:
     gcp_instance.instance_status = InstanceStatus.RUNNING
 
     random_builder.region = GcpRegion(id="us-east1", name="us-east1")
-    queries = gcp_instance.collect_usage_metrics(random_builder)
+    random_builder.created_at = datetime(2020, 5, 30, 15, 45, 30)
+    random_builder.metrics_delta = timedelta(hours=1)
 
-    # simulates the `collect_usage_metrics` method found in `GcpAccountCollector`.
-    def collect_and_set_metrics(start_at: datetime, _: GcpRegion, queries: List[GcpMonitoringQuery]) -> None:
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            queue = ExecutorQueue(executor, tasks_per_key=lambda _: 1, name="test")
-            g_builder = GraphBuilder(
-                random_builder.graph,
-                random_builder.cloud,
-                random_builder.project,
-                AnonymousCredentials(),  # type: ignore
-                queue,
-                random_builder.core_feedback,
-                random_builder.error_accumulator,
-                GcpRegion(id="global", name="global"),
-                random_builder.config,
-                last_run_started_at=random_builder.last_run_started_at,
-            )
-            result = GcpMonitoringMetricData.query_for(g_builder, queries, start_at, start_at + timedelta(hours=2))
-            update_resource_metrics(gcp_instance, result)
-
-    start = datetime(2020, 5, 30, 15, 45, 30)
-
-    collect_and_set_metrics(start, GcpRegion(id="us-east-1", name="us-east-1"), queries)
-
-    assert gcp_instance._resource_usage["cpu_utilization_percent"]["avg"] > 0.0
-    assert gcp_instance._resource_usage["network_in_count"]["avg"] > 0.0
-    assert gcp_instance._resource_usage["disk_read_count"]["avg"] > 0.0
+    queries = gcp_instance.collect_usage_metrics(random_builder)
+    mq_lookup = {q.metric_id: q for q in queries}
+    resources_map = {f"{gcp_instance.kind}/{gcp_instance.id}/{gcp_instance.region().id}": gcp_instance}
+    with ThreadPoolExecutor(max_workers=1) as executor:
+        queue = ExecutorQueue(executor, tasks_per_key=lambda _: 10, name="test")
+        g_builder = GraphBuilder(
+            random_builder.graph,
+            random_builder.cloud,
+            random_builder.project,
+            AnonymousCredentials(),  # type: ignore
+            queue,
+            random_builder.core_feedback,
+            random_builder.error_accumulator,
+            GcpRegion(id="global", name="global"),
+            random_builder.config,
+            last_run_started_at=random_builder.last_run_started_at,
+        )
+        metric_data_futures = GcpMonitoringMetricData.query_for(
+            g_builder, queries, random_builder.created_at, random_builder.created_at + random_builder.metrics_delta
+        )
+
+        result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData] = {}
+
+        for metric_data in as_completed(metric_data_futures):
+            try:
+                metric_query_result: List[Tuple[str, GcpMonitoringMetricData]] = metric_data.result()
+                for metric_id, metric in metric_query_result:
+                    if metric is not None and metric_id is not None:
+                        result[mq_lookup[metric_id]] = metric
+            except Exception as e:
+                log.warning(f"An error occurred while processing a metric query: {e}")
+
+        update_resource_metrics(resources_map, result)
+
+    assert gcp_instance._resource_usage["cpu_utilization_percent"]["avg"] > 0.0
+    assert gcp_instance._resource_usage["network_in_count"]["avg"] > 0.0
+    assert gcp_instance._resource_usage["disk_read_count"]["avg"] > 0.0
 
 
 def test_machine_type_ondemand_cost(random_builder: GraphBuilder) -> None:
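The rewritten test drives the real futures path rather than the removed dictionary-returning query_for. A tiny sketch of the executor idiom it relies on: a ThreadPoolExecutor(max_workers=1) serializes the work so results stay reproducible, while the production code path (submit, as_completed, result()) is still exercised end to end. This is a generic illustration, not the plugin's code.

from concurrent.futures import ThreadPoolExecutor, as_completed


def double(x: int) -> int:
    return 2 * x


# One worker forces sequential execution: deterministic for tests, yet the
# Future machinery (submission, completion, result retrieval) is all real.
with ThreadPoolExecutor(max_workers=1) as executor:
    futures = [executor.submit(double, n) for n in (1, 2, 3)]
    values = sorted(f.result() for f in as_completed(futures))
assert values == [2, 4, 6]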
11 changes: 10 additions & 1 deletion plugins/gcp/test/test_monitoring.py
@@ -1,4 +1,6 @@
+from concurrent.futures import as_completed
 from datetime import timedelta, datetime, timezone
+from typing import List, Tuple, Dict
 
 from fix_plugin_gcp.resources.base import GraphBuilder, GcpMonitoringQuery
 from fix_plugin_gcp.resources.monitoring import GcpMonitoringMetricData, normalizer_factory
@@ -28,6 +30,13 @@ def test_metric(random_builder: GraphBuilder) -> None:
         project_id=random_builder.project.id,
         metric_filters={"metric.labels.instance_name": "random_instance", "resource.labels.zone": "global"},
     )
-    result = GcpMonitoringMetricData.query_for(random_builder, [read, write], earlier, now)
+    queries = GcpMonitoringMetricData.query_for(random_builder, [read, write], earlier, now)
+    mq_lookup = {q.metric_id: q for q in [read, write]}
+    result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData] = {}
+    for metric_data in as_completed(queries):
+        metric_query_result: List[Tuple[str, GcpMonitoringMetricData]] = metric_data.result()
+        for metric_id, metric in metric_query_result:
+            if metric is not None and metric_id is not None:
+                result[mq_lookup[metric_id]] = metric
     assert all(value > 0 for value in result[read].metric_values or [])
     assert all(value > 0 for value in result[write].metric_values or [])
