[gcp][feat] Add metrics collection #2283

Merged — 45 commits, merged Dec 4, 2024 (changes shown from 19 commits)

Commits (45)
b61779c  feat: init implementation for metrics collection — 1101-1, Nov 18, 2024
d57d41d  feat: finished fetching implementation — 1101-1, Nov 19, 2024
387de89  finished metric setting implementation — 1101-1, Nov 20, 2024
bb95a8f  feat: added metric collection to the vm instances — 1101-1, Nov 20, 2024
d4f5ecc  feat: avoid race conditions — 1101-1, Nov 20, 2024
828ee24  feat: finish implementing metrics collection — 1101-1, Nov 20, 2024
17ebd5e  feat: added tests — 1101-1, Nov 20, 2024
19d7cf6  chore: removed comment — 1101-1, Nov 20, 2024
e286fa7  Merge branch 'main' into km/gcp_collect_metrics — 1101-1, Nov 20, 2024
1799688  feat: fix tests — 1101-1, Nov 21, 2024
f2f3c92  fix syntax test — 1101-1, Nov 21, 2024
46b7259  feat: added new test — 1101-1, Nov 21, 2024
48c7131  added region to the logging — 1101-1, Nov 21, 2024
aecc816  fix mypy — 1101-1, Nov 21, 2024
7179a4e  feat: added collection for volumes — 1101-1, Nov 22, 2024
8bc5c09  feat: added collection for sql instance — 1101-1, Nov 22, 2024
85d6183  feat: make unique queries if resources have same name — 1101-1, Nov 22, 2024
1fd4d11  fix test — 1101-1, Nov 22, 2024
ce4d3b2  chore: delete unused var — 1101-1, Nov 22, 2024
240fd91  feat: reimplemented collection and make filter dynamically — 1101-1, Nov 25, 2024
ace0bc9  feat: adjusted better — 1101-1, Nov 25, 2024
9527132  feat: make queries more unique — 1101-1, Nov 25, 2024
fce7179  feat: make safety check for region — 1101-1, Nov 25, 2024
8aebcb7  fix tests — 1101-1, Nov 26, 2024
6c1e5e0  feat: compute stats if we do not use align in gcp — 1101-1, Nov 26, 2024
fc527e2  feat: reimplemented collection and increase default max workers — 1101-1, Nov 26, 2024
7a21031  fixed tests and move wait for submitted work in aws — 1101-1, Nov 26, 2024
6d50044  simplify and cleanup — aquamatthias, Nov 27, 2024
bd9774d  feat: resolved issues — 1101-1, Nov 27, 2024
69bf8b2  move stat_map to monitoring — aquamatthias, Nov 27, 2024
0a7c326  fix: adjusted collection of cloudfunction metrics — 1101-1, Nov 27, 2024
21d5751  chore: removed unused — 1101-1, Nov 27, 2024
6095f8e  feat: created tests for new implementation — 1101-1, Nov 27, 2024
68a006a  chore: removed comments — 1101-1, Nov 27, 2024
508a01e  fix: fixed cloudfunctions metric fetching — 1101-1, Nov 27, 2024
364f8cd  feat: finished new implementation and adjusted tests — 1101-1, Nov 27, 2024
8d41cf8  fixed linter — 1101-1, Nov 27, 2024
e669085  fixed syntax test — 1101-1, Nov 27, 2024
86f6435  feat: finished collecting metrics for lb — 1101-1, Nov 28, 2024
cf987fd  convert seconds to ms — 1101-1, Nov 28, 2024
7f5d7f7  fix: take values correctly — 1101-1, Nov 28, 2024
abfcfe8  small improvement — 1101-1, Nov 28, 2024
738d5e9  fix: moved method to make collection correctly — 1101-1, Nov 29, 2024
ef5b88b  make sure all futures done — 1101-1, Dec 2, 2024
3f97c38  Merge branch 'main' into km/gcp_collect_metrics — 1101-1, Dec 3, 2024
fixlib/fixlib/baseresources.py — 5 changes: 3 additions & 2 deletions

@@ -116,8 +116,7 @@ def get(self) -> Dict[str, Any]:
         return changes


-# todo: replace to StrEnum once resoto is on 3.11
-class MetricName(str, Enum):
+class MetricName(StrEnum):
     def __str__(self) -> str:
         return self.value

@@ -195,6 +194,8 @@ def __str__(self) -> str:
     DiskQueueDepth = "disk_queue_depth"
     NetworkReceiveThroughput = "network_receive_throughput"
     NetworkTransmitThroughput = "network_transmit_throughput"
+    NetworkBytesSent = "network_bytes_sent"
+    NetworkBytesReceived = "network_bytes_received"

     # serverless
     Invocations = "invocations"
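An aside, not part of the diff: on Python 3.11+, StrEnum already stringifies members as their plain value, which is what the old str/Enum base achieved with the __str__ override — that resolves the todo, and the override kept in the context lines above is now redundant but harmless. A minimal check of the equivalence:

from enum import Enum, StrEnum  # StrEnum requires Python 3.11+


class OldMetric(str, Enum):
    CpuUtilization = "cpu_utilization"

    def __str__(self) -> str:
        return self.value


class NewMetric(StrEnum):
    CpuUtilization = "cpu_utilization"


# Both stringify as the plain value; StrEnum also formats that way by default.
assert str(OldMetric.CpuUtilization) == str(NewMetric.CpuUtilization) == "cpu_utilization"
assert f"{NewMetric.CpuUtilization}" == "cpu_utilization"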
plugins/aws/fix_plugin_aws/collector.py — 3 changes: 2 additions & 1 deletion

@@ -334,8 +334,9 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> None:
                 continue
             # region can be overridden in the query: s3 is global, but need to be queried per region
             if region := cast(AwsRegion, resource.region()):
-                lookup_map[resource.id] = resource
                 resource_queries: List[cloudwatch.AwsCloudwatchQuery] = resource.collect_usage_metrics(builder)
+                if resource_queries:
+                    lookup_map[resource.id] = resource
                 for query in resource_queries:
                     query_region = query.region or region
                     start = query.start_delta or builder.metrics_delta
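The AWS tweak mirrors what the GCP collector does further down: a resource is registered in lookup_map only when it actually produced queries, so the map cannot accumulate entries that never get metrics attributed. Schematically, with illustrative stand-ins rather than the plugin's real types:

lookup_map = {}
resource_queries_by_id = {"vol-1": ["volume_read_ops"], "vol-2": []}

for rid, queries in resource_queries_by_id.items():
    if queries:  # after the fix: only resources with pending queries are indexed
        lookup_map[rid] = rid

assert "vol-2" not in lookup_map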
plugins/azure/fix_plugin_azure/resource/metrics.py — 6 changes: 4 additions & 2 deletions

@@ -1,3 +1,4 @@
+from copy import deepcopy
 from datetime import datetime, timedelta
 from concurrent.futures import as_completed
 import logging

@@ -271,12 +272,13 @@ def _query_for_single(
         interval: str,
     ) -> "Tuple[Optional[AzureMetricData], Optional[str]]":
         try:
+            local_api_spec = deepcopy(api_spec)
             # Set the path for the API call based on the instance ID of the query
-            api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics"
+            local_api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics"
             # Retrieve metric data from the API
             aggregation = ",".join(query.aggregation)
             part = builder.client.list(
-                api_spec,
+                local_api_spec,
                 metricnames=query.metric_name,
                 metricNamespace=query.metric_namespace,
                 timespan=timespan,
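This Azure fix matters because _query_for_single runs concurrently per query: mutating the shared api_spec's path lets one thread clobber another's in-flight request. A small sketch of the pattern, using a hypothetical ApiSpec stand-in rather than the plugin's class:

from copy import deepcopy
from dataclasses import dataclass


@dataclass
class ApiSpec:  # stand-in for the shared Azure API spec object
    path: str = ""


shared_spec = ApiSpec()


def build_path(instance_id: str) -> str:
    local_spec = deepcopy(shared_spec)  # copy before mutating ...
    local_spec.path = f"{instance_id}/providers/Microsoft.Insights/metrics"
    return local_spec.path  # ... so concurrent callers never see each other's path


assert build_path("/subscriptions/a/vm1") != build_path("/subscriptions/b/vm2")
assert shared_spec.path == ""  # the shared template is never mutated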
plugins/gcp/fix_plugin_gcp/__init__.py — 7 changes: 5 additions & 2 deletions

@@ -10,6 +10,7 @@
 from fixlib.core.actions import CoreFeedback
 from fixlib.graph import Graph, MaxNodesExceeded
 from fixlib.logger import log, setup_logger
+from fixlib.types import Json
 from .collector import GcpProjectCollector
 from .config import GcpConfig
 from .resources.base import GcpProject

@@ -77,10 +78,11 @@ def collect(self) -> None:
                     project_id,
                     feedback,
                     cloud,
+                    self.task_data or {},
                     max_resources_per_account=self.max_resources_per_account,
                     **collect_args,
                 )
-                for project_id in credentials.keys()
+                for project_id in credentials
             ]
             for future in futures.as_completed(wait_for):
                 project_graph = future.result()

@@ -98,6 +100,7 @@ def collect_project(
     project_id: str,
     core_feedback: CoreFeedback,
     cloud: Cloud,
+    task_data: Json,
     args: Optional[Namespace] = None,
     running_config: Optional[RunningConfig] = None,
     credentials: Optional[Dict[str, Any]] = None,

@@ -130,7 +133,7 @@ def collect_project(
     try:
         core_feedback.progress_done(project_id, 0, 1)
-        gpc = GcpProjectCollector(Config.gcp, cloud, project, core_feedback, max_resources_per_account)
+        gpc = GcpProjectCollector(Config.gcp, cloud, project, core_feedback, task_data, max_resources_per_account)
         try:
             gpc.collect()
         except MaxNodesExceeded as ex:
plugins/gcp/fix_plugin_gcp/collector.py — 67 changes: 66 additions & 1 deletion

@@ -1,6 +1,8 @@
+from collections import defaultdict
+from datetime import datetime, timedelta, timezone
 import logging
 from concurrent.futures import ThreadPoolExecutor
-from typing import Type, List, Any, Optional
+from typing import Type, List, Any, Optional, cast

 from fix_plugin_gcp.config import GcpConfig
 from fix_plugin_gcp.gcp_client import GcpApiSpec

@@ -14,12 +16,15 @@
     firestore,
     filestore,
     cloudfunctions,
+    monitoring,
 )
 from fix_plugin_gcp.resources.base import GcpResource, GcpProject, ExecutorQueue, GraphBuilder, GcpRegion, GcpZone
 from fix_plugin_gcp.utils import Credentials
 from fixlib.baseresources import Cloud
 from fixlib.core.actions import CoreFeedback, ErrorAccumulator
 from fixlib.graph import Graph
+from fixlib.json import value_in_path
+from fixlib.types import Json

 log = logging.getLogger("fix.plugins.gcp")
 all_resources: List[Type[GcpResource]] = (

@@ -58,6 +63,7 @@ def __init__(
         cloud: Cloud,
         project: GcpProject,
         core_feedback: CoreFeedback,
+        task_data: Json,
         max_resources_per_account: Optional[int] = None,
     ) -> None:
         self.config = config

@@ -67,6 +73,7 @@ def __init__(
         self.error_accumulator = ErrorAccumulator()
         self.graph = Graph(root=self.project, max_nodes=max_resources_per_account)
         self.credentials = Credentials.get(self.project.id)
+        self.task_data = task_data

     def collect(self) -> None:
         with ThreadPoolExecutor(

@@ -77,7 +84,20 @@ def collect(self) -> None:
             # It should only be used in scenarios, where it is safe to do so.
             # This executor is shared between all regions.
             shared_queue = ExecutorQueue(executor, self.project.safe_name)
+
+            def get_last_run() -> Optional[datetime]:
+                td = self.task_data
+                if not td:
+                    return None
+                timestamp = value_in_path(td, ["timing", td.get("step", ""), "started_at"])
+
+                if timestamp is None:
+                    return None
+
+                return datetime.fromtimestamp(timestamp, timezone.utc)
+
             project_global_region = GcpRegion.fallback_global_region(self.project)
+            last_run = get_last_run()
             global_builder = GraphBuilder(
                 self.graph,
                 self.cloud,

@@ -87,6 +107,8 @@ def collect(self) -> None:
                 self.core_feedback,
                 self.error_accumulator,
                 project_global_region,
+                config=self.config,
+                last_run_started_at=last_run,
             )
             global_builder.add_node(project_global_region, {})

@@ -113,6 +135,13 @@ def collect(self) -> None:

             self.error_accumulator.report_all(global_builder.core_feedback)

+            if global_builder.config.collect_usage_metrics:
+                try:
+                    log.info(f"[GCP:{self.project.id}] Collect usage metrics.")
+                    self.collect_usage_metrics(global_builder)
+                except Exception as e:
+                    log.warning(f"Failed to collect usage metrics in project {self.project.id}: {e}")
+            shared_queue.wait_for_submitted_work()
             log.info(f"[GCP:{self.project.id}] Connect resources and create edges.")
             # connect nodes
             for node, data in list(self.graph.nodes(data=True)):

@@ -131,6 +160,42 @@ def collect(self) -> None:
             self.core_feedback.progress_done(self.project.id, 1, 1, context=[self.cloud.id])
             log.info(f"[GCP:{self.project.id}] Collecting resources done.")

+    def collect_usage_metrics(self, builder: GraphBuilder) -> None:
+        metrics_queries = defaultdict(list)
+        two_hours = timedelta(hours=2)
+        thirty_minutes = timedelta(minutes=30)
+        lookup_map = {}
+        for resource in builder.graph.nodes:
+            if not isinstance(resource, GcpResource):
+                continue
+            # region can be overridden in the query
+            if region := cast(GcpRegion, resource.region()):
+                resource_queries: List[monitoring.GcpMonitoringQuery] = resource.collect_usage_metrics(builder)
+                if resource_queries:
+                    # set unique GcpMonitoringQuery.ref_id
+                    lookup_map[f"{resource.kind}/{resource.id}/{region.id}"] = resource
+                for query in resource_queries:
+                    query_region = query.region or region
+                    start = builder.metrics_delta
+                    if query.period and query.period < thirty_minutes:
+                        start = min(start, two_hours)
+                    metrics_queries[(query_region, start)].append(query)
+        for (region, start), queries in metrics_queries.items():
+
+            def collect_and_set_metrics(
+                start: timedelta, region: GcpRegion, queries: List[monitoring.GcpMonitoringQuery]
+            ) -> None:
+                start_at = builder.created_at - start
+                try:
+                    result = monitoring.GcpMonitoringMetricData.query_for(
+                        builder.for_region(region), queries, start_at, builder.created_at
+                    )
+                    monitoring.update_resource_metrics(lookup_map, result)
+                except Exception as e:
+                    log.warning(f"Error occurred in region {region}: {e}")
+
+            builder.submit_work(collect_and_set_metrics, start, region, queries)
+
     def remove_unconnected_nodes(self, builder: GraphBuilder) -> None:
         def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None:
             remove_nodes = set()
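get_last_run above derives the previous collection start from task data sent by the core. The ["timing", step, "started_at"] shape is inferred from the value_in_path call in the diff, so the sample payload below is an assumption for illustration, not a documented contract:

from datetime import datetime, timezone
from typing import Any, Dict, Optional


def last_run_started_at(task_data: Dict[str, Any]) -> Optional[datetime]:
    # Mirrors get_last_run: look up timing.<current step>.started_at (epoch seconds).
    step = task_data.get("step", "")
    started = task_data.get("timing", {}).get(step, {}).get("started_at")
    return datetime.fromtimestamp(started, timezone.utc) if started is not None else None


sample = {"step": "collect", "timing": {"collect": {"started_at": 1732615200}}}
print(last_run_started_at(sample))  # 2024-11-26 10:00:00+00:00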
plugins/gcp/fix_plugin_gcp/config.py — 6 changes: 5 additions & 1 deletion

@@ -1,6 +1,6 @@
 from fixlib.proc import num_default_threads
 from attrs import define, field
-from typing import List, ClassVar
+from typing import List, ClassVar, Optional


 @define

@@ -31,6 +31,10 @@ class GcpConfig:
             "If false, the error is logged and the resource is skipped."
         },
     )
+    collect_usage_metrics: Optional[bool] = field(
+        default=True,
+        metadata={"description": "Collect resource usage metrics via GCP Monitoring, enabled by default"},
+    )

     def should_collect(self, name: str) -> bool:
         if self.collect:
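Since GcpConfig is an attrs class and the new flag defaults to True, metrics collection can be switched off programmatically. A minimal sketch, assuming the remaining GcpConfig fields keep their defaults:

from fix_plugin_gcp.config import GcpConfig

cfg = GcpConfig(collect_usage_metrics=False)  # opt out; the field defaults to True
assert cfg.collect_usage_metrics is False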
plugins/gcp/fix_plugin_gcp/resources/aiplatform.py — 6 changes: 3 additions & 3 deletions

@@ -50,8 +50,6 @@ class AIPlatformRegionFilter:
     def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
         # Default behavior: in case the class has an ApiSpec, call the api and call collect.
         if issubclass(cls, GcpResource):
-            region_name = "global" if not builder.region else builder.region.safe_name
-            log.info(f"[GCP:{builder.project.id}:{region_name}] Collecting {cls.kind}")
             if spec := cls.api_spec:
                 expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set()) | {"HttpError:none:none"}
                 with GcpErrorHandler(

@@ -66,7 +64,9 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
                     if builder.region:
                         items = builder.client.list(spec, **kwargs)
                         collected_resources = cls.collect(items, builder)
-                        log.info(f"[GCP:{builder.project.id}] finished collecting: {cls.kind}")
+                        log.info(
+                            f"[GCP:{builder.project.id}:{builder.region.safe_name}] finished collecting: {cls.kind}"
+                        )
                         return collected_resources
         return []
plugins/gcp/fix_plugin_gcp/resources/base.py — 47 changes: 42 additions & 5 deletions

@@ -1,5 +1,6 @@
 from __future__ import annotations

+from datetime import datetime, timedelta
 import json
 import logging
 from concurrent.futures import Future

@@ -8,6 +9,7 @@
 from typing import Callable, List, ClassVar, Optional, TypeVar, Type, Any, Dict, Set, Tuple

 from attr import define, field
+from fix_plugin_gcp.config import GcpConfig
 from google.auth.credentials import Credentials as GoogleAuthCredentials
 from googleapiclient.errors import HttpError

@@ -32,6 +34,8 @@
 from fixlib.types import Json
 from fixinventorydata.cloud import regions as cloud_region_data

+from fixlib.utils import utc
+
 log = logging.getLogger("fix.plugins.gcp")

@@ -81,7 +85,9 @@ def __init__(
         core_feedback: CoreFeedback,
         error_accumulator: ErrorAccumulator,
         fallback_global_region: GcpRegion,
+        config: GcpConfig,
         region: Optional[GcpRegion] = None,
+        last_run_started_at: Optional[datetime] = None,
         graph_nodes_access: Optional[Lock] = None,
         graph_edges_access: Optional[Lock] = None,
     ) -> None:

@@ -95,12 +101,39 @@ def __init__(
         self.core_feedback = core_feedback
         self.error_accumulator = error_accumulator
         self.fallback_global_region = fallback_global_region
+        self.config = config
+        self.created_at = utc()
+        self.last_run_started_at = last_run_started_at
         self.region_by_name: Dict[str, GcpRegion] = {}
         self.region_by_zone_name: Dict[str, GcpRegion] = {}
         self.zone_by_name: Dict[str, GcpZone] = {}
         self.graph_nodes_access = graph_nodes_access or Lock()
         self.graph_edges_access = graph_edges_access or Lock()

+        if last_run_started_at:
+            now = utc()
+
+            # limit the metrics to the last 2 hours
+            if now - last_run_started_at > timedelta(hours=2):
+                start = now - timedelta(hours=2)
+            else:
+                start = last_run_started_at
+
+            delta = now - start
+
+            min_delta = max(delta, timedelta(seconds=60))
+            # in case the last collection happened too quickly, raise the metrics timedelta to 60s
+            if min_delta != delta:
+                start = now - min_delta
+                delta = min_delta
+        else:
+            now = utc()
+            delta = timedelta(hours=1)
+            start = now - delta
+
+        self.metrics_start = start
+        self.metrics_delta = delta
+
     def submit_work(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
         """
         Use this method for work that can be done in parallel.

@@ -275,7 +308,9 @@ def for_region(self, region: GcpRegion) -> GraphBuilder:
             self.core_feedback,
             self.error_accumulator,
             self.fallback_global_region,
+            self.config,
             region,
+            self.last_run_started_at,
             self.graph_nodes_access,
             self.graph_edges_access,
         )

@@ -374,13 +409,13 @@ def post_process_instance(self, builder: GraphBuilder, source: Json) -> None:
         """
         pass

+    def collect_usage_metrics(self, builder: GraphBuilder) -> List:  # type: ignore
+        # Default behavior: do nothing
+        return []
+
     @classmethod
     def collect_resources(cls: Type[GcpResource], builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
         # Default behavior: in case the class has an ApiSpec, call the api and call collect.
-        if kwargs:
-            log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind} with ({kwargs})")
-        else:
-            log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind}")
         if spec := cls.api_spec:
             expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set())
             with GcpErrorHandler(

@@ -393,7 +428,9 @@ def collect_resources(cls: Type[GcpResource], builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
             ):
                 items = builder.client.list(spec, **kwargs)
                 resources = cls.collect(items, builder)
-                log.info(f"[GCP:{builder.project.id}] finished collecting: {cls.kind}")
+                log.info(
+                    f"[GCP:{builder.project.id}:{builder.region.safe_name if builder.region else 'global'}] finished collecting: {cls.kind}"
+                )
                 return resources
         return []
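The window logic added to GraphBuilder clamps the metrics lookback between 60 seconds and 2 hours, and falls back to 1 hour when no previous run is known. A worked example of the same arithmetic, as a standalone sketch independent of the class:

from datetime import datetime, timedelta, timezone

now = datetime(2024, 11, 27, 12, 0, tzinfo=timezone.utc)

# Previous run 3 hours ago -> window clamped to the last 2 hours.
start = max(now - timedelta(hours=3), now - timedelta(hours=2))
assert now - start == timedelta(hours=2)

# Previous run 10 seconds ago -> window widened to at least 60 seconds.
delta = max(now - (now - timedelta(seconds=10)), timedelta(seconds=60))
assert delta == timedelta(seconds=60)

# No previous run -> default to the last hour.
assert now - (now - timedelta(hours=1)) == timedelta(hours=1)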