Skip to content

Commit

Permalink
[Integration][GCP] Fixed issues preventing the proper processing of r…
Browse files Browse the repository at this point in the history
…eal-time events (#1247)

# Description

What - Updated the `should_use_snake_case` function to support real time
event processing by allowing it to use matching_resource_configs when
processing real-time events. Resync processes continue using the
existing implementation with get_current_resource_config()

Why - The previous implementation relied solely on
get_current_resource_config(), which is unsuitable for webhook event
processing because the `event.resource_config` used for resync events
returns `None` during realtime events

How - Modified the `should_use_snake_case` function to accept an
optional `matching_resource_configs` parameter and also updated the
`feed_events_callback` function to `pass matching_resource_configs` to
`should_use_snake_case` during webhook processing

## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)

<h4> All tests should be run against the port production
environment(using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: PagesCoffy <[email protected]>
Co-authored-by: Michael Kofi Armah <[email protected]>
Co-authored-by: MatanGevaPort <[email protected]>
  • Loading branch information
4 people authored Dec 28, 2024
1 parent 428ee8c commit 78bfa53
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 51 deletions.
7 changes: 7 additions & 0 deletions integrations/gcp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- towncrier release notes start -->
## 0.1.83 (2024-12-29)


### Bug Fixes

- Fixed the issue with `get_current_resource_config()` and the `preserveApiResponseCaseStyle` selector in real-time event which leads to the failure of the event processing


## 0.1.82 (2024-12-26)

Expand Down
6 changes: 5 additions & 1 deletion integrations/gcp/gcp_core/overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
ResourceConfig,
Selector,
)
from pydantic import Field
from pydantic import Field, BaseModel


class GCPCloudResourceSelector(Selector):
Expand Down Expand Up @@ -39,3 +39,7 @@ class GCPPortAppConfig(PortAppConfig):
resources: list[GCPCloudResourceConfig | GCPResourceConfig | ResourceConfig] = (
Field(default_factory=list)
)


class ProtoConfig(BaseModel):
preserving_proto_field_name: typing.Optional[bool] = None
71 changes: 49 additions & 22 deletions integrations/gcp/gcp_core/search/resource_searches.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Optional
import typing

from google.api_core.exceptions import NotFound, PermissionDenied
Expand All @@ -15,7 +15,6 @@
from loguru import logger
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM
from port_ocean.utils.cache import cache_iterator_result

from gcp_core.errors import ResourceNotFoundError
from gcp_core.utils import (
EXTRA_PROJECT_FIELD,
Expand All @@ -28,6 +27,7 @@
from gcp_core.search.paginated_query import paginated_query, DEFAULT_REQUEST_TIMEOUT
from gcp_core.helpers.ratelimiter.base import MAXIMUM_CONCURRENT_REQUESTS
from asyncio import BoundedSemaphore
from gcp_core.overrides import ProtoConfig

DEFAULT_SEMAPHORE = BoundedSemaphore(MAXIMUM_CONCURRENT_REQUESTS)

Expand Down Expand Up @@ -213,34 +213,46 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE:
yield organizations


async def get_single_project(project_name: str) -> RAW_ITEM:
async def get_single_project(
project_name: str, config: Optional[ProtoConfig] = None
) -> RAW_ITEM:
async with ProjectsAsyncClient() as projects_client:
return parse_protobuf_message(
await projects_client.get_project(
name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT
)
),
config,
)


async def get_single_folder(folder_name: str) -> RAW_ITEM:
async def get_single_folder(
folder_name: str, config: Optional[ProtoConfig] = None
) -> RAW_ITEM:
async with FoldersAsyncClient() as folders_client:
return parse_protobuf_message(
await folders_client.get_folder(
name=folder_name, timeout=DEFAULT_REQUEST_TIMEOUT
)
),
config,
)


async def get_single_organization(organization_name: str) -> RAW_ITEM:
async def get_single_organization(
organization_name: str, config: Optional[ProtoConfig] = None
) -> RAW_ITEM:
async with OrganizationsAsyncClient() as organizations_client:
return parse_protobuf_message(
await organizations_client.get_organization(
name=organization_name, timeout=DEFAULT_REQUEST_TIMEOUT
)
),
config,
)


