Skip to content

Commit

Permalink
[Integration][Snyk] Add rate limiting handling (#1092)
Browse files Browse the repository at this point in the history
  • Loading branch information
oiadebayo authored Nov 19, 2024
1 parent cb7734b commit 1893b43
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 17 deletions.
9 changes: 9 additions & 0 deletions integrations/snyk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->


## 0.1.102 (2024-11-14)


### Improvements

- Added rate limiting handling to the integration to prevent reaching the Snyk API rate limit


## 0.1.101 (2024-11-12)


Expand Down
5 changes: 4 additions & 1 deletion integrations/snyk/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from port_ocean.context.ocean import ocean
from port_ocean.context.event import event

from aiolimiter import AsyncLimiter
from snyk.client import SnykClient
from snyk.overrides import ProjectResourceConfig

CONCURRENT_REQUESTS = 20
SNYK_LIMIT = 1320
RATELIMITER = AsyncLimiter(SNYK_LIMIT)


class ObjectKind(StrEnum):
Expand Down Expand Up @@ -46,6 +48,7 @@ def parse_list(value: str) -> Optional[list[str]]:
parse_list(ocean.integration_config.get("organization_id", "")),
parse_list(ocean.integration_config.get("groups", "")),
ocean.integration_config.get("webhook_secret"),
RATELIMITER,
)


Expand Down
13 changes: 12 additions & 1 deletion integrations/snyk/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion integrations/snyk/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
[tool.poetry]
name = "snyk"
version = "0.1.101"
version = "0.1.102"
description = "Snyk integration powered by Ocean"
authors = ["Isaac Coffie <[email protected]>"]

[tool.poetry.dependencies]
python = "^3.12"
aiolimiter = "^1.1.0"
port_ocean = {version = "^0.14.0", extras = ["cli"]}

[tool.poetry.group.dev.dependencies]
Expand Down
31 changes: 17 additions & 14 deletions integrations/snyk/snyk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import httpx
from httpx import Timeout
from loguru import logger

from port_ocean.context.event import event
from port_ocean.utils import http_async_client
from aiolimiter import AsyncLimiter


