From 44fd9249df63aa86087dff0af92ccfc17de18a3e Mon Sep 17 00:00:00 2001 From: 1101-1 <70093559+1101-1@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:05:24 +0500 Subject: [PATCH 1/5] [gcp][feat] Add metrics collection to GcpPubSub and GcpBucket resources (#2296) --- plugins/gcp/fix_plugin_gcp/collector.py | 1 + plugins/gcp/fix_plugin_gcp/gcp_client.py | 1 + plugins/gcp/fix_plugin_gcp/resources/base.py | 89 ++++++++++--------- .../gcp/fix_plugin_gcp/resources/compute.py | 4 +- .../fix_plugin_gcp/resources/monitoring.py | 2 +- .../gcp/fix_plugin_gcp/resources/pubsub.py | 64 ++++++++++++- .../gcp/fix_plugin_gcp/resources/storage.py | 53 ++++++++++- 7 files changed, 163 insertions(+), 51 deletions(-) diff --git a/plugins/gcp/fix_plugin_gcp/collector.py b/plugins/gcp/fix_plugin_gcp/collector.py index fc7bf5bfdd..57333258d2 100644 --- a/plugins/gcp/fix_plugin_gcp/collector.py +++ b/plugins/gcp/fix_plugin_gcp/collector.py @@ -198,6 +198,7 @@ def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None: rm_leaf_nodes(compute.GcpAcceleratorType) rm_leaf_nodes(billing.GcpSku) rm_leaf_nodes(billing.GcpService) + rm_leaf_nodes(compute.GcpInterconnectLocation) # remove regions that are not in use self.graph.remove_recursively(builder.nodes(GcpRegion, lambda r: r.compute_region_in_use(builder) is False)) diff --git a/plugins/gcp/fix_plugin_gcp/gcp_client.py b/plugins/gcp/fix_plugin_gcp/gcp_client.py index b79398592b..7e76ece0ad 100644 --- a/plugins/gcp/fix_plugin_gcp/gcp_client.py +++ b/plugins/gcp/fix_plugin_gcp/gcp_client.py @@ -14,6 +14,7 @@ InternalZoneProp = "_zone" ZoneProp = "zone" RegionProp = "region" +LocationProp = "location" # Store the discovery function as separate variable. # This is used in tests to change the builder function. diff --git a/plugins/gcp/fix_plugin_gcp/resources/base.py b/plugins/gcp/fix_plugin_gcp/resources/base.py index ae4bb35388..e696b10007 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/base.py +++ b/plugins/gcp/fix_plugin_gcp/resources/base.py @@ -15,7 +15,7 @@ from googleapiclient.errors import HttpError from fix_plugin_gcp.config import GcpConfig -from fix_plugin_gcp.gcp_client import GcpClient, GcpApiSpec, InternalZoneProp, ZoneProp, RegionProp +from fix_plugin_gcp.gcp_client import GcpClient, GcpApiSpec, LocationProp, InternalZoneProp, ZoneProp, RegionProp from fix_plugin_gcp.utils import Credentials from fixlib.baseresources import ( BaseResource, @@ -210,60 +210,61 @@ def add_region_to_node(self, node: GcpResourceType, source: Optional[Json] = Non self.add_edge(node, node=node._region, reverse=True) return - parts = node.id.split("/", maxsplit=4) - if len(parts) > 3 and parts[0] == "projects": - if parts[2] in ["locations", "zones", "regions"]: - location_name = parts[3] - # Check for zone first - if zone := self.zone_by_name.get(location_name): - node._zone = zone - node._region = self.region_by_zone_name.get(zone.id) - self.add_edge(zone, node=node) - return + def set_zone_or_region(location_name: str) -> bool: + return set_zone(location_name) or set_region(location_name) - # Then check for region - if region := self.region_by_name.get(location_name): - node._region = region - self.add_edge(region, node=node) - return + def set_zone(zone_name: str) -> bool: + if zone := self.zone_by_name.get(zone_name): + node._zone = zone + node._region = self.region_by_zone_name.get(zone.id) + self.add_edge(zone, node=node) + return True + else: + log.debug( + "Zone property '%s' found in the source but no corresponding region object is available to associate with the node.", + zone_name, + ) + return False + + def set_region(region_name: str) -> bool: + if region := self.region_by_name.get(region_name): + node._region = region + self.add_edge(node, node=region, reverse=True) + return True + else: + log.debug( + "Region property '%s' found in the source but no corresponding region object is available to associate with the node.", + region_name, + ) + return False if source is not None: if ZoneProp in source: - zone_name = source[ZoneProp].rsplit("/", 1)[-1] - if zone := self.zone_by_name.get(zone_name): - node._zone = zone - node._region = self.region_by_zone_name[zone_name] - self.add_edge(node, node=zone, reverse=True) + zone_name = source[ZoneProp].lower().rsplit("/", 1)[-1] + if set_zone(zone_name): return - else: - log.debug( - "Zone property '%s' found in the source but no corresponding zone object is available to associate with the node.", - zone_name, - ) if InternalZoneProp in source: - if zone := self.zone_by_name.get(source[InternalZoneProp]): - node._zone = zone - node._region = self.region_by_zone_name[source[InternalZoneProp]] - self.add_edge(node, node=zone, reverse=True) + zone_name = source[InternalZoneProp].lower().rsplit("/", 1)[-1] + if set_zone(zone_name): return - else: - log.debug( - "Internal zone property '%s' exists in the source but no corresponding zone object is available to associate with the node.", - source[InternalZoneProp], - ) if RegionProp in source: - region_name = source[RegionProp].rsplit("/", 1)[-1] - if region := self.region_by_name.get(region_name): - node._region = region - self.add_edge(node, node=region, reverse=True) + region_name = source[RegionProp].lower().rsplit("/", 1)[-1] + if set_region(region_name): + return + # location property can be a zone or region + if LocationProp in source: + location_name = source[LocationProp].lower().rsplit("/", 1)[-1] + if set_zone_or_region(location_name): + return + + parts = node.id.split("/", maxsplit=4) + if len(parts) > 3 and parts[0] == "projects": + if parts[2] in ["locations", "zones", "regions"]: + location_name = parts[3].lower() + if set_zone_or_region(location_name): return - else: - log.debug( - "Region property '%s' found in the source but no corresponding region object is available to associate with the node.", - region_name, - ) # Fallback to GraphBuilder region, i.e. regional collection if self.region is not None: diff --git a/plugins/gcp/fix_plugin_gcp/resources/compute.py b/plugins/gcp/fix_plugin_gcp/resources/compute.py index 2443e18b28..93945b7a53 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/compute.py +++ b/plugins/gcp/fix_plugin_gcp/resources/compute.py @@ -4429,7 +4429,7 @@ class GcpMachineType(GcpResource, BaseInstanceType): "maximum_persistent_disks_size_gb": S("maximumPersistentDisksSizeGb"), "scratch_disks": S("scratchDisks", default=[]) >> ForallBend(S("diskGb")), "instance_type": S("name"), - "instance_cores": S("guestCpus") >> F(lambda x: float(x)), + "instance_cores": S("guestCpus") >> F(float), "instance_memory": S("memoryMb") >> F(lambda x: float(x) / 1024), } accelerators: Optional[List[GcpAccelerators]] = field(default=None) @@ -5419,7 +5419,7 @@ class GcpNotificationEndpointGrpcSettings: @define(eq=False, slots=False) -class GcpNotificationEndpoint(GcpResource, PhantomBaseResource): +class GcpNotificationEndpoint(GcpResource): kind: ClassVar[str] = "gcp_notification_endpoint" _kind_display: ClassVar[str] = "GCP Notification Endpoint" _kind_description: ClassVar[str] = "GCP Notification Endpoint is a Google Cloud Platform service that receives and processes notifications from various GCP resources. It acts as a central point for collecting and routing alerts, updates, and event data. Users can configure endpoints to direct notifications to specific destinations like email, SMS, or third-party applications for monitoring and response purposes." # fmt: skip diff --git a/plugins/gcp/fix_plugin_gcp/resources/monitoring.py b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py index 6219083e2e..ea4f640471 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/monitoring.py +++ b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py @@ -117,7 +117,7 @@ def _query_for_chunk( # Base filter filters = [ f'metric.type = "{query.query_name}"', - f'resource.labels.project_id="{query.project_id}"', + f'resource.labels.project_id = "{query.project_id}"', ] # Add additional filters diff --git a/plugins/gcp/fix_plugin_gcp/resources/pubsub.py b/plugins/gcp/fix_plugin_gcp/resources/pubsub.py index 2e413ac2a6..4ffd2a3cd9 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/pubsub.py +++ b/plugins/gcp/fix_plugin_gcp/resources/pubsub.py @@ -4,8 +4,9 @@ from attr import define, field from fix_plugin_gcp.gcp_client import GcpApiSpec -from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder -from fixlib.baseresources import BaseQueue, ModelReference, QueueType +from fix_plugin_gcp.resources.base import GcpMonitoringQuery, GcpResource, GcpDeprecationStatus, GraphBuilder +from fix_plugin_gcp.resources.monitoring import STANDART_STAT_MAP, normalizer_factory +from fixlib.baseresources import BaseQueue, MetricName, ModelReference, QueueType from fixlib.json_bender import Bender, S, Bend, K, F from fixlib.types import Json @@ -268,6 +269,65 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: if topic := self.subscription_topic: builder.add_edge(self, clazz=GcpPubSubTopic, reverse=True, name=topic) + def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]: + queries: List[GcpMonitoringQuery] = [] + delta = builder.metrics_delta + + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="pubsub.googleapis.com/subscription/push_request_count", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.NumberOfMessagesReceived, + normalization=normalizer_factory.count, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.labels.subscription_id": self.resource_raw_name, + }, + ) + for stat in STANDART_STAT_MAP + ] + ) + if self.subscription_topic: + topic_id = self.subscription_topic.rsplit("/", maxsplit=1)[-1] + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="pubsub.googleapis.com/topic/send_message_operation_count", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.NumberOfMessagesSent, + normalization=normalizer_factory.count, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.labels.topic_id": topic_id, + }, + ) + for stat in STANDART_STAT_MAP + ] + ) + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="pubsub.googleapis.com/subscription/oldest_unacked_message_age", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.ApproximateAgeOfOldestMessage, + normalization=normalizer_factory.seconds, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.labels.subscription_id": self.resource_raw_name, + }, + ) + for stat in STANDART_STAT_MAP + ] + ) + return queries + @define(eq=False, slots=False) class GcpAwsKinesis: diff --git a/plugins/gcp/fix_plugin_gcp/resources/storage.py b/plugins/gcp/fix_plugin_gcp/resources/storage.py index e453bba658..ec4fbae661 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/storage.py +++ b/plugins/gcp/fix_plugin_gcp/resources/storage.py @@ -4,8 +4,15 @@ from attr import define, field from fix_plugin_gcp.gcp_client import GcpApiSpec -from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, get_client -from fixlib.baseresources import BaseBucket +from fix_plugin_gcp.resources.base import ( + GcpMonitoringQuery, + GcpResource, + GcpDeprecationStatus, + GraphBuilder, + get_client, +) +from fix_plugin_gcp.resources.monitoring import STANDART_STAT_MAP, normalizer_factory +from fixlib.baseresources import BaseBucket, MetricName from fixlib.graph import Graph from fixlib.json_bender import Bender, S, Bend, ForallBend, AsBool @@ -418,6 +425,48 @@ class GcpBucket(GcpResource, BaseBucket): requester_pays: Optional[bool] = field(default=None) lifecycle_rule: List[GcpRule] = field(factory=list) + def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]: + queries: List[GcpMonitoringQuery] = [] + delta = builder.metrics_delta + + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="storage.googleapis.com/storage/total_bytes", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.BucketSizeBytes, + normalization=normalizer_factory.bytes, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.labels.bucket_name": self.id, + "resource.labels.location": self.region().id, + }, + ) + for stat in STANDART_STAT_MAP + ] + ) + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="storage.googleapis.com/storage/object_count", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.NumberOfObjects, + normalization=normalizer_factory.count, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.labels.bucket_name": self.id, + "resource.labels.location": self.region().id, + }, + ) + for stat in STANDART_STAT_MAP + ] + ) + return queries + def pre_delete(self, graph: Graph) -> bool: client = get_client(self) objects = client.list(GcpObject.api_spec, bucket=self.name) From 61ae4470d7d5938c43ff9882487673db4e598303 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Wed, 11 Dec 2024 09:31:06 +0100 Subject: [PATCH 2/5] [core][chore] Remove "Experimental" disclaimers from API documentation. (#2301) --- fixcore/fixcore/static/api-doc.yaml | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/fixcore/fixcore/static/api-doc.yaml b/fixcore/fixcore/static/api-doc.yaml index d32c72bc04..b9c0410e79 100644 --- a/fixcore/fixcore/static/api-doc.yaml +++ b/fixcore/fixcore/static/api-doc.yaml @@ -404,7 +404,6 @@ paths: post: summary: "Merge a given graph with the existing graph under marked merge nodes as batch update." description: | - **Experimental**: This API is not stable and might be subject of change.
Merge a given graph with the existing graph under marked merge nodes as batch update. tags: - graph_management @@ -458,7 +457,6 @@ paths: get: summary: "Get a list of all running batch updates" description: | - **Experimental**: This API is not stable and might be subject of change.
Get a list of all running batch updates. tags: - graph_management @@ -475,7 +473,6 @@ paths: post: summary: "Commit a batch update" description: | - **Experimental**: This API is not stable and might be subject of change.
Commit a batch update. tags: - graph_management @@ -498,7 +495,6 @@ paths: delete: summary: "Abort a batch update" description: | - **Experimental**: This API is not stable and might be subject of change.
Abort a batch update. tags: - graph_management @@ -584,7 +580,6 @@ paths: post: summary: "Create a new node under the given parent node" description: | - **Experimental**: This API is not stable and might be subject of change.
Create a new node under the given parent node. tags: - node_management @@ -619,7 +614,6 @@ paths: get: summary: "Get a node with the given node id" description: | - **Experimental**: This API is not stable and might be subject of change.
Get a node with the given node id tags: - node_management @@ -641,7 +635,6 @@ paths: patch: summary: "Update a node with the given node id" description: | - **Experimental**: This API is not stable and might be subject of change.
Update a node with the given node id tags: - node_management @@ -670,7 +663,6 @@ paths: delete: summary: "Delete a node with the given node id." description: | - **Experimental**: This API is not stable and might be subject of change.
Delete a node with the given node id. tags: - node_management @@ -689,7 +681,6 @@ paths: patch: summary: "Patch a node with the given node id in given section" description: | - **Experimental**: This API is not stable and might be subject of change.
Patch a node with the given node id in given section tags: - node_management @@ -723,7 +714,6 @@ paths: post: summary: "Transform the search into the raw database search" description: | - **Experimental**: This API is not stable and might be subject of change.
Show the underlying raw search. tags: - debug @@ -2153,7 +2143,6 @@ paths: get: summary: "Get all configuration keys" description: | - **Experimental**: This API is not stable and might be subject of change.
Get all configuration keys in the system. tags: - config @@ -2213,7 +2202,6 @@ paths: get: summary: "Get all configuration keys that have a model defined." description: | - **Experimental**: This API is not stable and might be subject of change.
Get all configuration keys that have a model defined. tags: - config_validation @@ -2231,7 +2219,6 @@ paths: get: summary: "Get a configuration by its id" description: | - **Experimental**: This API is not stable and might be subject of change.
Fetch a configuration by id. tags: - config @@ -2261,7 +2248,6 @@ paths: put: summary: "Replace a configuration with given id" description: | - **Experimental**: This API is not stable and might be subject of change.
Replace a configuration identified by id with provided value. tags: - config @@ -2310,7 +2296,6 @@ paths: patch: summary: "Patch a configuration by its id" description: | - **Experimental**: This API is not stable and might be subject of change.
Patch a configuration identified by id with provided value. tags: - config @@ -2359,7 +2344,6 @@ paths: delete: summary: "Delete a configuration by its id" description: | - **Experimental**: This API is not stable and might be subject of change.
Delete a configuration identified by id with provided value. tags: - config @@ -2378,7 +2362,6 @@ paths: get: summary: "Get a configuration validation by its id" description: | - **Experimental**: This API is not stable and might be subject of change.
Fetch the validation of a configuration by id. tags: - config_validation @@ -2405,7 +2388,6 @@ paths: put: summary: "Replace a configuration validation with given id" description: | - **Experimental**: This API is not stable and might be subject of change.
Replace a configuration validation identified by id with provided value. tags: - config_validation @@ -2519,7 +2501,6 @@ paths: get: summary: Get information about CLI description: | - **Experimental**: This API is not stable and might be subject of change.
Get information about CLI tags: - cli From 8fe337146dad482dda1bbfa5b98dceb97220f778 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Wed, 11 Dec 2024 09:37:37 +0100 Subject: [PATCH 3/5] [fix][chore] Do not use S3/CDN in GitHub actions (#2289) --- .github/workflows/check_pr_plugin_aws.yml | 16 -- .github/workflows/check_pr_plugin_gcp.yml | 14 -- .github/workflows/create_plugin_workflows.py | 9 +- .github/workflows/publish.yml | 164 +++++++++---------- 4 files changed, 87 insertions(+), 116 deletions(-) diff --git a/.github/workflows/check_pr_plugin_aws.yml b/.github/workflows/check_pr_plugin_aws.yml index 30741eb48d..e9ce86fb35 100644 --- a/.github/workflows/check_pr_plugin_aws.yml +++ b/.github/workflows/check_pr_plugin_aws.yml @@ -73,19 +73,3 @@ jobs: user: __token__ password: ${{ secrets.PYPI_FIXINVENTORY_PLUGIN_AWS }} packages_dir: ./plugins/aws/dist/ - - - name: Upload AWS policies - if: github.event_name != 'pull_request' - working-directory: ./plugins/aws - run: | - pip install --upgrade --editable . - pip install --upgrade --editable ./tools/awspolicygen - export GITHUB_REF="${{ github.ref }}" - export GITHUB_REF_TYPE="${{ github.ref_type }}" - export GITHUB_EVENT_NAME="${{ github.event_name }}" - export API_TOKEN="${{ secrets.API_TOKEN }}" - export SPACES_KEY="${{ secrets.SPACES_KEY }}" - export SPACES_SECRET="${{ secrets.SPACES_SECRET }}" - export AWS_ACCESS_KEY_ID="${{ secrets.S3_FIXINVENTORYPUBLIC_AWS_ACCESS_KEY_ID }}" - export AWS_SECRET_ACCESS_KEY="${{ secrets.S3_FIXINVENTORYPUBLIC_AWS_SECRET_ACCESS_KEY }}" - awspolicygen --verbose --spaces-name somecdn --spaces-region ams3 --spaces-path fix/aws/ --aws-s3-bucket fixinventorypublic --aws-s3-bucket-path cf/ diff --git a/.github/workflows/check_pr_plugin_gcp.yml b/.github/workflows/check_pr_plugin_gcp.yml index 7d4a3ac68a..c6c2fd8078 100644 --- a/.github/workflows/check_pr_plugin_gcp.yml +++ b/.github/workflows/check_pr_plugin_gcp.yml @@ -73,17 +73,3 @@ jobs: user: __token__ password: ${{ secrets.PYPI_FIXINVENTORY_PLUGIN_GCP }} packages_dir: ./plugins/gcp/dist/ - - - name: Upload GCP policies - if: github.event_name != 'pull_request' - working-directory: ./plugins/gcp - run: | - pip install --upgrade --editable . - pip install --upgrade --editable ./tools/gcppolicygen - export GITHUB_REF="${{ github.ref }}" - export GITHUB_REF_TYPE="${{ github.ref_type }}" - export GITHUB_EVENT_NAME="${{ github.event_name }}" - export API_TOKEN="${{ secrets.API_TOKEN }}" - export SPACES_KEY="${{ secrets.SPACES_KEY }}" - export SPACES_SECRET="${{ secrets.SPACES_SECRET }}" - gcppolicygen --verbose --spaces-name somecdn --spaces-region ams3 --spaces-path fix/gcp/ diff --git a/.github/workflows/create_plugin_workflows.py b/.github/workflows/create_plugin_workflows.py index ac311bdf62..4700998f7d 100755 --- a/.github/workflows/create_plugin_workflows.py +++ b/.github/workflows/create_plugin_workflows.py @@ -138,7 +138,8 @@ .replace("@name@", plugin) .replace("@PKGNAME@", f"fixinventory_plugin_{plugin}".upper()) ) - if plugin == "aws": - yml.write(aws_policygen) - elif plugin == "gcp": - yml.write(gcp_policygen) + # PolicyGen Upload disabled for now. Uncomment when required. + # if plugin == "aws": + # yml.write(aws_policygen) + # elif plugin == "gcp": + # yml.write(gcp_policygen) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index e91fd803cd..4b062e2284 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -34,34 +34,34 @@ jobs: run: | yarn install --frozen-lockfile - - name: Wait for AWS policies to be uploaded - if: github.event_name != 'workflow_dispatch' - uses: lewagon/wait-on-check-action@v1.3.1 - with: - ref: ${{ github.ref }} - check-name: aws - repo-token: ${{ secrets.GITHUB_TOKEN }} - - - name: Update AWS policy JSON - shell: bash - working-directory: ./docs.fix.security/iam/aws - run: | - wget -qO FixOrgList.json https://cdn.some.engineering/fix/aws/edge/FixOrgList.json - wget -qO FixCollect.json https://cdn.some.engineering/fix/aws/edge/FixCollect.json - - - name: Wait for GCP policies to be uploaded - if: github.event_name != 'workflow_dispatch' - uses: lewagon/wait-on-check-action@v1.3.1 - with: - ref: ${{ github.ref }} - check-name: gcp - repo-token: ${{ secrets.GITHUB_TOKEN }} - - - name: Update GCP policy JSON - shell: bash - working-directory: ./docs.fix.security/iam/gcp - run: | - wget -qO fix_access.json https://cdn.some.engineering/fix/gcp/edge/fix_access.json +# - name: Wait for AWS policies to be uploaded +# if: github.event_name != 'workflow_dispatch' +# uses: lewagon/wait-on-check-action@v1.3.1 +# with: +# ref: ${{ github.ref }} +# check-name: aws +# repo-token: ${{ secrets.GITHUB_TOKEN }} +# +# - name: Update AWS policy JSON +# shell: bash +# working-directory: ./docs.fix.security/iam/aws +# run: | +# wget -qO FixOrgList.json https://cdn.some.engineering/fix/aws/edge/FixOrgList.json +# wget -qO FixCollect.json https://cdn.some.engineering/fix/aws/edge/FixCollect.json +# +# - name: Wait for GCP policies to be uploaded +# if: github.event_name != 'workflow_dispatch' +# uses: lewagon/wait-on-check-action@v1.3.1 +# with: +# ref: ${{ github.ref }} +# check-name: gcp +# repo-token: ${{ secrets.GITHUB_TOKEN }} +# +# - name: Update GCP policy JSON +# shell: bash +# working-directory: ./docs.fix.security/iam/gcp +# run: | +# wget -qO fix_access.json https://cdn.some.engineering/fix/gcp/edge/fix_access.json - name: Clean existing Kroki images shell: bash @@ -138,28 +138,28 @@ jobs: run: | yarn gen-api-docs - - name: Update AWS policy JSON - shell: bash - working-directory: ./inventory.fix.security/iam/aws/edge - run: | - wget -qO FixOrgList.json https://cdn.some.engineering/fix/aws/edge/FixOrgList.json - wget -qO FixCollect.json https://cdn.some.engineering/fix/aws/edge/FixCollect.json - wget -qO FixMutate.json https://cdn.some.engineering/fix/aws/edge/FixMutate.json - - - name: Wait for GCP policies to be uploaded - if: github.event_name != 'workflow_dispatch' - uses: lewagon/wait-on-check-action@v1.3.1 - with: - ref: ${{ github.ref }} - check-name: gcp - repo-token: ${{ secrets.GITHUB_TOKEN }} - - - name: Update GCP policy JSON - shell: bash - working-directory: ./inventory.fix.security/iam/gcp/edge - run: | - wget -qO fix_access.json https://cdn.some.engineering/fix/gcp/edge/fix_access.json - wget -qO fix_mutate.json https://cdn.some.engineering/fix/gcp/edge/fix_mutate.json +# - name: Update AWS policy JSON +# shell: bash +# working-directory: ./inventory.fix.security/iam/aws/edge +# run: | +# wget -qO FixOrgList.json https://cdn.some.engineering/fix/aws/edge/FixOrgList.json +# wget -qO FixCollect.json https://cdn.some.engineering/fix/aws/edge/FixCollect.json +# wget -qO FixMutate.json https://cdn.some.engineering/fix/aws/edge/FixMutate.json +# +# - name: Wait for GCP policies to be uploaded +# if: github.event_name != 'workflow_dispatch' +# uses: lewagon/wait-on-check-action@v1.3.1 +# with: +# ref: ${{ github.ref }} +# check-name: gcp +# repo-token: ${{ secrets.GITHUB_TOKEN }} +# +# - name: Update GCP policy JSON +# shell: bash +# working-directory: ./inventory.fix.security/iam/gcp/edge +# run: | +# wget -qO fix_access.json https://cdn.some.engineering/fix/gcp/edge/fix_access.json +# wget -qO fix_mutate.json https://cdn.some.engineering/fix/gcp/edge/fix_mutate.json - name: Clean existing Kroki images if: github.event_name == 'workflow_dispatch' # only when triggered manually @@ -286,38 +286,38 @@ jobs: run: | yarn gen-api-docs - - name: Wait for AWS policies to be uploaded - if: steps.release.outputs.prerelease == 'false' && github.event_name != 'workflow_dispatch' - uses: lewagon/wait-on-check-action@v1.3.1 - with: - ref: ${{ github.ref }} - check-name: aws - repo-token: ${{ secrets.GITHUB_TOKEN }} - - - name: Update AWS policy JSON - if: steps.release.outputs.prerelease == 'false' - shell: bash - working-directory: ./inventory.fix.security/iam/aws/${{ steps.release.outputs.docsVersion }} - run: | - wget -qO FixOrgList.json https://cdn.some.engineering/fix/aws/${{ steps.release.outputs.tag }}/FixOrgList.json - wget -qO FixCollect.json https://cdn.some.engineering/fix/aws/${{ steps.release.outputs.tag }}/FixCollect.json - wget -qO FixMutate.json https://cdn.some.engineering/fix/aws/${{ steps.release.outputs.tag }}/FixMutate.json - - - name: Wait for GCP policies to be uploaded - if: steps.release.outputs.prerelease == 'false' && github.event_name != 'workflow_dispatch' - uses: lewagon/wait-on-check-action@v1.3.1 - with: - ref: ${{ github.ref }} - check-name: gcp - repo-token: ${{ secrets.GITHUB_TOKEN }} - - - name: Update GCP policy JSON - if: steps.release.outputs.prerelease == 'false' - shell: bash - working-directory: ./inventory.fix.security/iam/gcp/${{ steps.release.outputs.docsVersion }} - run: | - wget -qO fix_access.json https://cdn.some.engineering/fix/gcp/${{ steps.release.outputs.tag }}/fix_access.json - wget -qO fix_mutate.json https://cdn.some.engineering/fix/gcp/${{ steps.release.outputs.tag }}/fix_mutate.json +# - name: Wait for AWS policies to be uploaded +# if: steps.release.outputs.prerelease == 'false' && github.event_name != 'workflow_dispatch' +# uses: lewagon/wait-on-check-action@v1.3.1 +# with: +# ref: ${{ github.ref }} +# check-name: aws +# repo-token: ${{ secrets.GITHUB_TOKEN }} +# +# - name: Update AWS policy JSON +# if: steps.release.outputs.prerelease == 'false' +# shell: bash +# working-directory: ./inventory.fix.security/iam/aws/${{ steps.release.outputs.docsVersion }} +# run: | +# wget -qO FixOrgList.json https://cdn.some.engineering/fix/aws/${{ steps.release.outputs.tag }}/FixOrgList.json +# wget -qO FixCollect.json https://cdn.some.engineering/fix/aws/${{ steps.release.outputs.tag }}/FixCollect.json +# wget -qO FixMutate.json https://cdn.some.engineering/fix/aws/${{ steps.release.outputs.tag }}/FixMutate.json +# +# - name: Wait for GCP policies to be uploaded +# if: steps.release.outputs.prerelease == 'false' && github.event_name != 'workflow_dispatch' +# uses: lewagon/wait-on-check-action@v1.3.1 +# with: +# ref: ${{ github.ref }} +# check-name: gcp +# repo-token: ${{ secrets.GITHUB_TOKEN }} +# +# - name: Update GCP policy JSON +# if: steps.release.outputs.prerelease == 'false' +# shell: bash +# working-directory: ./inventory.fix.security/iam/gcp/${{ steps.release.outputs.docsVersion }} +# run: | +# wget -qO fix_access.json https://cdn.some.engineering/fix/gcp/${{ steps.release.outputs.tag }}/fix_access.json +# wget -qO fix_mutate.json https://cdn.some.engineering/fix/gcp/${{ steps.release.outputs.tag }}/fix_mutate.json - name: Modify Docker Compose YAML if: steps.release.outputs.prerelease == 'false' From dd5ed5245449de464fa169cc024f2c83c598a722 Mon Sep 17 00:00:00 2001 From: 1101-1 <70093559+1101-1@users.noreply.github.com> Date: Thu, 12 Dec 2024 12:46:51 +0500 Subject: [PATCH 4/5] [azure][feat] Rewrite metrics collection to make it better (#2298) Co-authored-by: Matthias Veit --- plugins/azure/fix_plugin_azure/collector.py | 15 ++ .../azure/fix_plugin_azure/resource/base.py | 81 +++++- .../fix_plugin_azure/resource/compute.py | 232 ++++++++--------- .../resource/machinelearning.py | 6 - .../fix_plugin_azure/resource/metrics.py | 204 +++++++-------- .../azure/fix_plugin_azure/resource/mysql.py | 6 - .../fix_plugin_azure/resource/postgresql.py | 6 - .../fix_plugin_azure/resource/storage.py | 237 ++++++++++-------- plugins/azure/fix_plugin_azure/utils.py | 20 +- plugins/azure/test/compute_test.py | 1 + plugins/azure/test/metric_test.py | 32 ++- 11 files changed, 435 insertions(+), 405 deletions(-) diff --git a/plugins/azure/fix_plugin_azure/collector.py b/plugins/azure/fix_plugin_azure/collector.py index 228f2679ca..4956b58ca6 100644 --- a/plugins/azure/fix_plugin_azure/collector.py +++ b/plugins/azure/fix_plugin_azure/collector.py @@ -39,6 +39,7 @@ MicrosoftGraphOrganizationRoot, ) from fix_plugin_azure.resource.monitor import resources as monitor_resources +from fix_plugin_azure.resource.metrics import AzureMetricData from fix_plugin_azure.resource.mysql import AzureMysqlServerType, resources as mysql_resources from fix_plugin_azure.resource.network import ( AzureNetworkExpressRoutePortsLocation, @@ -159,6 +160,14 @@ def get_last_run() -> Optional[datetime]: for after_collect in builder.after_collect_actions: after_collect() + if builder.config.collect_usage_metrics: + try: + log.info(f"[Azure:{self.account.safe_name}] Collect usage metrics.") + self.collect_usage_metrics(builder) + builder.executor.wait_for_submitted_work() + except Exception as e: + log.warning(f"[Azure] Failed to collect usage metrics in project {self.account.safe_name}: {e}") + # connect nodes log.info(f"[Azure:{self.account.safe_name}] Connect resources and create edges.") for node, data in list(self.graph.nodes(data=True)): @@ -184,6 +193,12 @@ def get_last_run() -> Optional[datetime]: self.core_feedback.progress_done(self.account.id, 1, 1, context=[self.cloud.id]) log.info(f"[Azure:{self.account.safe_name}] Collecting resources done.") + def collect_usage_metrics(self, builder: GraphBuilder) -> None: + for resource in builder.graph.nodes: + if isinstance(resource, MicrosoftResource) and (mq := resource.collect_usage_metrics(builder)): + start_at = builder.created_at - builder.metrics_delta + AzureMetricData.query_for(builder, resource, mq, start_at, builder.created_at) + def collect_resource_list( self, name: str, builder: GraphBuilder, resources: List[Type[MicrosoftResource]] ) -> Future[None]: diff --git a/plugins/azure/fix_plugin_azure/resource/base.py b/plugins/azure/fix_plugin_azure/resource/base.py index fea5892192..e3d7fdf374 100644 --- a/plugins/azure/fix_plugin_azure/resource/base.py +++ b/plugins/azure/fix_plugin_azure/resource/base.py @@ -3,9 +3,10 @@ import logging from concurrent.futures import Future from datetime import datetime, timedelta -from typing import Any, ClassVar, Dict, Optional, TypeVar, List, Type, Callable, cast, Union, Set +from typing import Any, ClassVar, Dict, Optional, TypeVar, List, Type, Tuple, Callable, cast, Union, Set from attr import define, field +from attrs import frozen from azure.identity import DefaultAzureCredential from fix_plugin_azure.azure_client import AzureResourceSpec, MicrosoftClient, MicrosoftRestSpec @@ -20,6 +21,9 @@ BaseRegion, ModelReference, PhantomBaseResource, + StatName, + MetricName, + MetricUnit, ) from fixlib.config import current_config from fixlib.core.actions import CoreFeedback @@ -187,12 +191,9 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: # Default behavior: add resource to the namespace pass - @classmethod - def collect_usage_metrics( - cls: Type[MicrosoftResourceType], builder: GraphBuilder, collected_resources: List[MicrosoftResourceType] - ) -> None: + def collect_usage_metrics(self, builder: GraphBuilder) -> List[AzureMetricQuery]: # Default behavior: do nothing - pass + return [] @classmethod def collect_resources( @@ -203,13 +204,7 @@ def collect_resources( if spec := cls.api_spec: try: items = builder.client.list(spec, **kwargs) - collected = cls.collect(items, builder) - if builder.config.collect_usage_metrics: - try: - cls.collect_usage_metrics(builder, collected) - except Exception as e: - log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}") - return collected + return cls.collect(items, builder) except Exception as e: msg = f"Error while collecting {cls.__name__} with service {spec.service} and location: {builder.location}: {e}" builder.core_feedback.info(msg, log) @@ -1008,6 +1003,66 @@ def with_location(self, location: BaseRegion) -> GraphBuilder: ) +STAT_MAP: Dict[str, StatName] = { + "minimum": StatName.min, + "average": StatName.avg, + "maximum": StatName.max, +} + + +@frozen(kw_only=True) +class MetricNormalization: + unit: MetricUnit + normalize_value: Callable[[float], float] = lambda x: x + + +@define(hash=True, frozen=True) +class AzureMetricQuery: + metric_name: str + metric_namespace: str + metric_normalization_name: MetricName + ref_id: str + instance_id: str + metric_id: str + aggregation: Tuple[str, ...] + normalization: MetricNormalization + # Optional `start_time` and `period` override defaults for query timespan and interval. + period: Optional[timedelta] = None + start_time: Optional[datetime] = None + unit: str = "Count" + + @staticmethod + def create( + *, + metric_name: str, + metric_namespace: str, + metric_normalization_name: MetricName, + instance_id: str, + ref_id: str, + normalization: MetricNormalization, + aggregation: Tuple[str, ...], + unit: str = "Count", + start_time: Optional[datetime] = None, + period: Optional[timedelta] = None, + metric_id: Optional[str] = None, + ) -> "AzureMetricQuery": + metric_id = f"{instance_id}/providers/Microsoft.Insights/metrics/{metric_name}" + # noinspection PyTypeChecker + return AzureMetricQuery( + metric_name=metric_name, + metric_namespace=metric_namespace, + metric_normalization_name=metric_normalization_name, + instance_id=instance_id, + metric_id=metric_id, + aggregation=aggregation, + ref_id=ref_id, + unit=unit, + normalization=normalization, + period=period, + start_time=start_time, + ) + + resources: List[Type[MicrosoftResource]] = [ AzureResourceGroup, ] diff --git a/plugins/azure/fix_plugin_azure/resource/compute.py b/plugins/azure/fix_plugin_azure/resource/compute.py index ab06cf50db..c4f67cf12c 100644 --- a/plugins/azure/fix_plugin_azure/resource/compute.py +++ b/plugins/azure/fix_plugin_azure/resource/compute.py @@ -17,15 +17,16 @@ AzureExtendedLocation, AzurePrincipalClient, AzurePrivateEndpointConnection, + AzureMetricQuery, ) -from fix_plugin_azure.resource.metrics import AzureMetricData, AzureMetricQuery, update_resource_metrics +from fix_plugin_azure.resource.metrics import NormalizerFactory from fix_plugin_azure.resource.network import ( AzureNetworkSecurityGroup, AzureNetworkSubnet, AzureNetworkInterface, AzureNetworkLoadBalancer, ) -from fix_plugin_azure.utils import MetricNormalization, rgetvalue +from fix_plugin_azure.utils import rgetvalue from fixlib.baseresources import ( BaseInstance, BaseKeyPair, @@ -34,7 +35,6 @@ BaseSnapshot, BaseVolumeType, MetricName, - MetricUnit, VolumeStatus, BaseAutoScalingGroup, InstanceStatus, @@ -1064,11 +1064,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureC AzureComputeDisk._collect_disk_types(builder, d_loc) # Create additional custom disk sizes for disks with Ultra SSD or Premium SSD v2 types AzureComputeDiskType.create_unique_disk_sizes(disks, builder, d_loc) - if builder.config.collect_usage_metrics: - try: - cls.collect_usage_metrics(builder, collected) - except Exception as e: - log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}") return collected return [] @@ -1099,61 +1094,48 @@ def collect_disk_types() -> None: graph_builder.submit_work(service_name, collect_disk_types) - @classmethod - def collect_usage_metrics( - cls: Type[MicrosoftResource], builder: GraphBuilder, collected_resources: List[MicrosoftResourceType] - ) -> None: - volumes = {volume.id: volume for volume in collected_resources if volume} + def collect_usage_metrics(self, builder: GraphBuilder) -> List[AzureMetricQuery]: + volume_id = self.id queries = [] - start = builder.metrics_start - now = builder.created_at - delta = builder.metrics_delta - for volume_id in volumes: - queries.extend( - [ - AzureMetricQuery.create( - metric_name=metric_name, - metric_namespace="microsoft.compute/disks", - instance_id=volume_id, - aggregation=("average",), - ref_id=volume_id, - unit="BytesPerSecond", - ) - for metric_name in ["Composite Disk Write Bytes/sec", "Composite Disk Read Bytes/sec"] + + queries.extend( + [ + AzureMetricQuery.create( + metric_name=name, + metric_namespace="microsoft.compute/disks", + metric_normalization_name=metric_name, + instance_id=volume_id, + aggregation=("average",), + ref_id=volume_id, + unit="BytesPerSecond", + normalization=NormalizerFactory.bytes, + ) + for name, metric_name in [ + ("Composite Disk Write Bytes/sec", MetricName.VolumeWrite), + ("Composite Disk Read Bytes/sec", MetricName.VolumeRead), ] - ) - queries.extend( - [ - AzureMetricQuery.create( - metric_name=metric_name, - metric_namespace="microsoft.compute/disks", - instance_id=volume_id, - aggregation=("average",), - ref_id=volume_id, - unit="CountPerSecond", - ) - for metric_name in ["Composite Disk Write Operations/sec", "Composite Disk Read Operations/sec"] + ] + ) + queries.extend( + [ + AzureMetricQuery.create( + metric_name=name, + metric_namespace="microsoft.compute/disks", + metric_normalization_name=metric_name, + instance_id=volume_id, + aggregation=("average",), + ref_id=volume_id, + unit="CountPerSecond", + normalization=NormalizerFactory.iops, + ) + for name, metric_name in [ + ("Composite Disk Write Operations/sec", MetricName.VolumeWrite), + ("Composite Disk Read Operations/sec", MetricName.VolumeRead), ] - ) - - metric_normalizers = { - "Composite Disk Write Bytes/sec": MetricNormalization( - metric_name=MetricName.VolumeWrite, unit=MetricUnit.Bytes - ), - "Composite Disk Read Bytes/sec": MetricNormalization( - metric_name=MetricName.VolumeRead, unit=MetricUnit.Bytes - ), - "Composite Disk Write Operations/sec": MetricNormalization( - metric_name=MetricName.VolumeWrite, unit=MetricUnit.IOPS - ), - "Composite Disk Read Operations/sec": MetricNormalization( - metric_name=MetricName.VolumeRead, unit=MetricUnit.IOPS - ), - } - - metric_result = AzureMetricData.query_for(builder, queries, start, now, delta) + ] + ) - update_resource_metrics(volumes, metric_result, metric_normalizers) + return queries @staticmethod def _get_nearest_size(size: int, lookup_map: Dict[int, Any]) -> int: @@ -2983,12 +2965,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureC # Collect VM sizes for the VM in this location AzureComputeVirtualMachineBase._collect_vm_sizes(builder, location) - if builder.config.collect_usage_metrics: - try: - cls.collect_usage_metrics(builder, collected) - except Exception as e: - log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}") - return collected return [] @@ -3017,83 +2993,75 @@ def collect_vm_sizes() -> None: graph_builder.submit_work(service_name, collect_vm_sizes) - @classmethod - def collect_usage_metrics( - cls: Type[MicrosoftResource], builder: GraphBuilder, collected_resources: List[MicrosoftResourceType] - ) -> None: - virtual_machines = {vm.id: vm for vm in collected_resources if vm} + def collect_usage_metrics(self, builder: GraphBuilder) -> List[AzureMetricQuery]: + vm_id = self.id queries = [] - start = builder.metrics_start - now = builder.created_at - delta = builder.metrics_delta - for vm_id in virtual_machines: - queries.append( + + queries.append( + AzureMetricQuery.create( + metric_name="Percentage CPU", + metric_namespace="Microsoft.Compute/virtualMachines", + metric_normalization_name=MetricName.CpuUtilization, + instance_id=vm_id, + aggregation=("average", "minimum", "maximum"), + ref_id=vm_id, + unit="Percent", + normalization=NormalizerFactory.percent, + ) + ) + queries.extend( + [ AzureMetricQuery.create( - metric_name="Percentage CPU", + metric_name=name, metric_namespace="Microsoft.Compute/virtualMachines", + metric_normalization_name=metric_name, instance_id=vm_id, aggregation=("average", "minimum", "maximum"), ref_id=vm_id, - unit="Percent", + unit="Bytes", + normalization=NormalizerFactory.bytes, ) - ) - queries.extend( - [ - AzureMetricQuery.create( - metric_name=metric_name, - metric_namespace="Microsoft.Compute/virtualMachines", - instance_id=vm_id, - aggregation=("average", "minimum", "maximum"), - ref_id=vm_id, - unit="Bytes", - ) - for metric_name in ["Disk Write Bytes", "Disk Read Bytes"] + for name, metric_name in [ + ("Disk Write Bytes", MetricName.DiskWrite), + ("Disk Read Bytes", MetricName.DiskRead), ] - ) - queries.extend( - [ - AzureMetricQuery.create( - metric_name=metric_name, - metric_namespace="Microsoft.Compute/virtualMachines", - instance_id=vm_id, - aggregation=("average", "minimum", "maximum"), - ref_id=vm_id, - unit="CountPerSecond", - ) - for metric_name in ["Disk Write Operations/Sec", "Disk Read Operations/Sec"] - ] - ) - queries.extend( - [ - AzureMetricQuery.create( - metric_name=metric_name, - metric_namespace="Microsoft.Compute/virtualMachines", - instance_id=vm_id, - aggregation=("average", "minimum", "maximum"), - ref_id=vm_id, - unit="Bytes", - ) - for metric_name in ["Network In", "Network Out"] + ] + ) + queries.extend( + [ + AzureMetricQuery.create( + metric_name=name, + metric_namespace="Microsoft.Compute/virtualMachines", + metric_normalization_name=metric_name, + instance_id=vm_id, + aggregation=("average", "minimum", "maximum"), + ref_id=vm_id, + unit="CountPerSecond", + normalization=NormalizerFactory.iops, + ) + for name, metric_name in [ + ("Disk Write Operations/Sec", MetricName.DiskWrite), + ("Disk Read Operations/Sec", MetricName.DiskRead), ] - ) - - metric_normalizers = { - "Percentage CPU": MetricNormalization( - metric_name=MetricName.CpuUtilization, - unit=MetricUnit.Percent, - normalize_value=lambda x: round(x, ndigits=3), - ), - "Network In": MetricNormalization(metric_name=MetricName.NetworkIn, unit=MetricUnit.Bytes), - "Network Out": MetricNormalization(metric_name=MetricName.NetworkOut, unit=MetricUnit.Bytes), - "Disk Read Operations/Sec": MetricNormalization(metric_name=MetricName.DiskRead, unit=MetricUnit.IOPS), - "Disk Write Operations/Sec": MetricNormalization(metric_name=MetricName.DiskWrite, unit=MetricUnit.IOPS), - "Disk Read Bytes": MetricNormalization(metric_name=MetricName.DiskRead, unit=MetricUnit.Bytes), - "Disk Write Bytes": MetricNormalization(metric_name=MetricName.DiskWrite, unit=MetricUnit.Bytes), - } - - metric_result = AzureMetricData.query_for(builder, queries, start, now, delta) + ] + ) + queries.extend( + [ + AzureMetricQuery.create( + metric_name=name, + metric_namespace="Microsoft.Compute/virtualMachines", + metric_normalization_name=metric_name, + instance_id=vm_id, + aggregation=("average", "minimum", "maximum"), + ref_id=vm_id, + unit="Bytes", + normalization=NormalizerFactory.bytes, + ) + for name, metric_name in [("Network In", MetricName.NetworkIn), ("Network Out", MetricName.NetworkOut)] + ] + ) - update_resource_metrics(virtual_machines, metric_result, metric_normalizers) + return queries def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: if placement_group_id := self.proximity_placement_group: diff --git a/plugins/azure/fix_plugin_azure/resource/machinelearning.py b/plugins/azure/fix_plugin_azure/resource/machinelearning.py index 3ef4599e02..b46647eaac 100644 --- a/plugins/azure/fix_plugin_azure/resource/machinelearning.py +++ b/plugins/azure/fix_plugin_azure/resource/machinelearning.py @@ -604,12 +604,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureM # Collect VM sizes for the compute resources in this location cls._collect_vm_sizes(builder, location, compute_resources) - if builder.config.collect_usage_metrics: - try: - cls.collect_usage_metrics(builder, collected) - except Exception as e: - log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}") - return collected return [] diff --git a/plugins/azure/fix_plugin_azure/resource/metrics.py b/plugins/azure/fix_plugin_azure/resource/metrics.py index e22b243f28..d91221d7af 100644 --- a/plugins/azure/fix_plugin_azure/resource/metrics.py +++ b/plugins/azure/fix_plugin_azure/resource/metrics.py @@ -1,6 +1,6 @@ from copy import deepcopy from datetime import datetime, timedelta -from concurrent.futures import as_completed +from functools import cached_property import logging from typing import ClassVar, Dict, Optional, List, Tuple, TypeVar @@ -11,9 +11,14 @@ from attr import define, field from fix_plugin_azure.azure_client import AzureResourceSpec -from fix_plugin_azure.resource.base import GraphBuilder -from fix_plugin_azure.utils import MetricNormalization -from fixlib.baseresources import BaseResource +from fix_plugin_azure.resource.base import ( + GraphBuilder, + AzureMetricQuery, + MetricNormalization, + STAT_MAP, + MicrosoftResource, +) +from fixlib.baseresources import BaseResource, MetricUnit from fixlib.json import from_json from fixlib.json_bender import Bender, S, ForallBend, Bend, bend from fixlib.utils import utc_str @@ -93,39 +98,6 @@ class AzureMetricValue: timeseries: Optional[List[AzureMetricTimeSeries]] = field(default=None) -@define(hash=True, frozen=True) -class AzureMetricQuery: - metric_name: str - metric_namespace: str - ref_id: str - instance_id: str - metric_id: str - aggregation: Tuple[str, ...] - unit: str = "Count" - - @staticmethod - def create( - metric_name: str, - metric_namespace: str, - instance_id: str, - ref_id: str, - aggregation: Tuple[str, ...], - metric_id: Optional[str] = None, - unit: str = "Count", - ) -> "AzureMetricQuery": - metric_id = f"{instance_id}/providers/Microsoft.Insights/metrics/{metric_name}" - # noinspection PyTypeChecker - return AzureMetricQuery( - metric_name=metric_name, - metric_namespace=metric_namespace, - instance_id=instance_id, - metric_id=metric_id, - aggregation=aggregation, - ref_id=ref_id, - unit=unit, - ) - - @define(eq=False, slots=False) class AzureMetricData: kind: ClassVar[str] = "azure_metric" @@ -193,27 +165,11 @@ def compute_interval(delta: timedelta) -> str: @staticmethod def query_for( builder: GraphBuilder, + resource: MicrosoftResource, queries: List[AzureMetricQuery], start_time: datetime, end_time: datetime, - delta: timedelta, - ) -> "Dict[AzureMetricQuery, AzureMetricData]": - """ - A static method to query Azure metrics for multiple queries simultaneously. - - Args: - builder (GraphBuilder): An instance of GraphBuilder used for submitting work. - queries (List[AzureMetricQuery]): A list of AzureMetricQuery objects representing the metrics to query. - start_time (datetime): The start time for the metrics query. - end_time (datetime): The end time for the metrics query. - delta (timedelta): The delta over which to aggregate the metrics. - - Returns: - Dict[AzureMetricQuery, AzureMetricData]: A dictionary mapping each query to its corresponding metric data. - """ - # Create a lookup dictionary for efficient mapping of metric IDs to queries - lookup = {q.metric_id: q for q in queries} - result: Dict[AzureMetricQuery, AzureMetricData] = {} + ) -> None: # Define API specifications for querying Azure metrics api_spec = AzureResourceSpec( @@ -234,49 +190,38 @@ def query_for( access_path=None, expect_array=False, ) - # Define the timespan and interval for the query - timespan = f"{utc_str(start_time)}/{utc_str(end_time)}" - interval = AzureMetricData.compute_interval(delta) # Submit queries for each AzureMetricQuery - futures = [] for query in queries: - future = builder.submit_work( + builder.submit_work( service_name, AzureMetricData._query_for_single, builder, query, api_spec, - timespan, - interval, + start_time, + end_time, + resource, ) - futures.append(future) - - # Retrieve results from submitted queries and populate the result dictionary - for future in as_completed(futures): - try: - metric, metric_id = future.result() - if metric is not None and metric_id is not None: - result[lookup[metric_id]] = metric - except Exception as e: - log.warning(f"An error occurred while processing a metric query: {e}") - raise e - return result @staticmethod def _query_for_single( builder: GraphBuilder, query: AzureMetricQuery, api_spec: AzureResourceSpec, - timespan: str, - interval: str, - ) -> "Tuple[Optional[AzureMetricData], Optional[str]]": + start_time: datetime, + end_time: datetime, + resource: MicrosoftResource, + ) -> None: try: local_api_spec = deepcopy(api_spec) # Set the path for the API call based on the instance ID of the query local_api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics" # Retrieve metric data from the API aggregation = ",".join(query.aggregation) + # Define the timespan and interval for the query + timespan = f"{utc_str(query.start_time or start_time)}/{utc_str(end_time)}" + interval = AzureMetricData.compute_interval(query.period or builder.metrics_delta) part = builder.client.list( local_api_spec, metricnames=query.metric_name, @@ -291,37 +236,94 @@ def _query_for_single( for single in part: metric: AzureMetricData = from_json(bend(AzureMetricData.mapping, single), AzureMetricData) metric.set_values(query.aggregation) - metric_id = metric.metric_id - if metric_id is not None: - return metric, metric_id - return None, None + update_resource_metrics(resource, query, metric) except HttpResponseError as e: # Handle unsupported metric namespace error - log.warning(f"Request error occurred: {e}.") - return None, None + log.warning(f"Request error occurredwhile processing metrics: {e}.") except Exception as e: - raise e + log.warning(f"An error occurred while processing metrics: {e}.") V = TypeVar("V", bound=BaseResource) def update_resource_metrics( - resources_map: Dict[str, V], - metric_result: Dict[AzureMetricQuery, AzureMetricData], - metric_normalizers: Dict[str, MetricNormalization], + resource: MicrosoftResource, + query: AzureMetricQuery, + metric: AzureMetricData, ) -> None: - for query, metric in metric_result.items(): - resource = resources_map.get(query.ref_id) - if resource is None: - continue - metric_data = metric.metric_values - if metric_data: - for aggregation, metric_value in metric_data.items(): - normalizer = metric_normalizers.get(query.metric_name) - if not normalizer: - continue - name = normalizer.metric_name + "_" + normalizer.unit - value = metric_normalizers[query.metric_name].normalize_value(metric_value) - - resource._resource_usage[name][normalizer.stat_map[aggregation]] = value + + metric_data = metric.metric_values + normalizer = query.normalization + if metric_data: + for aggregation, metric_value in metric_data.items(): + name = query.metric_normalization_name + "_" + normalizer.unit + value = normalizer.normalize_value(metric_value) + stat_name = STAT_MAP.get(aggregation) + try: + if stat_name: + resource._resource_usage[name][str(stat_name)] = value + except KeyError as e: + log.warning(f"An error occurred while setting metric values: {e}") + raise + + +class __NormalizerFactory: + __instance = None + + def __new__(cls) -> "__NormalizerFactory": + if cls.__instance is None: + cls.__instance = super().__new__(cls) + return cls.__instance + + @cached_property + def count(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Count, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def bytes(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Bytes, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def bytes_per_second(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.BytesPerSecond, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def iops(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.IOPS, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def seconds(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Seconds, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def milliseconds(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Milliseconds, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def percent(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Percent, + normalize_value=lambda x: round(x, ndigits=4), + ) + + +NormalizerFactory = __NormalizerFactory() diff --git a/plugins/azure/fix_plugin_azure/resource/mysql.py b/plugins/azure/fix_plugin_azure/resource/mysql.py index d9e928ec34..945eda6560 100644 --- a/plugins/azure/fix_plugin_azure/resource/mysql.py +++ b/plugins/azure/fix_plugin_azure/resource/mysql.py @@ -753,12 +753,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureM # Collect MySQL server types for the servers in this group AzureMysqlServer._collect_mysql_server_types(builder, location, sku_name, sku_tier, version) - if builder.config.collect_usage_metrics: - try: - cls.collect_usage_metrics(builder, collected) - except Exception as e: - log.warning(f"Failed to collect usage metrics for {cls.__name__} in {location}: {e}") - return collected return [] diff --git a/plugins/azure/fix_plugin_azure/resource/postgresql.py b/plugins/azure/fix_plugin_azure/resource/postgresql.py index 4f5df0f35e..42e06a5241 100644 --- a/plugins/azure/fix_plugin_azure/resource/postgresql.py +++ b/plugins/azure/fix_plugin_azure/resource/postgresql.py @@ -636,12 +636,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureP # Collect PostgreSQL server types for the servers in this group AzurePostgresqlServer._collect_postgresql_server_types(builder, location, sku_name, sku_tier, version) - if builder.config.collect_usage_metrics: - try: - cls.collect_usage_metrics(builder, collected) - except Exception as e: - log.warning(f"Failed to collect usage metrics for {cls.__name__} in {location}: {e}") - return collected return [] diff --git a/plugins/azure/fix_plugin_azure/resource/storage.py b/plugins/azure/fix_plugin_azure/resource/storage.py index 256c3e43a2..ae19c8fa09 100644 --- a/plugins/azure/fix_plugin_azure/resource/storage.py +++ b/plugins/azure/fix_plugin_azure/resource/storage.py @@ -8,22 +8,20 @@ from fix_plugin_azure.resource.base import ( AzureBaseUsage, MicrosoftResource, - MicrosoftResourceType, GraphBuilder, AzureExtendedLocation, AzureSku, AzureManagedServiceIdentity, AzurePrivateEndpointConnection, + AzureMetricQuery, ) -from fix_plugin_azure.resource.metrics import AzureMetricData, AzureMetricQuery, update_resource_metrics -from fix_plugin_azure.utils import MetricNormalization +from fix_plugin_azure.resource.metrics import NormalizerFactory from fixlib.baseresources import ( BaseBucket, BaseNetworkShare, BaseQueue, EdgeType, MetricName, - MetricUnit, ModelReference, PhantomBaseResource, QueueType, @@ -945,11 +943,7 @@ def sku_filter(sku: AzureStorageSku) -> bool: clazz=AzureStorageSku, ) - @classmethod - def collect_usage_metrics( - cls: Type[MicrosoftResource], builder: GraphBuilder, collected_resources: List[MicrosoftResourceType] - ) -> None: - accounts = {storage_acc.id: storage_acc for storage_acc in collected_resources} + def collect_usage_metrics(self, builder: GraphBuilder) -> List[AzureMetricQuery]: queries = [] start = builder.metrics_start now = builder.created_at @@ -958,117 +952,140 @@ def collect_usage_metrics( if delta.total_seconds() < 3600: delta = timedelta(hours=1) start = now - delta - for account_id in accounts: - blob_instance_id = account_id + "/blobServices/default" - file_instance_id = account_id + "/fileServices/default" - table_instance_id = account_id + "/tableServices/default" - queue_instance_id = account_id + "/queueServices/default" - queries.append( - AzureMetricQuery.create( - metric_name="UsedCapacity", - metric_namespace="microsoft.storage/storageaccounts", - instance_id=account_id, - aggregation=("average",), - ref_id=account_id, - unit="Bytes", - ) + account_id = self.id + blob_instance_id = account_id + "/blobServices/default" + file_instance_id = account_id + "/fileServices/default" + table_instance_id = account_id + "/tableServices/default" + queue_instance_id = account_id + "/queueServices/default" + + queries.append( + AzureMetricQuery.create( + metric_name="UsedCapacity", + metric_namespace="microsoft.storage/storageaccounts", + metric_normalization_name=MetricName.UsedCapacity, + instance_id=account_id, + aggregation=("average",), + ref_id=account_id, + unit="Bytes", + normalization=NormalizerFactory.bytes, + period=delta, + start_time=start, ) - queries.append( - AzureMetricQuery.create( - metric_name="TableCapacity", - metric_namespace="microsoft.storage/storageaccounts/tableservices", - instance_id=table_instance_id, - aggregation=("average",), - ref_id=account_id, - unit="Bytes", - ) + ) + queries.append( + AzureMetricQuery.create( + metric_name="TableCapacity", + metric_namespace="microsoft.storage/storageaccounts/tableservices", + metric_normalization_name=MetricName.TableCapacity, + instance_id=table_instance_id, + aggregation=("average",), + ref_id=account_id, + unit="Bytes", + normalization=NormalizerFactory.bytes, + period=delta, + start_time=start, ) - queries.append( - AzureMetricQuery.create( - metric_name="TableCount", - metric_namespace="microsoft.storage/storageaccounts/tableservices", - instance_id=table_instance_id, - aggregation=("average",), - ref_id=account_id, - unit="Count", - ) + ) + queries.append( + AzureMetricQuery.create( + metric_name="TableCount", + metric_namespace="microsoft.storage/storageaccounts/tableservices", + metric_normalization_name=MetricName.TableCount, + instance_id=table_instance_id, + aggregation=("average",), + ref_id=account_id, + unit="Count", + normalization=NormalizerFactory.count, + period=delta, + start_time=start, ) - queries.append( - AzureMetricQuery.create( - metric_name="QueueCapacity", - metric_namespace="microsoft.storage/storageaccounts/queueservices", - instance_id=queue_instance_id, - aggregation=("average",), - ref_id=account_id, - unit="Bytes", - ) + ) + queries.append( + AzureMetricQuery.create( + metric_name="QueueCapacity", + metric_namespace="microsoft.storage/storageaccounts/queueservices", + metric_normalization_name=MetricName.QueueCapacity, + instance_id=queue_instance_id, + aggregation=("average",), + ref_id=account_id, + unit="Bytes", + normalization=NormalizerFactory.bytes, + period=delta, + start_time=start, ) - queries.append( - AzureMetricQuery.create( - metric_name="QueueCount", - metric_namespace="microsoft.storage/storageaccounts/queueservices", - instance_id=queue_instance_id, - aggregation=("average",), - ref_id=account_id, - unit="Count", - ) + ) + queries.append( + AzureMetricQuery.create( + metric_name="QueueCount", + metric_namespace="microsoft.storage/storageaccounts/queueservices", + metric_normalization_name=MetricName.QueueCount, + instance_id=queue_instance_id, + aggregation=("average",), + ref_id=account_id, + unit="Count", + normalization=NormalizerFactory.count, + period=delta, + start_time=start, ) - queries.append( - AzureMetricQuery.create( - metric_name="FileCapacity", - metric_namespace="microsoft.storage/storageaccounts/fileservices", - instance_id=file_instance_id, - aggregation=("average",), - ref_id=account_id, - unit="Bytes", - ) + ) + queries.append( + AzureMetricQuery.create( + metric_name="FileCapacity", + metric_namespace="microsoft.storage/storageaccounts/fileservices", + metric_normalization_name=MetricName.FileCapacity, + instance_id=file_instance_id, + aggregation=("average",), + ref_id=account_id, + unit="Bytes", + normalization=NormalizerFactory.bytes, + period=delta, + start_time=start, ) - queries.append( - AzureMetricQuery.create( - metric_name="FileCount", - metric_namespace="microsoft.storage/storageaccounts/fileservices", - instance_id=file_instance_id, - aggregation=("average",), - ref_id=account_id, - unit="Count", - ) + ) + queries.append( + AzureMetricQuery.create( + metric_name="FileCount", + metric_namespace="microsoft.storage/storageaccounts/fileservices", + metric_normalization_name=MetricName.FileCount, + instance_id=file_instance_id, + aggregation=("average",), + ref_id=account_id, + unit="Count", + normalization=NormalizerFactory.count, + period=delta, + start_time=start, ) - queries.append( - AzureMetricQuery.create( - metric_name="BlobCapacity", - metric_namespace="microsoft.storage/storageaccounts/blobservices", - instance_id=blob_instance_id, - aggregation=("average",), - ref_id=account_id, - unit="Bytes", - ) + ) + queries.append( + AzureMetricQuery.create( + metric_name="BlobCapacity", + metric_namespace="microsoft.storage/storageaccounts/blobservices", + metric_normalization_name=MetricName.BlobCapacity, + instance_id=blob_instance_id, + aggregation=("average",), + ref_id=account_id, + unit="Bytes", + normalization=NormalizerFactory.bytes, + period=delta, + start_time=start, ) - queries.append( - AzureMetricQuery.create( - metric_name="BlobCount", - metric_namespace="microsoft.storage/storageaccounts/blobservices", - instance_id=blob_instance_id, - aggregation=("average",), - ref_id=account_id, - unit="Count", - ) + ) + queries.append( + AzureMetricQuery.create( + metric_name="BlobCount", + metric_namespace="microsoft.storage/storageaccounts/blobservices", + metric_normalization_name=MetricName.BlobCount, + instance_id=blob_instance_id, + aggregation=("average",), + ref_id=account_id, + unit="Count", + normalization=NormalizerFactory.count, + period=delta, + start_time=start, ) + ) - metric_normalizers = { - "UsedCapacity": MetricNormalization(metric_name=MetricName.UsedCapacity, unit=MetricUnit.Bytes), - "TableCapacity": MetricNormalization(metric_name=MetricName.TableCapacity, unit=MetricUnit.Bytes), - "TableCount": MetricNormalization(metric_name=MetricName.TableCount, unit=MetricUnit.Count), - "QueueCapacity": MetricNormalization(metric_name=MetricName.QueueCapacity, unit=MetricUnit.Bytes), - "QueueCount": MetricNormalization(metric_name=MetricName.QueueCount, unit=MetricUnit.Count), - "FileCapacity": MetricNormalization(metric_name=MetricName.FileCapacity, unit=MetricUnit.Bytes), - "FileCount": MetricNormalization(metric_name=MetricName.FileCount, unit=MetricUnit.Count), - "BlobCapacity": MetricNormalization(metric_name=MetricName.BlobCapacity, unit=MetricUnit.Bytes), - "BlobCount": MetricNormalization(metric_name=MetricName.BlobCount, unit=MetricUnit.Count), - } - - metric_result = AzureMetricData.query_for(builder, queries, start, now, delta) - - update_resource_metrics(accounts, metric_result, metric_normalizers) + return queries @define(eq=False, slots=False) diff --git a/plugins/azure/fix_plugin_azure/utils.py b/plugins/azure/fix_plugin_azure/utils.py index 4f5b33f73a..f3cba0e59b 100644 --- a/plugins/azure/fix_plugin_azure/utils.py +++ b/plugins/azure/fix_plugin_azure/utils.py @@ -1,10 +1,8 @@ import logging from datetime import datetime -from typing import Callable, Dict, TypeVar, Any -from attr import frozen +from typing import Dict, TypeVar, Any import functools -from fixlib.baseresources import StatName, MetricName, MetricUnit from fixlib.json_bender import F T = TypeVar("T") @@ -36,10 +34,6 @@ def rgetvalue(data: Dict[str, Any], key_path: str, default: Any = None) -> Any: return nested_value -def identity(x: T) -> T: - return x - - def case_insensitive_eq(left: T, right: T) -> bool: if isinstance(left, str) and isinstance(right, str): return left.lower() == right.lower() @@ -70,15 +64,3 @@ def set_bool(val: str) -> bool: TimestampToIso = F(lambda x: datetime.fromtimestamp(x).isoformat()) NoneIfEmpty = F(lambda x: x if x else None) - - -@frozen(kw_only=True) -class MetricNormalization: - metric_name: MetricName - unit: MetricUnit - stat_map: Dict[str, StatName] = { - "minimum": StatName.min, - "average": StatName.avg, - "maximum": StatName.max, - } - normalize_value: Callable[[float], float] = identity diff --git a/plugins/azure/test/compute_test.py b/plugins/azure/test/compute_test.py index bf085444da..0319dcacdd 100644 --- a/plugins/azure/test/compute_test.py +++ b/plugins/azure/test/compute_test.py @@ -130,6 +130,7 @@ def test_virtual_machine(builder: GraphBuilder) -> None: def test_virtual_machine_resources(builder: GraphBuilder) -> None: collected = roundtrip_check(AzureComputeVirtualMachine, builder)[0] + builder.executor.wait_for_submitted_work() assert collected.instance_type == "Standard_A1_V2" assert collected.instance_status == InstanceStatus.RUNNING diff --git a/plugins/azure/test/metric_test.py b/plugins/azure/test/metric_test.py index 0a384795dd..313a562dae 100644 --- a/plugins/azure/test/metric_test.py +++ b/plugins/azure/test/metric_test.py @@ -1,7 +1,10 @@ from datetime import timedelta, datetime, timezone -from fix_plugin_azure.resource.base import GraphBuilder +from fix_plugin_azure.resource.base import GraphBuilder, AzureMetricQuery -from fix_plugin_azure.resource.metrics import AzureMetricQuery, AzureMetricData +from fix_plugin_azure.resource.compute import AzureComputeVirtualMachine +from fix_plugin_azure.resource.metrics import AzureMetricData, NormalizerFactory + +from fixlib.baseresources import MetricName def test_metric(builder: GraphBuilder) -> None: @@ -9,17 +12,22 @@ def test_metric(builder: GraphBuilder) -> None: earlier = now - timedelta(days=60) delta = now - earlier resource_id = "/subscriptions/rwqrr2-31f1-rwqrrw-5325-wrq2r/resourceGroups/FOO/providers/Microsoft.Compute/virtualMachines/test1" + vm = AzureComputeVirtualMachine(id=resource_id, name="test1") write = AzureMetricQuery.create( - "Disk Write Operations/Sec", - "Microsoft.Compute/virtualMachines", - resource_id, - resource_id, - ("average", "minimum", "maximum"), + metric_name="Disk Write Operations/Sec", + metric_namespace="Microsoft.Compute/virtualMachines", + metric_normalization_name=MetricName.DiskWrite, + normalization=NormalizerFactory.iops, + period=delta, + instance_id=resource_id, + ref_id=resource_id, + aggregation=("average", "minimum", "maximum"), unit="CountPerSecond", ) - result = AzureMetricData.query_for(builder=builder, queries=[write], start_time=earlier, end_time=now, delta=delta) - assert result[write].metric_values == { - "average": 247685.56222444447, - "minimum": 291286.29000000004, - "maximum": 193903.44666666666, + AzureMetricData.query_for(builder=builder, resource=vm, queries=[write], start_time=earlier, end_time=now) + builder.executor.wait_for_submitted_work() + assert vm._resource_usage["disk_write_iops"] == { + "avg": 247685.5622, + "min": 291286.2900, + "max": 193903.4467, } From eedfaa544fe1f7a34ff671e0dd00341bf96c95ce Mon Sep 17 00:00:00 2001 From: 1101-1 <70093559+1101-1@users.noreply.github.com> Date: Fri, 13 Dec 2024 19:38:47 +0400 Subject: [PATCH 5/5] [aws][fix] Fix errors associated with AwsBedrock and AwsEcr resources (#2302) --- .../aws/fix_plugin_aws/resource/bedrock.py | 101 +++++++++--------- plugins/aws/fix_plugin_aws/resource/ecr.py | 4 +- .../get-knowledge-base__foo.json | 12 ++- 3 files changed, 59 insertions(+), 58 deletions(-) diff --git a/plugins/aws/fix_plugin_aws/resource/bedrock.py b/plugins/aws/fix_plugin_aws/resource/bedrock.py index c8e5ba4fff..59216fd0ae 100644 --- a/plugins/aws/fix_plugin_aws/resource/bedrock.py +++ b/plugins/aws/fix_plugin_aws/resource/bedrock.py @@ -225,7 +225,7 @@ def add_tags(job: AwsResource) -> None: job.tags.update({tag.get("key"): tag.get("value")}) for js in json: - for result in builder.client.list( + if result := builder.client.get( service_name, "get-custom-model", modelIdentifier=js["modelArn"], @@ -508,7 +508,7 @@ def add_tags(job: AwsResource) -> None: job.tags.update({tag.get("key"): tag.get("value")}) for js in json: - for result in builder.client.list( + if result := builder.client.get( service_name, "get-guardrail", guardrailIdentifier=js["id"], @@ -640,7 +640,7 @@ def add_tags(job: AwsResource) -> None: job.tags.update({tag.get("key"): tag.get("value")}) for js in json: - for result in builder.client.list( + if result := builder.client.get( service_name, "get-model-customization-job", jobIdentifier=js["jobArn"], @@ -840,7 +840,7 @@ def add_tags(job: AwsResource) -> None: job.tags.update({tag.get("key"): tag.get("value")}) for js in json: - for result in builder.client.list( + if result := builder.client.get( service_name, "get-evaluation-job", jobIdentifier=js["jobArn"], @@ -944,33 +944,32 @@ class AwsBedrockAgent(BedrockTaggable, AwsResource): } api_spec: ClassVar[AwsApiSpec] = AwsApiSpec("bedrock-agent", "list-agents", "agentSummaries") mapping: ClassVar[Dict[str, Bender]] = { - "id": S("agent", "agentId"), - "name": S("agent", "agentName"), - "ctime": S("agent", "createdAt"), - "mtime": S("agent", "updatedAt"), - "arn": S("agent", "agentArn"), - "agent_arn": S("agent", "agentArn"), - "agent_id": S("agent", "agentId"), - "agent_name": S("agent", "agentName"), - "agent_resource_role_arn": S("agent", "agentResourceRoleArn"), - "agent_status": S("agent", "agentStatus"), - "agent_version": S("agent", "agentVersion").or_else(S("latestAgentVersion")), - "client_token": S("agent", "clientToken"), - "created_at": S("agent", "createdAt"), - "customer_encryption_key_arn": S("agent", "customerEncryptionKeyArn"), - "description": S("agent", "description"), - "failure_reasons": S("agent", "failureReasons", default=[]), - "foundation_model": S("agent", "foundationModel"), - "guardrail_configuration": S("agent", "guardrailConfiguration") - >> Bend(AwsBedrockGuardrailConfiguration.mapping), - "idle_session_ttl_in_seconds": S("agent", "idleSessionTTLInSeconds"), - "instruction": S("agent", "instruction"), - "memory_configuration": S("agent", "memoryConfiguration") >> Bend(AwsBedrockMemoryConfiguration.mapping), - "prepared_at": S("agent", "preparedAt"), - "prompt_override_configuration": S("agent", "promptOverrideConfiguration") + "id": S("agentId"), + "name": S("agentName"), + "ctime": S("createdAt"), + "mtime": S("updatedAt"), + "arn": S("agentArn"), + "agent_arn": S("agentArn"), + "agent_id": S("agentId"), + "agent_name": S("agentName"), + "agent_resource_role_arn": S("agentResourceRoleArn"), + "agent_status": S("agentStatus"), + "agent_version": S("agentVersion"), + "client_token": S("clientToken"), + "created_at": S("createdAt"), + "customer_encryption_key_arn": S("customerEncryptionKeyArn"), + "description": S("description"), + "failure_reasons": S("failureReasons", default=[]), + "foundation_model": S("foundationModel"), + "guardrail_configuration": S("guardrailConfiguration") >> Bend(AwsBedrockGuardrailConfiguration.mapping), + "idle_session_ttl_in_seconds": S("idleSessionTTLInSeconds"), + "instruction": S("instruction"), + "memory_configuration": S("memoryConfiguration") >> Bend(AwsBedrockMemoryConfiguration.mapping), + "prepared_at": S("preparedAt"), + "prompt_override_configuration": S("promptOverrideConfiguration") >> Bend(AwsBedrockPromptOverrideConfiguration.mapping), - "agent_recommended_actions": S("agent", "recommendedActions", default=[]), - "updated_at": S("agent", "updatedAt"), + "agent_recommended_actions": S("recommendedActions", default=[]), + "updated_at": S("updatedAt"), } agent_arn: Optional[str] = field(default=None, metadata={"description": "The Amazon Resource Name (ARN) of the agent."}) # fmt: skip agent_id: Optional[str] = field(default=None, metadata={"description": "The unique identifier of the agent."}) # fmt: skip @@ -1038,13 +1037,13 @@ def add_tags(agent: AwsResource) -> None: agent.tags.update(tags[0]) for js in json: - for result in builder.client.list( + if result := builder.client.get( "bedrock-agent", "get-agent", + "agent", agentId=js["agentId"], ): if instance := AwsBedrockAgent.from_api(result, builder): - instance.agent_version = js["latestAgentVersion"] builder.add_node(instance, result) builder.submit_work("bedrock-agent", add_tags, instance) @@ -1269,23 +1268,22 @@ class AwsBedrockAgentKnowledgeBase(BedrockTaggable, AwsResource): } api_spec: ClassVar[AwsApiSpec] = AwsApiSpec("bedrock-agent", "list-knowledge-bases", "knowledgeBaseSummaries") mapping: ClassVar[Dict[str, Bender]] = { - "id": S("knowledgeBase", "knowledgeBaseId"), - "name": S("knowledgeBase", "name"), - "arn": S("knowledgeBase", "knowledgeBaseArn"), - "ctime": S("knowledgeBase", "createdAt"), - "mtime": S("knowledgeBase", "updatedAt"), - "created_at": S("knowledgeBase", "createdAt"), - "description": S("knowledgeBase", "description"), - "failure_reasons": S("knowledgeBase", "failureReasons", default=[]), - "knowledge_base_arn": S("knowledgeBase", "knowledgeBaseArn"), - "knowledge_base_configuration": S("knowledgeBase", "knowledgeBaseConfiguration") + "id": S("knowledgeBaseId"), + "name": S("name"), + "arn": S("knowledgeBaseArn"), + "ctime": S("createdAt"), + "mtime": S("updatedAt"), + "created_at": S("createdAt"), + "description": S("description"), + "failure_reasons": S("failureReasons", default=[]), + "knowledge_base_arn": S("knowledgeBaseArn"), + "knowledge_base_configuration": S("knowledgeBaseConfiguration") >> Bend(AwsBedrockKnowledgeBaseConfiguration.mapping), - "knowledge_base_id": S("knowledgeBase", "knowledgeBaseId"), - "role_arn": S("knowledgeBase", "roleArn"), - "status": S("knowledgeBase", "status"), - "storage_configuration": S("knowledgeBase", "storageConfiguration") - >> Bend(AwsBedrockStorageConfiguration.mapping), - "updated_at": S("knowledgeBase", "updatedAt"), + "knowledge_base_id": S("knowledgeBaseId"), + "role_arn": S("roleArn"), + "status": S("status"), + "storage_configuration": S("storageConfiguration") >> Bend(AwsBedrockStorageConfiguration.mapping), + "updated_at": S("updatedAt"), } created_at: Optional[datetime] = field(default=None, metadata={"description": "The time at which the knowledge base was created."}) # fmt: skip description: Optional[str] = field(default=None, metadata={"description": "The description of the knowledge base."}) # fmt: skip @@ -1326,9 +1324,10 @@ def add_tags(knowledge_base: AwsResource) -> None: knowledge_base.tags.update(tags[0]) for js in json: - for result in builder.client.list( + if result := builder.client.get( "bedrock-agent", "get-knowledge-base", + "knowledgeBase", knowledgeBaseId=js["knowledgeBaseId"], ): if instance := cls.from_api(result, builder): @@ -1493,7 +1492,7 @@ def add_tags(prompt: AwsResource) -> None: prompt.tags.update(tags[0]) for js in json: - for result in builder.client.list( + if result := builder.client.get( "bedrock-agent", "get-prompt", promptIdentifier=js["id"], @@ -1834,7 +1833,7 @@ def collect_flow_versions(flow: AwsBedrockAgentFlow) -> None: builder.submit_work("bedrock-agent", add_tags, instance) for js in json: - for result in builder.client.list( + if result := builder.client.get( "bedrock-agent", "get-flow", flowIdentifier=js["id"], diff --git a/plugins/aws/fix_plugin_aws/resource/ecr.py b/plugins/aws/fix_plugin_aws/resource/ecr.py index 2bf480ca1e..391bd996a2 100644 --- a/plugins/aws/fix_plugin_aws/resource/ecr.py +++ b/plugins/aws/fix_plugin_aws/resource/ecr.py @@ -71,8 +71,8 @@ def add_repository_policy(repository: AwsEcrRepository) -> None: service_name, "get-repository-policy", "policyText", - repositoryName=repository.name, expected_errors=["RepositoryPolicyNotFoundException", "RepositoryNotFoundException"], + repositoryName=repository.name, ): repository.repository_policy = sort_json(json_loads(raw_policy), sort_list=True) # type: ignore @@ -83,7 +83,7 @@ def fetch_lifecycle_policy(repository: AwsEcrRepository) -> None: "get-lifecycle-policy", "lifecyclePolicyText", repositoryName=repository.name, - expected_errors=["LifecyclePolicyNotFoundException"], + expected_errors=["LifecyclePolicyNotFoundException", "RepositoryNotFoundException"], ): repository.lifecycle_policy = sort_json(json.loads(policy), sort_list=True) # type: ignore diff --git a/plugins/aws/test/resources/files/bedrock_agent/get-knowledge-base__foo.json b/plugins/aws/test/resources/files/bedrock_agent/get-knowledge-base__foo.json index fa561e8029..e45487ca6d 100644 --- a/plugins/aws/test/resources/files/bedrock_agent/get-knowledge-base__foo.json +++ b/plugins/aws/test/resources/files/bedrock_agent/get-knowledge-base__foo.json @@ -1,7 +1,9 @@ { - "description": "foo", - "knowledgeBaseId": "foo", - "name": "foo", - "status": "ACTIVE", - "updatedAt": "2024-09-17T12:11:47Z" + "knowledgeBase": { + "description": "foo", + "knowledgeBaseId": "foo", + "name": "foo", + "status": "ACTIVE", + "updatedAt": "2024-09-17T12:11:47Z" + } } \ No newline at end of file