Skip to content

Commit

Permalink
[Integration][Terraform-Cloud] Add Rate Limiting Improvements (#1110)
Browse files Browse the repository at this point in the history
  • Loading branch information
phalbert authored Nov 26, 2024
1 parent 52c9dfb commit eded7ec
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 88 deletions.
8 changes: 6 additions & 2 deletions integrations/terraform-cloud/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.83 (2024-11-26)

### Improvements

- Added rate limit handling to the client

## 0.1.82 (2024-11-25)


Expand All @@ -17,12 +23,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## 0.1.81 (2024-11-25)


### Improvements

- Bumped ocean version to ^0.14.2


## 0.1.80 (2024-11-21)


Expand Down
51 changes: 29 additions & 22 deletions integrations/terraform-cloud/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from enum import StrEnum
from typing import Any, AsyncGenerator, Optional
from port_ocean.utils import http_async_client
import httpx
from aiolimiter import AsyncLimiter
from loguru import logger
from enum import StrEnum

from port_ocean.context.event import event
from port_ocean.utils import http_async_client

TERRAFORM_WEBHOOK_EVENTS = [
"run:applying",
Expand All @@ -21,6 +21,8 @@ class CacheKeys(StrEnum):


PAGE_SIZE = 100
NUMBER_OF_REQUESTS = 25
NUMBER_OF_SECONDS = 1


class TerraformClient:
Expand All @@ -34,39 +36,44 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None:
self.client = http_async_client
self.client.headers.update(self.base_headers)

self.rate_limiter = AsyncLimiter(NUMBER_OF_REQUESTS, NUMBER_OF_SECONDS)

async def send_api_request(
self,
endpoint: str,
method: str = "GET",
query_params: Optional[dict[str, Any]] = None,
json_data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
logger.info(f"Requesting Terraform Cloud data for endpoint: {endpoint}")
url = f"{self.api_url}/{endpoint}"

try:
url = f"{self.api_url}/{endpoint}"
logger.info(
f"URL: {url}, Method: {method}, Params: {query_params}, Body: {json_data}"
)
response = await self.client.request(
method=method,
url=url,
params=query_params,
json=json_data,
)
response.raise_for_status()
async with self.rate_limiter:
logger.debug(
f"Requesting {method} {url} with params: {query_params} and body: {json_data}"
)

logger.info(f"Successfully retrieved data for endpoint: {endpoint}")
response = await self.client.request(
method=method,
url=url,
params=query_params,
json=json_data,
)

return response.json()
response.raise_for_status()
logger.debug(f"Successfully retrieved data for endpoint: {endpoint}")
return response.json()

except httpx.HTTPStatusError as e:
except Exception as e:
logger.error(
f"HTTP error on {endpoint}: {e.response.status_code} - {e.response.text}"
"Request failed",
error=str(e),
url=url,
method=method,
params=query_params,
body=json_data,
)
raise
except httpx.HTTPError as e:
logger.error(f"HTTP error on {endpoint}: {str(e)}")
raise

async def get_paginated_resources(
self, endpoint: str, params: Optional[dict[str, Any]] = None
Expand Down
113 changes: 51 additions & 62 deletions integrations/terraform-cloud/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import asyncio
from asyncio import gather
import asyncio
from enum import StrEnum
from typing import Any, List, Dict

from typing import Any, List
from loguru import logger

from client import TerraformClient
Expand All @@ -19,7 +18,6 @@ class ObjectKind(StrEnum):


SKIP_WEBHOOK_CREATION = False
SEMAPHORE_LIMIT = 30


def init_terraform_client() -> TerraformClient:
Expand All @@ -40,51 +38,26 @@ def init_terraform_client() -> TerraformClient:
async def enrich_state_versions_with_output_data(
http_client: TerraformClient, state_versions: List[dict[str, Any]]
) -> list[dict[str, Any]]:
async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT):
tasks = [
http_client.get_state_version_output(state_version["id"])
for state_version in state_versions
]

output_batches = []
for completed_task in asyncio.as_completed(tasks):
output = await completed_task
output_batches.append(output)

enriched_state_versions = [
{**state_version, "__output": output}
for state_version, output in zip(state_versions, output_batches)
]
async def fetch_output(state_version: dict[str, Any]) -> dict[str, Any]:
    """Return *state_version* with its output attached under the "__output" key.

    Uses the enclosing scope's ``http_client``. On any failure the error is
    logged and an empty output dict is attached instead, so one bad state
    version does not abort the whole enrichment batch.
    """
    try:
        output = await http_client.get_state_version_output(state_version["id"])
        return {**state_version, "__output": output}
    except Exception as e:
        # Best-effort: degrade to an empty "__output" rather than failing the batch.
        logger.warning(
            f"Failed to fetch output for state version {state_version['id']}: {e}"
        )
        return {**state_version, "__output": {}}

return enriched_state_versions
enriched_versions = await gather(
*[fetch_output(state_version) for state_version in state_versions]
)
return list(enriched_versions)


async def enrich_workspaces_with_tags(
http_client: TerraformClient, workspaces: List[dict[str, Any]]
) -> list[dict[str, Any]]:
async def get_tags_for_workspace(workspace: dict[str, Any]) -> dict[str, Any]:
async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT):
try:
tags = []
async for tag_batch in http_client.get_workspace_tags(workspace["id"]):
tags.extend(tag_batch)
return {**workspace, "__tags": tags}
except Exception as e:
logger.warning(
f"Failed to fetch tags for workspace {workspace['id']}: {e}"
)
return {**workspace, "__tags": []}

tasks = [get_tags_for_workspace(workspace) for workspace in workspaces]
enriched_workspaces = [await task for task in asyncio.as_completed(tasks)]

return enriched_workspaces


async def enrich_workspace_with_tags(
http_client: TerraformClient, workspace: dict[str, Any]
) -> dict[str, Any]:
async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT):
try:
tags = []
async for tag_batch in http_client.get_workspace_tags(workspace["id"]):
Expand All @@ -94,6 +67,24 @@ async def enrich_workspace_with_tags(
logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}")
return {**workspace, "__tags": []}

enriched_workspaces = await gather(
*[get_tags_for_workspace(workspace) for workspace in workspaces]
)
return list(enriched_workspaces)


async def enrich_workspace_with_tags(
    http_client: TerraformClient, workspace: dict[str, Any]
) -> dict[str, Any]:
    """Return a copy of *workspace* carrying all of its tags under "__tags".

    The client's paginated tag generator is drained into one flat list; any
    failure is logged and an empty tag list is attached instead, so a single
    bad workspace cannot abort the enrichment.
    """
    try:
        # Flatten every page yielded by the paginated generator into one list.
        collected = [
            tag
            async for tag_page in http_client.get_workspace_tags(workspace["id"])
            for tag in tag_page
        ]
    except Exception as e:
        logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}")
        return {**workspace, "__tags": []}
    return {**workspace, "__tags": collected}


