[gcp][feat] Add metrics collection to GcpPubSub and GcpBucket resources #2296

Merged
1 change: 1 addition & 0 deletions plugins/gcp/fix_plugin_gcp/collector.py
@@ -198,6 +198,7 @@ def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None:
rm_leaf_nodes(compute.GcpAcceleratorType)
rm_leaf_nodes(billing.GcpSku)
rm_leaf_nodes(billing.GcpService)
rm_leaf_nodes(compute.GcpInterconnectLocation)
# remove regions that are not in use
self.graph.remove_recursively(builder.nodes(GcpRegion, lambda r: r.compute_region_in_use(builder) is False))

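Note on the hunk above: it adds `GcpInterconnectLocation` to the leaf-node pruning pass. The actual `rm_leaf_nodes` implementation is outside this hunk; the following is only a minimal sketch, assuming a "leaf" is a node of the given kind left with no relevant edges after collection.

```python
# Hypothetical sketch of leaf-node pruning; not the plugin's actual implementation.
# Assumption: prune nodes of the given kind whose only neighbors (if any) are of
# kinds listed in ignore_kinds.
from typing import Any, Optional, Type

def rm_leaf_nodes_sketch(graph: Any, clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None:
    for node in list(graph.nodes):
        if not isinstance(node, clazz):
            continue
        neighbors = list(graph.predecessors(node)) + list(graph.successors(node))
        if ignore_kinds is not None:
            neighbors = [n for n in neighbors if not isinstance(n, ignore_kinds)]
        if not neighbors:  # nothing references this node -> drop it from the graph
            graph.remove_node(node)
```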
1 change: 1 addition & 0 deletions plugins/gcp/fix_plugin_gcp/gcp_client.py
@@ -14,6 +14,7 @@
InternalZoneProp = "_zone"
ZoneProp = "zone"
RegionProp = "region"
LocationProp = "location"

# Store the discovery function as separate variable.
# This is used in tests to change the builder function.
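The new `LocationProp` constant lets `add_region_to_node` treat a `location` value the same way as `zone` and `region`: only the last path segment, lowercased, is used for the lookup. A small standalone illustration of that normalization (not code from the plugin):

```python
# Mirrors the .lower().rsplit("/", 1)[-1] calls added in resources/base.py below:
# take the last path segment of the property value and lowercase it.
def location_name(value: str) -> str:
    return value.lower().rsplit("/", 1)[-1]

assert location_name("projects/my-project/locations/US-CENTRAL1") == "us-central1"
assert location_name("https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-a") == "us-central1-a"
assert location_name("europe-west3") == "europe-west3"
```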
89 changes: 45 additions & 44 deletions plugins/gcp/fix_plugin_gcp/resources/base.py
@@ -15,7 +15,7 @@
from googleapiclient.errors import HttpError

from fix_plugin_gcp.config import GcpConfig
from fix_plugin_gcp.gcp_client import GcpClient, GcpApiSpec, InternalZoneProp, ZoneProp, RegionProp
from fix_plugin_gcp.gcp_client import GcpClient, GcpApiSpec, LocationProp, InternalZoneProp, ZoneProp, RegionProp
from fix_plugin_gcp.utils import Credentials
from fixlib.baseresources import (
BaseResource,
@@ -210,60 +210,61 @@ def add_region_to_node(self, node: GcpResourceType, source: Optional[Json] = Non
self.add_edge(node, node=node._region, reverse=True)
return

parts = node.id.split("/", maxsplit=4)
if len(parts) > 3 and parts[0] == "projects":
if parts[2] in ["locations", "zones", "regions"]:
location_name = parts[3]
# Check for zone first
if zone := self.zone_by_name.get(location_name):
node._zone = zone
node._region = self.region_by_zone_name.get(zone.id)
self.add_edge(zone, node=node)
return
def set_zone_or_region(location_name: str) -> bool:
return set_zone(location_name) or set_region(location_name)

# Then check for region
if region := self.region_by_name.get(location_name):
node._region = region
self.add_edge(region, node=node)
return
def set_zone(zone_name: str) -> bool:
if zone := self.zone_by_name.get(zone_name):
node._zone = zone
node._region = self.region_by_zone_name.get(zone.id)
self.add_edge(zone, node=node)
return True
else:
log.debug(
"Zone property '%s' found in the source but no corresponding region object is available to associate with the node.",
zone_name,
)
return False

def set_region(region_name: str) -> bool:
if region := self.region_by_name.get(region_name):
node._region = region
self.add_edge(node, node=region, reverse=True)
return True
else:
log.debug(
"Region property '%s' found in the source but no corresponding region object is available to associate with the node.",
region_name,
)
return False

if source is not None:
if ZoneProp in source:
zone_name = source[ZoneProp].rsplit("/", 1)[-1]
if zone := self.zone_by_name.get(zone_name):
node._zone = zone
node._region = self.region_by_zone_name[zone_name]
self.add_edge(node, node=zone, reverse=True)
zone_name = source[ZoneProp].lower().rsplit("/", 1)[-1]
if set_zone(zone_name):
return
else:
log.debug(
"Zone property '%s' found in the source but no corresponding zone object is available to associate with the node.",
zone_name,
)

if InternalZoneProp in source:
if zone := self.zone_by_name.get(source[InternalZoneProp]):
node._zone = zone
node._region = self.region_by_zone_name[source[InternalZoneProp]]
self.add_edge(node, node=zone, reverse=True)
zone_name = source[InternalZoneProp].lower().rsplit("/", 1)[-1]
if set_zone(zone_name):
return
else:
log.debug(
"Internal zone property '%s' exists in the source but no corresponding zone object is available to associate with the node.",
source[InternalZoneProp],
)

if RegionProp in source:
region_name = source[RegionProp].rsplit("/", 1)[-1]
if region := self.region_by_name.get(region_name):
node._region = region
self.add_edge(node, node=region, reverse=True)
region_name = source[RegionProp].lower().rsplit("/", 1)[-1]
if set_region(region_name):
return
# location property can be a zone or region
if LocationProp in source:
location_name = source[LocationProp].lower().rsplit("/", 1)[-1]
if set_zone_or_region(location_name):
return

parts = node.id.split("/", maxsplit=4)
if len(parts) > 3 and parts[0] == "projects":
if parts[2] in ["locations", "zones", "regions"]:
location_name = parts[3].lower()
if set_zone_or_region(location_name):
return
else:
log.debug(
"Region property '%s' found in the source but no corresponding region object is available to associate with the node.",
region_name,
)

# Fallback to GraphBuilder region, i.e. regional collection
if self.region is not None:
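For review convenience, the refactored `add_region_to_node` resolves a node's placement in this order: `zone` → internal `_zone` → `region` → `location` (which may name either a zone or a region) → the `projects/<p>/{locations|zones|regions}/<name>/…` segment of the resource id → the builder's region as the final fallback. A condensed, standalone sketch of that precedence; the real method only advances to the next candidate when the zone/region map lookup fails, and additionally creates edges and logs misses:

```python
from typing import Dict, Optional

# Property names mirror gcp_client.py: ZoneProp="zone", InternalZoneProp="_zone",
# RegionProp="region", LocationProp="location".
def resolve_location(source: Dict[str, str], node_id: str) -> Optional[str]:
    for prop in ("zone", "_zone", "region", "location"):
        if prop in source:
            return source[prop].lower().rsplit("/", 1)[-1]
    # Fall back to the resource id, e.g. "projects/p/locations/us-east1/buckets/b"
    parts = node_id.split("/", maxsplit=4)
    if len(parts) > 3 and parts[0] == "projects" and parts[2] in ("locations", "zones", "regions"):
        return parts[3].lower()
    return None  # caller falls back to the builder's region

assert resolve_location({"region": "projects/p/regions/us-central1"}, "") == "us-central1"
assert resolve_location({}, "projects/p/locations/europe-west3/buckets/b") == "europe-west3"
```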
4 changes: 2 additions & 2 deletions plugins/gcp/fix_plugin_gcp/resources/compute.py
@@ -4429,7 +4429,7 @@ class GcpMachineType(GcpResource, BaseInstanceType):
"maximum_persistent_disks_size_gb": S("maximumPersistentDisksSizeGb"),
"scratch_disks": S("scratchDisks", default=[]) >> ForallBend(S("diskGb")),
"instance_type": S("name"),
"instance_cores": S("guestCpus") >> F(lambda x: float(x)),
"instance_cores": S("guestCpus") >> F(float),
"instance_memory": S("memoryMb") >> F(lambda x: float(x) / 1024),
}
accelerators: Optional[List[GcpAccelerators]] = field(default=None)
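Note on the `instance_cores` change: `F(float)` passes the selected value straight to `float`, which behaves the same as the previous `F(lambda x: float(x))` but is shorter. A quick illustration, assuming `fixlib.json_bender` follows the usual jsonbender convention of exposing a top-level `bend(mapping, source)` helper:

```python
from fixlib.json_bender import bend, S, F  # assumption: bend() is exported jsonbender-style

mapping = {
    "instance_cores": S("guestCpus") >> F(float),                     # new form
    "instance_memory": S("memoryMb") >> F(lambda x: float(x) / 1024),
}
print(bend(mapping, {"guestCpus": 8, "memoryMb": 32768}))
# -> {'instance_cores': 8.0, 'instance_memory': 32.0}
```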
@@ -5419,7 +5419,7 @@ class GcpNotificationEndpointGrpcSettings:


@define(eq=False, slots=False)
class GcpNotificationEndpoint(GcpResource, PhantomBaseResource):
class GcpNotificationEndpoint(GcpResource):
kind: ClassVar[str] = "gcp_notification_endpoint"
_kind_display: ClassVar[str] = "GCP Notification Endpoint"
_kind_description: ClassVar[str] = "GCP Notification Endpoint is a Google Cloud Platform service that receives and processes notifications from various GCP resources. It acts as a central point for collecting and routing alerts, updates, and event data. Users can configure endpoints to direct notifications to specific destinations like email, SMS, or third-party applications for monitoring and response purposes." # fmt: skip
2 changes: 1 addition & 1 deletion plugins/gcp/fix_plugin_gcp/resources/monitoring.py
@@ -117,7 +117,7 @@ def _query_for_chunk(
# Base filter
filters = [
f'metric.type = "{query.query_name}"',
f'resource.labels.project_id="{query.project_id}"',
f'resource.labels.project_id = "{query.project_id}"',
]

# Add additional filters
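The whitespace tweak above only normalizes the filter clause formatting. For context, Cloud Monitoring expects the individual clauses to be combined into one filter expression with `AND`; the snippet below is an illustration of the resulting string, not a quote of the plugin (the joining code sits outside this hunk, and the metric/label values are made up):

```python
# Illustrative only: clause list in the shape built by _query_for_chunk, joined the
# way the Cloud Monitoring API expects ("clause AND clause AND ...").
filters = [
    'metric.type = "pubsub.googleapis.com/subscription/push_request_count"',
    'resource.labels.project_id = "my-project"',
    'resource.labels.subscription_id = "my-subscription"',
]
filter_expr = " AND ".join(filters)
print(filter_expr)
```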
64 changes: 62 additions & 2 deletions plugins/gcp/fix_plugin_gcp/resources/pubsub.py
@@ -4,8 +4,9 @@
from attr import define, field

from fix_plugin_gcp.gcp_client import GcpApiSpec
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder
from fixlib.baseresources import BaseQueue, ModelReference, QueueType
from fix_plugin_gcp.resources.base import GcpMonitoringQuery, GcpResource, GcpDeprecationStatus, GraphBuilder
from fix_plugin_gcp.resources.monitoring import STANDART_STAT_MAP, normalizer_factory
from fixlib.baseresources import BaseQueue, MetricName, ModelReference, QueueType
from fixlib.json_bender import Bender, S, Bend, K, F
from fixlib.types import Json

@@ -268,6 +269,65 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
if topic := self.subscription_topic:
builder.add_edge(self, clazz=GcpPubSubTopic, reverse=True, name=topic)

def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
queries: List[GcpMonitoringQuery] = []
delta = builder.metrics_delta

queries.extend(
[
GcpMonitoringQuery.create(
query_name="pubsub.googleapis.com/subscription/push_request_count",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.NumberOfMessagesReceived,
normalization=normalizer_factory.count,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.subscription_id": self.resource_raw_name,
},
)
for stat in STANDART_STAT_MAP
]
)
if self.subscription_topic:
topic_id = self.subscription_topic.rsplit("/", maxsplit=1)[-1]
queries.extend(
[
GcpMonitoringQuery.create(
query_name="pubsub.googleapis.com/topic/send_message_operation_count",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.NumberOfMessagesSent,
normalization=normalizer_factory.count,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.topic_id": topic_id,
},
)
for stat in STANDART_STAT_MAP
]
)
queries.extend(
[
GcpMonitoringQuery.create(
query_name="pubsub.googleapis.com/subscription/oldest_unacked_message_age",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.ApproximateAgeOfOldestMessage,
normalization=normalizer_factory.seconds,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.subscription_id": self.resource_raw_name,
},
)
for stat in STANDART_STAT_MAP
]
)
return queries


@define(eq=False, slots=False)
class GcpAwsKinesis:
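Review note on the new `collect_usage_metrics` above: the subscription always queries `push_request_count` and `oldest_unacked_message_age`, and additionally `send_message_operation_count` for its topic when `subscription_topic` is set; each metric is fanned out once per stat in `STANDART_STAT_MAP`. A small helper expressing that expectation, e.g. for a unit test (hedged sketch; it only assumes `STANDART_STAT_MAP` supports `len`):

```python
from typing import Any
from fix_plugin_gcp.resources.monitoring import STANDART_STAT_MAP

def expected_pubsub_query_count(subscription: Any) -> int:
    # Two subscription metrics are always queried; the topic metric is only added
    # when the subscription references a topic. Each metric yields one query per stat.
    metric_count = 3 if subscription.subscription_topic else 2
    return metric_count * len(STANDART_STAT_MAP)
```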
53 changes: 51 additions & 2 deletions plugins/gcp/fix_plugin_gcp/resources/storage.py
@@ -4,8 +4,15 @@
from attr import define, field

from fix_plugin_gcp.gcp_client import GcpApiSpec
from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, get_client
from fixlib.baseresources import BaseBucket
from fix_plugin_gcp.resources.base import (
GcpMonitoringQuery,
GcpResource,
GcpDeprecationStatus,
GraphBuilder,
get_client,
)
from fix_plugin_gcp.resources.monitoring import STANDART_STAT_MAP, normalizer_factory
from fixlib.baseresources import BaseBucket, MetricName
from fixlib.graph import Graph
from fixlib.json_bender import Bender, S, Bend, ForallBend, AsBool

@@ -418,6 +425,48 @@ class GcpBucket(GcpResource, BaseBucket):
requester_pays: Optional[bool] = field(default=None)
lifecycle_rule: List[GcpRule] = field(factory=list)

def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]:
queries: List[GcpMonitoringQuery] = []
delta = builder.metrics_delta

queries.extend(
[
GcpMonitoringQuery.create(
query_name="storage.googleapis.com/storage/total_bytes",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.BucketSizeBytes,
normalization=normalizer_factory.bytes,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.bucket_name": self.id,
"resource.labels.location": self.region().id,
},
)
for stat in STANDART_STAT_MAP
]
)
queries.extend(
[
GcpMonitoringQuery.create(
query_name="storage.googleapis.com/storage/object_count",
period=delta,
ref_id=f"{self.kind}/{self.id}/{self.region().id}",
metric_name=MetricName.NumberOfObjects,
normalization=normalizer_factory.count,
stat=stat,
project_id=builder.project.id,
metric_filters={
"resource.labels.bucket_name": self.id,
"resource.labels.location": self.region().id,
},
)
for stat in STANDART_STAT_MAP
]
)
return queries

def pre_delete(self, graph: Graph) -> bool:
client = get_client(self)
objects = client.list(GcpObject.api_spec, bucket=self.name)
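Review note on the bucket metrics: both queries (`total_bytes` and `object_count`) are scoped per bucket and per location through the resource labels, while `ref_id` ties the resulting time series back to the collected resource. Illustrative values for a hypothetical bucket (the names and the exact kind string are made up, not taken from the diff):

```python
# Hypothetical bucket "my-bucket" collected in region "us-central1":
ref_id = "gcp_bucket/my-bucket/us-central1"           # f"{kind}/{id}/{region().id}"
metric_filters = {
    "resource.labels.bucket_name": "my-bucket",        # self.id
    "resource.labels.location": "us-central1",         # self.region().id
}
```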