class CacheKeys(StrEnum):
Expand All @@ -31,6 +31,7 @@ def __init__(
organization_ids: list[str] | None,
group_ids: list[str] | None,
webhook_secret: str | None,
rate_limiter: AsyncLimiter,
):
self.token = token
self.api_url = f"{api_url}/v1"
Expand All @@ -43,6 +44,7 @@ def __init__(
self.http_client.headers.update(self.api_auth_header)
self.http_client.timeout = Timeout(30)
self.snyk_api_version = "2024-06-21"
self.rate_limiter = rate_limiter

@property
def api_auth_header(self) -> dict[str, Any]:
Expand All @@ -60,20 +62,21 @@ async def _send_api_request(
**(query_params or {}),
**({"version": version} if version is not None else {}),
}
try:
response = await self.http_client.request(
method=method, url=url, params=query_params, json=json_data
)
response.raise_for_status()
return response.json()
async with self.rate_limiter:
try:
response = await self.http_client.request(
method=method, url=url, params=query_params, json=json_data
)
response.raise_for_status()
return response.json()

except httpx.HTTPStatusError as e:
logger.error(
f"Encountered an error while sending a request to {method} {url} with query_params: {query_params}, "
f"version: {version}, json: {json_data}. "
f"Got HTTP error with status code: {e.response.status_code} and response: {e.response.text}"
)
raise
except httpx.HTTPStatusError as e:
logger.error(
f"Encountered an error while sending a request to {method} {url} with query_params: {query_params}, "
f"version: {version}, json: {json_data}. "
f"Got HTTP error with status code: {e.response.status_code} and response: {e.response.text}"
)
raise

async def _get_paginated_resources(
self,
Expand Down
5 changes: 5 additions & 0 deletions integrations/snyk/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# ruff: noqa
from port_ocean.tests.helpers.fixtures import (
get_mocked_ocean_app,
get_mock_ocean_resource_configs,
)
114 changes: 114 additions & 0 deletions integrations/snyk/tests/snyk/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import pytest
from unittest.mock import patch, AsyncMock, MagicMock
from typing import Any, Dict, List, Generator
from snyk.client import SnykClient
from port_ocean.exceptions.context import PortOceanContextAlreadyInitializedError
from port_ocean.context.ocean import initialize_port_ocean_context
from port_ocean.context.event import EventContext
from aiolimiter import AsyncLimiter
import time
import asyncio

MOCK_API_URL = "https://api.snyk.io/v1"
MOCK_TOKEN = "dummy_token"
MOCK_PROJECT_ID = "12345"
MOCK_ISSUES = [{"id": "issue1"}, {"id": "issue2"}]
MOCK_ORG_URL = "https://your_organization_url.com"
MOCK_PERSONAL_ACCESS_TOKEN = "personal_access_token"


# Port Ocean Mocks
@pytest.fixture(autouse=True)
def mock_ocean_context() -> None:
"""Fixture to mock the Ocean context initialization."""
try:
mock_ocean_app = MagicMock()
mock_ocean_app.config.integration.config = {
"organization_url": MOCK_ORG_URL,
"personal_access_token": MOCK_PERSONAL_ACCESS_TOKEN,
}
mock_ocean_app.integration_router = MagicMock()
mock_ocean_app.port_client = MagicMock()
initialize_port_ocean_context(mock_ocean_app)
except PortOceanContextAlreadyInitializedError:
pass


@pytest.fixture
def mock_event_context() -> Generator[MagicMock, None, None]:
"""Fixture to mock the event context."""
mock_event = MagicMock(spec=EventContext)
mock_event.event_type = "test_event"
mock_event.trigger_type = "manual"
mock_event.attributes = {}
mock_event._deadline = 999999999.0
mock_event._aborted = False

with patch("port_ocean.context.event.event", mock_event):
yield mock_event


@pytest.fixture
def snyk_client() -> SnykClient:
"""Fixture to create a SnykClient instance for testing."""
return SnykClient(
token=MOCK_TOKEN,
api_url=MOCK_API_URL,
app_host=None,
organization_ids=None,
group_ids=None,
webhook_secret=None,
rate_limiter=AsyncLimiter(5, 1),
)


@pytest.mark.asyncio
async def test_send_api_request_rate_limit(snyk_client: SnykClient) -> None:
"""Test rate limit enforcement on API request."""
with patch.object(
snyk_client.http_client, "request", new_callable=AsyncMock
) as mock_request:
mock_request.return_value.json = AsyncMock(return_value={})
mock_request.return_value.raise_for_status = AsyncMock()

async def make_request() -> None:
await snyk_client._send_api_request(url=f"{MOCK_API_URL}/test")
await mock_request.return_value.raise_for_status()

start_time = time.monotonic()

await asyncio.gather(*[make_request() for _ in range(15)])

elapsed_time = time.monotonic() - start_time

assert (
elapsed_time >= 1.0
), "Rate limiter did not properly enforce the rate limit."


@pytest.mark.asyncio
async def test_get_paginated_resources(
snyk_client: SnykClient, mock_event_context: MagicMock
) -> None:
"""Test getting paginated resources with mocked response."""

async def mock_send_api_request(*args: Any, **kwargs: Any) -> Dict[str, Any]:
url = kwargs.get("url")
if url and url.endswith("/page1"):
return {"data": [{"id": "item1"}], "links": {"next": "/rest/page2"}}
elif url and url.endswith("/page2"):
return {"data": [{"id": "item2"}], "links": {"next": ""}}
return {}

with patch.object(
snyk_client, "_send_api_request", side_effect=mock_send_api_request
):
url_path = "/page1"

resources: List[Dict[str, Any]] = []
async for resource_batch in snyk_client._get_paginated_resources(
url_path=url_path
):
resources.extend(resource_batch)

assert resources == [{"id": "item1"}, {"id": "item2"}]

0 comments on commit 1893b43

Please sign in to comment.