@ocean.on_resync(ObjectKind.ORGANIZATION)
async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
Expand Down Expand Up @@ -125,32 +116,30 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(ObjectKind.RUN)
async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
terraform_client = init_terraform_client()
BATCH_SIZE = 25 # Stay safely under 30 req/sec limit

async def fetch_runs_for_workspace(
workspace: dict[str, Any]
) -> List[List[Dict[str, Any]]]:
return [
run
async for run in terraform_client.get_paginated_runs_for_workspace(
workspace["id"]
)
]
async def process_workspace(workspace: dict[str, Any]) -> List[dict[str, Any]]:
    """Collect every run for *workspace* by draining the paginated run generator.

    Uses the enclosing scope's ``terraform_client``. Empty pages are skipped,
    so the returned list contains only real run entries.
    """
    runs = []
    async for run_batch in terraform_client.get_paginated_runs_for_workspace(
        workspace["id"]
    ):
        if run_batch:  # skip empty pages from the paginator
            runs.extend(run_batch)
    return runs

async def fetch_runs_for_all_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE:
async for workspaces in terraform_client.get_paginated_workspaces():
logger.info(
f"Received {len(workspaces)} batch workspaces... fetching its associated {kind}"
)
async for workspaces in terraform_client.get_paginated_workspaces():
logger.info(f"Processing batch of {len(workspaces)} workspaces")

# Process in batches to stay under rate limit
for i in range(0, len(workspaces), BATCH_SIZE):
batch = workspaces[i : i + BATCH_SIZE]
tasks = [process_workspace(workspace) for workspace in batch]

tasks = [fetch_runs_for_workspace(workspace) for workspace in workspaces]
for completed_task in asyncio.as_completed(tasks):
workspace_runs = await completed_task
for runs in workspace_runs:
runs = await completed_task
if runs:
yield runs

async for run_batch in fetch_runs_for_all_workspaces():
yield run_batch


@ocean.on_resync(ObjectKind.STATE_VERSION)
async def resync_state_versions(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
Expand Down
13 changes: 12 additions & 1 deletion integrations/terraform-cloud/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion integrations/terraform-cloud/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
[tool.poetry]
name = "terraform-cloud"
version = "0.1.82"
version = "0.1.83"
description = "Terraform Cloud Integration for Port"
authors = ["Michael Armah <[email protected]>"]

[tool.poetry.dependencies]
python = "^3.12"
port_ocean = {version = "^0.14.3", extras = ["cli"]}
aiolimiter = "^1.1.0"

[tool.poetry.group.dev.dependencies]
# uncomment this if you want to debug the ocean core together with your integration
Expand Down

0 comments on commit eded7ec

Please sign in to comment.