async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM:
async def get_single_topic(
topic_id: str,
config: Optional[ProtoConfig] = None,
) -> RAW_ITEM:
"""
The Topics are handled specifically due to lacks of data in the asset itself within the asset inventory- e.g. some properties missing.
Here the PublisherAsyncClient is used, ignoring state in assets inventory
Expand All @@ -249,11 +261,15 @@ async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM:
return parse_protobuf_message(
await async_publisher_client.get_topic(
topic=topic_id, timeout=DEFAULT_REQUEST_TIMEOUT
)
),
config,
)


async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_ITEM:
async def get_single_subscription(
subscription_id: str,
config: Optional[ProtoConfig] = None,
) -> RAW_ITEM:
"""
Subscriptions are handled specifically due to lacks of data in the asset itself within the asset inventory- e.g. some properties missing.
Here the SubscriberAsyncClient is used, ignoring state in assets inventory
Expand All @@ -262,7 +278,8 @@ async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_
return parse_protobuf_message(
await async_subscriber_client.get_subscription(
subscription=subscription_id, timeout=DEFAULT_REQUEST_TIMEOUT
)
),
config,
)


Expand All @@ -287,35 +304,45 @@ async def search_single_resource(


async def feed_event_to_resource(
asset_type: str, asset_name: str, project_id: str, asset_data: dict[str, Any]
asset_type: str,
asset_name: str,
project_id: str,
asset_data: dict[str, Any],
config: Optional[ProtoConfig] = None,
) -> RAW_ITEM:
resource = None
if asset_data.get("deleted") is True:
resource = asset_data["priorAsset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id, config)
else:
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_topic(project_id, topic_name)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
resource = await get_single_topic(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
)
case AssetTypesWithSpecialHandling.SUBSCRIPTION:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_subscription(project_id, topic_name)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
resource = await get_single_subscription(topic_name, config)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
)
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
resource = await get_single_folder(folder_id)
resource = await get_single_folder(folder_id, config)
case AssetTypesWithSpecialHandling.ORGANIZATION:
organization_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
resource = await get_single_organization(organization_id)
resource = await get_single_organization(organization_id, config)
case AssetTypesWithSpecialHandling.PROJECT:
resource = await get_single_project(project_id)
resource = await get_single_project(project_id, config)
case _:
resource = asset_data["asset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(
project_id, config
)
return resource
16 changes: 12 additions & 4 deletions integrations/gcp/gcp_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
import os
import typing
from collections.abc import MutableSequence
from typing import Any, TypedDict, Tuple

from typing import Any, TypedDict, Tuple, Optional
from gcp_core.errors import ResourceNotFoundError
from loguru import logger
import proto # type: ignore
from port_ocean.context.event import event
from port_ocean.core.handlers.port_app_config.models import ResourceConfig

from gcp_core.overrides import GCPCloudResourceConfig
from gcp_core.overrides import GCPCloudResourceConfig, ProtoConfig
from port_ocean.context.ocean import ocean
import json
from pathlib import Path
Expand Down Expand Up @@ -82,15 +81,24 @@ def should_use_snake_case() -> bool:
Returns:
bool: True to use snake_case, False to preserve API's original case style
"""

selector = get_current_resource_config().selector
preserve_api_case = getattr(selector, "preserve_api_response_case_style", False)
return not preserve_api_case


def parse_protobuf_message(message: proto.Message) -> dict[str, Any]:
def parse_protobuf_message(
message: proto.Message,
config: Optional[ProtoConfig] = None,
) -> dict[str, Any]:
"""
Parse protobuf message to dict, controlling field name case style.
"""
if config and config.preserving_proto_field_name is not None:
use_snake_case = not config.preserving_proto_field_name
return proto.Message.to_dict(
message, preserving_proto_field_name=use_snake_case
)
use_snake_case = should_use_snake_case()
return proto.Message.to_dict(message, preserving_proto_field_name=use_snake_case)

Expand Down
53 changes: 41 additions & 12 deletions integrations/gcp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from fastapi import Request, Response
from loguru import logger

from port_ocean.context.ocean import ocean
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE

Expand All @@ -13,7 +14,13 @@
GotFeedCreatedSuccessfullyMessageError,
)
from gcp_core.feed_event import get_project_name_from_ancestors, parse_asset_data
from gcp_core.overrides import GCPCloudResourceSelector
from gcp_core.overrides import (
GCPCloudResourceSelector,
GCPPortAppConfig,
GCPResourceSelector,
ProtoConfig,
)
from port_ocean.context.event import event
from gcp_core.search.iterators import iterate_per_available_project
from gcp_core.search.resource_searches import (
feed_event_to_resource,
Expand Down Expand Up @@ -180,19 +187,41 @@ async def feed_events_callback(request: Request) -> Response:
logger.info(
f"Got Real-Time event for kind: {asset_type} with name: {asset_name} from project: {asset_project}"
)
asset_resource_data = await feed_event_to_resource(
asset_type, asset_name, asset_project, asset_data
)
if asset_data.get("deleted") is True:
logger.info(
f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port"
resource_configs = typing.cast(
GCPPortAppConfig, event.port_app_config
).resources
matching_resource_configs = [
resource_config
for resource_config in resource_configs
if (
resource_config.kind == asset_type
and isinstance(resource_config.selector, GCPResourceSelector)
)
]
for matching_resource_config in matching_resource_configs:
selector = matching_resource_config.selector
config = ProtoConfig(
preserving_proto_field_name=bool(
getattr(selector, "preserve_api_response_case_style", False)
)
)
await ocean.unregister_raw(asset_type, [asset_resource_data])
else:
logger.info(
f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port"
asset_resource_data = await feed_event_to_resource(
asset_type,
asset_name,
asset_project,
asset_data,
config,
)
await ocean.register_raw(asset_type, [asset_resource_data])
if asset_data.get("deleted") is True:
logger.info(
f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port"
)
await ocean.unregister_raw(asset_type, [asset_resource_data])
else:
logger.info(
f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port"
)
await ocean.register_raw(asset_type, [asset_resource_data])
except AssetHasNoProjectAncestorError:
logger.exception(
f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors and not supported yet."
Expand Down
2 changes: 1 addition & 1 deletion integrations/gcp/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gcp"
version = "0.1.82"
version = "0.1.83"
description = "A GCP ocean integration"
authors = ["Matan Geva <[email protected]>"]

Expand Down
14 changes: 3 additions & 11 deletions integrations/gcp/tests/gcp_core/search/test_resource_searches.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,10 @@ async def test_get_single_subscription(
"state": 0,
"topic": "",
}
mock_project = "project_name"

# Act within event context
async with event_context("test_event"):
actual_subscription = await get_single_subscription(
mock_project, "subscription_name"
)
actual_subscription = await get_single_subscription("subscription_name")

# Assert
assert actual_subscription == expected_subscription
Expand Down Expand Up @@ -231,13 +228,10 @@ async def test_preserve_case_style_combined(
"state": 0,
"topic": "projects/project_name/topics/topic_name",
}
mock_project = "project_name"

# Act within event context for preserve_case_style = True
async with event_context("test_event"):
actual_subscription_true = await get_single_subscription(
mock_project, "subscription_name"
)
actual_subscription_true = await get_single_subscription("subscription_name")

# Assert for preserve_case_style = True
assert actual_subscription_true == expected_subscription_true
Expand All @@ -264,9 +258,7 @@ async def test_preserve_case_style_combined(

# Act within event context for preserve_case_style = False
async with event_context("test_event"):
actual_subscription_false = await get_single_subscription(
mock_project, "subscription_name"
)
actual_subscription_false = await get_single_subscription("subscription_name")

# Assert for preserve_case_style = False
assert actual_subscription_false == expected_subscription_false

0 comments on commit 78bfa53

Please sign in to comment.