Skip to content

Commit

Permalink
[azure][fix] Reimplement resource type collection of compute, psql, mysql and ml services (#2234)
Browse files Browse the repository at this point in the history
  • Loading branch information
1101-1 authored Oct 11, 2024
1 parent 8adf359 commit 1980175
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 133 deletions.
6 changes: 3 additions & 3 deletions plugins/azure/fix_plugin_azure/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@ def remove_usage_zero_value() -> None:
rm_leaf_nodes(AzureComputeVirtualMachineSize, AzureLocation)
rm_leaf_nodes(AzureNetworkExpressRoutePortsLocation, AzureSubscription)
rm_leaf_nodes(AzureNetworkVirtualApplianceSku, AzureSubscription)
rm_leaf_nodes(AzureComputeDiskType, AzureSubscription)
rm_leaf_nodes(AzureComputeDiskType, (AzureSubscription, AzureLocation)) # type: ignore
rm_leaf_nodes(AzureMachineLearningVirtualMachineSize, AzureLocation)
rm_leaf_nodes(AzureStorageSku, AzureLocation)
rm_leaf_nodes(AzureMysqlServerType, AzureSubscription)
rm_leaf_nodes(AzurePostgresqlServerType, AzureSubscription)
rm_leaf_nodes(AzureMysqlServerType, AzureLocation)
rm_leaf_nodes(AzurePostgresqlServerType, AzureLocation)
rm_leaf_nodes(AzureCosmosDBLocation, AzureLocation, check_pred=False)
rm_leaf_nodes(AzureLocation, check_pred=False)
rm_leaf_nodes(AzureComputeDiskTypePricing, AzureSubscription)
Expand Down
144 changes: 91 additions & 53 deletions plugins/azure/fix_plugin_azure/resource/compute.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import defaultdict
import logging
from datetime import datetime
from typing import ClassVar, Dict, Optional, List, Any, Type
Expand Down Expand Up @@ -899,15 +900,16 @@ def build_custom_disk_size(
return premium_ssd_v2_object

@staticmethod
def create_unique_disk_sizes(collected_disks: List[MicrosoftResourceType], builder: GraphBuilder) -> None:
def create_unique_disk_sizes(
collected_disks: List[MicrosoftResourceType], builder: GraphBuilder, location: str
) -> None:
disk_sizes: List[Json] = []
seen_hashes = set() # Set to keep track of unique hashes
for disk in collected_disks:
if not isinstance(disk, AzureComputeDisk):
continue
if (
(volume_type := disk.volume_type)
and (location := disk.location)
and (size := disk.volume_size)
and (iops := disk.volume_iops)
and (throughput := disk.volume_throughput)
Expand Down Expand Up @@ -1046,15 +1048,22 @@ class AzureComputeDisk(MicrosoftResource, BaseVolume):
tier_name: Optional[str] = field(default=None, metadata={"description": "The sku tier."})

@classmethod
def collect_resources(
cls: Type[MicrosoftResourceType], builder: GraphBuilder, **kwargs: Any
) -> List[MicrosoftResourceType]:
def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureComputeDisk"]:
log.debug(f"[Azure:{builder.account.id}] Collecting {cls.__name__} with ({kwargs})")
if not issubclass(cls, MicrosoftResource):
return []
if spec := cls.api_spec:
items = builder.client.list(spec, **kwargs)
collected = cls.collect(items, builder)
# Create additional custom disk sizes for disks with Ultra SSD or Premium SSD v2 types
AzureComputeDiskType.create_unique_disk_sizes(collected, builder)
disks_by_location = defaultdict(list)
for disk in collected:
if disk_location := getattr(disk, "location", None):
disks_by_location[disk_location].append(disk)
for d_loc, disks in disks_by_location.items():
# Collect disk types for the disks in this location
AzureComputeDisk._collect_disk_types(builder, d_loc)
# Create additional custom disk sizes for disks with Ultra SSD or Premium SSD v2 types
AzureComputeDiskType.create_unique_disk_sizes(disks, builder, d_loc)
if builder.config.collect_usage_metrics:
try:
cls.collect_usage_metrics(builder, collected)
Expand All @@ -1063,33 +1072,32 @@ def collect_resources(
return collected
return []

def post_process(self, graph_builder: GraphBuilder, source: Json) -> None:
if location := self.location:

def collect_disk_types() -> None:
log.debug(f"[Azure:{graph_builder.account.id}] Collecting AzureComputeDiskType")
product_names = {
"Standard SSD Managed Disks",
"Premium SSD Managed Disks",
"Standard HDD Managed Disks",
}
sku_items = []
for product_name in product_names:
api_spec = AzureResourceSpec(
service="compute",
version="2023-01-01-preview",
path=f"https://prices.azure.com/api/retail/prices?$filter=productName eq '{product_name}' and armRegionName eq '{location}' and unitOfMeasure eq '1/Month' and serviceFamily eq 'Storage' and type eq 'Consumption' and isPrimaryMeterRegion eq true",
path_parameters=[],
query_parameters=["api-version"],
access_path="Items",
expect_array=True,
)
@staticmethod
def _collect_disk_types(graph_builder: GraphBuilder, location: str) -> None:
    """Fetch retail price SKUs for managed-disk products in *location* and
    register them as AzureComputeDiskType nodes.

    The actual API calls run as deferred work on the builder's executor.
    """

    def collect_disk_types() -> None:
        log.debug(f"[Azure:{graph_builder.account.id}] Collecting AzureComputeDiskType")
        # Managed-disk product families whose monthly retail prices we ingest.
        product_names = {
            "Standard SSD Managed Disks",
            "Premium SSD Managed Disks",
            "Standard HDD Managed Disks",
        }
        collected_skus: List[Json] = []
        for product in product_names:
            spec = AzureResourceSpec(
                service="compute",
                version="2023-01-01-preview",
                path=f"https://prices.azure.com/api/retail/prices?$filter=productName eq '{product}' and armRegionName eq '{location}' and unitOfMeasure eq '1/Month' and serviceFamily eq 'Storage' and type eq 'Consumption' and isPrimaryMeterRegion eq true",
                path_parameters=[],
                query_parameters=["api-version"],
                access_path="Items",
                expect_array=True,
            )
            collected_skus.extend(graph_builder.client.list(spec))
        AzureComputeDiskType.collect(collected_skus, graph_builder)

    graph_builder.submit_work(service_name, collect_disk_types)

@classmethod
def collect_usage_metrics(
Expand Down Expand Up @@ -2954,30 +2962,60 @@ def collect_instance_status() -> None:
if not instance_status_set:
self.instance_status = InstanceStatus.UNKNOWN

if location := self.location:
graph_builder.submit_work(service_name, collect_instance_status)

def collect_vm_sizes() -> None:
api_spec = AzureResourceSpec(
service="compute",
version="2023-03-01",
path="/subscriptions/{subscriptionId}/providers/Microsoft.Compute/locations/"
+ f"{location}/vmSizes",
path_parameters=["subscriptionId"],
query_parameters=["api-version"],
access_path="value",
expect_array=True,
)
items = graph_builder.client.list(api_spec)
if not items:
return
# Set location for further connect_in_graph method
for item in items:
item["location"] = location
AzureComputeVirtualMachineSize.collect(items, graph_builder)
@classmethod
def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureComputeVirtualMachineBase"]:
    """Collect all virtual machines via the class api_spec, then fetch the
    available VM sizes once per distinct VM location and, if enabled,
    usage metrics.

    Returns the collected VM resources, or an empty list when cls is not a
    MicrosoftResource subclass or has no api_spec.
    """
    log.debug(f"[Azure:{builder.account.id}] Collecting {cls.__name__} with ({kwargs})")

    if not issubclass(cls, MicrosoftResource):
        return []

    if spec := cls.api_spec:
        items = builder.client.list(spec, **kwargs)
        collected = cls.collect(items, builder)

        # Use getattr with a default so resources without a location attribute
        # are skipped instead of raising (consistent with the disk collector),
        # and read the attribute only once per resource.
        unique_locations = {loc for vm in collected if (loc := getattr(vm, "location", None))}

        for location in unique_locations:
            log.debug(f"Processing virtual machines in location: {location}")

            # Collect the VM sizes available in this location
            AzureComputeVirtualMachineBase._collect_vm_sizes(builder, location)

        if builder.config.collect_usage_metrics:
            try:
                cls.collect_usage_metrics(builder, collected)
            except Exception as e:
                log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}")

        return collected

    return []

@staticmethod
def _collect_vm_sizes(graph_builder: GraphBuilder, location: str) -> None:
    """Schedule collection of AzureComputeVirtualMachineSize nodes for *location*."""

    def collect_vm_sizes() -> None:
        spec = AzureResourceSpec(
            service="compute",
            version="2023-03-01",
            path=f"/subscriptions/{{subscriptionId}}/providers/Microsoft.Compute/locations/{location}/vmSizes",
            path_parameters=["subscriptionId"],
            query_parameters=["api-version"],
            access_path="value",
            expect_array=True,
        )
        listed = graph_builder.client.list(spec)
        if not listed:
            return
        # Stamp the location onto every item so connect_in_graph can resolve it later.
        for entry in listed:
            entry["location"] = location
        AzureComputeVirtualMachineSize.collect(listed, graph_builder)

    graph_builder.submit_work(service_name, collect_vm_sizes)

@classmethod
def collect_usage_metrics(
Expand Down
93 changes: 70 additions & 23 deletions plugins/azure/fix_plugin_azure/resource/machinelearning.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from collections import defaultdict
import logging
from datetime import datetime
from typing import Any, ClassVar, Dict, Optional, List, Tuple, Type
Expand Down Expand Up @@ -553,32 +554,78 @@ class AzureMachineLearningCompute(MicrosoftResource):
system_data: Optional[AzureSystemData] = field(default=None, metadata={'description': 'Metadata pertaining to creation and last modification of the resource.'}) # fmt: skip
location: Optional[str] = field(default=None, metadata={'description': 'The geo-location where the resource lives'}) # fmt: skip

def post_process(self, graph_builder: GraphBuilder, source: Json) -> None:
if location := self.location:
@classmethod
def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureMachineLearningCompute"]:
    """Collect ML compute resources, then resolve the available VM sizes per
    location and connect each compute resource to its size; optionally
    gathers usage metrics as well.
    """
    log.debug(f"[Azure:{builder.account.id}] Collecting {cls.__name__} with ({kwargs})")

    if not issubclass(cls, MicrosoftResource):
        return []

    spec = cls.api_spec
    if not spec:
        return []

    collected = cls.collect(builder.client.list(spec, **kwargs), builder)

    # Group the collected compute resources by the region they live in.
    by_location: Dict[str, List["AzureMachineLearningCompute"]] = {}
    for resource in collected:
        loc = getattr(resource, "location", None)
        if loc:
            by_location.setdefault(loc, []).append(resource)

    # Process each unique location
    for loc, resources in by_location.items():
        log.debug(f"Processing compute resources in location: {loc}")

        # Collect VM sizes for the compute resources in this location
        cls._collect_vm_sizes(builder, loc, resources)

    if builder.config.collect_usage_metrics:
        try:
            cls.collect_usage_metrics(builder, collected)
        except Exception as e:
            log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}")

    return collected

@staticmethod
def _collect_vm_sizes(
    graph_builder: GraphBuilder, location: str, compute_resources: List["AzureMachineLearningCompute"]
) -> None:
    """Fetch the ML VM sizes available in *location* and connect each of the
    given compute resources to the size node matching its configured vmSize.

    The API call and edge creation run as deferred work on the builder's
    executor.
    """

    def collect_vm_sizes() -> None:
        api_spec = AzureResourceSpec(
            service="machinelearningservices",
            version="2024-04-01",
            path=f"/subscriptions/{{subscriptionId}}/providers/Microsoft.MachineLearningServices/locations/{location}/vmSizes",
            path_parameters=["subscriptionId"],
            query_parameters=["api-version"],
            access_path="value",
            expect_array=True,
        )
        items = graph_builder.client.list(api_spec)

        if not items:
            return

        # Set location for further connect_in_graph method
        for item in items:
            item["location"] = location

        # Collect the virtual machine sizes
        collected_vm_sizes = AzureMachineLearningVirtualMachineSize.collect(items, graph_builder)

        # Index sizes by name once (setdefault keeps the first occurrence,
        # matching the previous first-match scan) instead of an
        # O(resources * sizes) nested loop.
        sizes_by_name: Dict[Any, Any] = {}
        for size in collected_vm_sizes:
            sizes_by_name.setdefault(size.name, size)

        for compute_resource in compute_resources:
            vm_size = (compute_resource.properties or {}).get("vmSize")
            if vm_size and (matched := sizes_by_name.get(vm_size)) is not None:
                graph_builder.add_edge(compute_resource, node=matched)

    graph_builder.submit_work(service_name, collect_vm_sizes)

def post_process(self, graph_builder: GraphBuilder, source: Json) -> None:
if resource_id := self.id:

def collect_nodes() -> None:
Expand Down
Loading

0 comments on commit 1980175

Please sign in to comment.