From 8d64d72fb5c5d331bcac7f0b40d7aa1e1cdb30b3 Mon Sep 17 00:00:00 2001
From: hpal
Date: Wed, 21 Aug 2024 15:31:09 +0100
Subject: [PATCH 1/4] [ADO] Add work items to get issues, tasks, and epics (#893)

# Description

This PR introduces the functionality to retrieve issues, tasks, and epics from Azure DevOps (ADO) projects within the organization. The implementation involves a two-step process to efficiently fetch the required data:

**Fetch Work Item IDs using WIQL:** A WIQL (Work Item Query Language) query is executed to retrieve the IDs of the work items (issues, tasks, and epics) that match the specified criteria. This step helps optimize data retrieval by fetching only the necessary IDs, avoiding unnecessary data transfer.

**Fetch Detailed Work Item Data:** Using the obtained work item IDs, the API is called to fetch the complete details of each work item. Batch processing is employed to handle potential API limitations on the number of IDs that can be fetched in a single request.

## Type of change

Please leave one option from the following and delete the rest:

- [ ] New feature (non-breaking change which adds functionality)

## Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

Screenshot 2024-08-09 at 21 39 38
Screenshot 2024-08-09 at 21 40 36
Screenshot 2024-08-09 at 21 39 00

## API Documentation

Provide links to the API documentation used for this integration.
---
 .../.port/resources/blueprints.json           |  86 +++++++++++++++
 .../.port/resources/port-app-config.yaml      |  23 +++-
 integrations/azure-devops/CHANGELOG.md        |   5 +
 .../client/azure_devops_client.py             | 103 +++++++++++++++++-
 .../azure-devops/azure_devops/misc.py         |  22 +++-
 integrations/azure-devops/main.py             |   8 ++
 integrations/azure-devops/pyproject.toml      |   2 +-
 7 files changed, 240 insertions(+), 9 deletions(-)

diff --git a/integrations/azure-devops/.port/resources/blueprints.json b/integrations/azure-devops/.port/resources/blueprints.json
index 65641f1c89..e9afea2d7c 100644
--- a/integrations/azure-devops/.port/resources/blueprints.json
+++ b/integrations/azure-devops/.port/resources/blueprints.json
@@ -115,5 +115,91 @@
         "many": false
       }
     }
+  },
+  {
+    "identifier": "workItem",
+    "title": "Work Item",
+    "icon": "AzureDevops",
+    "schema": {
+      "properties": {
+        "type": {
+          "title": "Type",
+          "type": "string",
+          "icon": "AzureDevops",
+          "description": "The type of work item (e.g., Bug, Task, User Story)",
+          "enum": ["Issue", "Epic", "Task"],
+          "enumColors": {
+            "Issue": "green",
+            "Epic": "orange",
+            "Task": "blue"
+          }
+        },
+        "state": {
+          "title": "State",
+          "type": "string",
+          "icon": "AzureDevops",
+          "description": "The current state of the work item (e.g., New, Active, Closed)"
+        },
+        "reason": {
+          "title": "Reason",
+          "type": "string",
+          "description": "The reason for the current state of the work item"
+        },
+        "effort": {
+          "title": "Effort",
+          "type": "number",
+          "description": "The estimated effort for the work item"
+        },
+        "description": {
+          "title": "Description",
+          "type": "string",
+          "format": "markdown",
+          "description": "A detailed description of the work item"
+        },
+        "link": {
+          "title": "Link",
+          "type": "string",
+          "format": "url",
+          "icon": "AzureDevops",
+          "description": "Link to the work item in Azure DevOps"
+        },
+        "createdBy": {
+          "title": "Created By",
+          "type": "string",
+          "icon": "User",
+          "description": "The person who created the work item"
+        },
+        "changedBy": {
+          "title": "Changed By",
+          "type": "string",
+          "icon": "User",
+          "description": "The person who last changed the work item"
+        },
+        "createdDate": {
+          "title": "Created Date",
+          "type": "string",
+          "format": "date-time",
+          "description": "The date and time when the work item was created"
+        },
+        "changedDate": {
+          "title": "Changed Date",
+          "type": "string",
+          "format": "date-time",
+          "description": "The date and time when the work item was last changed"
+        }
+      },
+      "required": []
+    },
+    "mirrorProperties": {},
+    "calculationProperties": {},
+    "aggregationProperties": {},
+    "relations": {
+      "project": {
+        "title": "Project",
+        "target": "project",
+        "required": true,
+        "many": false
+      }
+    }
   }
 ]
diff --git a/integrations/azure-devops/.port/resources/port-app-config.yaml b/integrations/azure-devops/.port/resources/port-app-config.yaml
index d8328d7428..764db12bb5 100644
--- a/integrations/azure-devops/.port/resources/port-app-config.yaml
+++ b/integrations/azure-devops/.port/resources/port-app-config.yaml
@@ -17,7 +17,6 @@ resources:
           visibility: '.visibility'
           defaultTeam: '.defaultTeam.name'
           link: '.url | gsub("_apis/projects/"; "")'
-
  - kind: repository
    selector:
      query: "true"
@@ -54,3 +53,25 @@ resources:
          blueprint: '"service"'
          properties:
            workItemLinking: '.isEnabled and .isBlocking'
+  - kind: work-item
+    selector:
+      query: 'true'
+    port:
+      entity:
+        mappings:
+          identifier: '.id | tostring'
+          blueprint: '"workItem"'
+          title: '.fields."System.Title"'
+          properties:
+            type: '.fields."System.WorkItemType"'
+            state: '.fields."System.State"'
+            effort: '.fields."Microsoft.VSTS.Scheduling.Effort"'
+            description: '.fields."System.Description"'
+            link: '.url'
+            reason: '.fields."System.Reason"'
+            createdBy: '.fields."System.CreatedBy".displayName'
+            changedBy: '.fields."System.ChangedBy".displayName'
+            createdDate: '.fields."System.CreatedDate"'
+            changedDate: '.fields."System.ChangedDate"'
+          relations:
+            project: '.fields."System.TeamProject" | gsub(" "; "")'
diff --git a/integrations/azure-devops/CHANGELOG.md b/integrations/azure-devops/CHANGELOG.md
index 30e2794100..5ec9867a22 100644
--- a/integrations/azure-devops/CHANGELOG.md
+++ b/integrations/azure-devops/CHANGELOG.md
@@ -6,6 +6,11 @@
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## 0.1.54 (2024-08-21) + +### Features + +- Added work items to get issues, tasks, and epics ## 0.1.53 (2024-08-20) diff --git a/integrations/azure-devops/azure_devops/client/azure_devops_client.py b/integrations/azure-devops/azure_devops/client/azure_devops_client.py index 5ea976c478..3b04ea8859 100644 --- a/integrations/azure-devops/azure_devops/client/azure_devops_client.py +++ b/integrations/azure-devops/azure_devops/client/azure_devops_client.py @@ -1,16 +1,26 @@ import json +import asyncio +import typing + from typing import Any, AsyncGenerator, Optional -from azure_devops.webhooks.webhook_event import WebhookEvent from httpx import HTTPStatusError +from loguru import logger + from port_ocean.context.event import event from port_ocean.context.ocean import ocean -from loguru import logger -from .base_client import HTTPBaseClient from port_ocean.utils.cache import cache_iterator_result -import asyncio + +from azure_devops.misc import AzureDevopsWorkItemResourceConfig +from azure_devops.webhooks.webhook_event import WebhookEvent + +from .base_client import HTTPBaseClient + API_URL_PREFIX = "_apis" WEBHOOK_API_PARAMS = {"api-version": "7.1-preview.1"} +# Maximum number of work item IDs allowed in a single API request +# (based on Azure DevOps API limitations) https://learn.microsoft.com/en-us/rest/api/azure/devops/wit/work-items/list?view=azure-devops-rest-7.1&tabs=HTTP +MAX_WORK_ITEMS_PER_REQUEST = 200 class AzureDevopsClient(HTTPBaseClient): @@ -137,6 +147,91 @@ async def generate_repository_policies( policy["__repository"] = repo yield repo_policies + async def generate_work_items(self) -> AsyncGenerator[list[dict[str, Any]], None]: + """ + Retrieves a paginated list of work items within the Azure DevOps organization based on a WIQL query. + """ + async for projects in self.generate_projects(): + for project in projects: + # 1. Execute WIQL query to get work item IDs + work_item_ids = await self._fetch_work_item_ids(project["id"]) + + # 2. Fetch work items using the IDs (in batches if needed) + work_items = await self._fetch_work_items_in_batches( + project["id"], work_item_ids + ) + + # Call the private method to add __projectId to each work item + self._add_project_id_to_work_items(work_items, project["id"]) + + yield work_items + + async def _fetch_work_item_ids(self, project_id: str) -> list[int]: + """ + Executes a WIQL query to fetch work item IDs for a given project. + + :param project_id: The ID of the project. + :return: A list of work item IDs. + """ + config = typing.cast(AzureDevopsWorkItemResourceConfig, event.resource_config) + + wiql_query = "SELECT [Id] from WorkItems" + + if config.selector.wiql: + # Append the user-provided wiql to the WHERE clause + wiql_query += f" WHERE {config.selector.wiql}" + logger.info(f"Found and appended WIQL filter: {config.selector.wiql}") + + wiql_url = ( + f"{self._organization_base_url}/{project_id}/{API_URL_PREFIX}/wit/wiql" + ) + wiql_response = await self.send_request( + "POST", + wiql_url, + params={"api-version": "7.1-preview.2"}, + data=json.dumps({"query": wiql_query}), + headers={"Content-Type": "application/json"}, + ) + wiql_response.raise_for_status() + return [item["id"] for item in wiql_response.json()["workItems"]] + + async def _fetch_work_items_in_batches( + self, project_id: str, work_item_ids: list[int] + ) -> list[dict[str, Any]]: + """ + Fetches work items in batches based on the list of work item IDs. + + :param project_id: The ID of the project. + :param work_item_ids: A list of work item IDs to fetch. 
+ :return: A list of work items. + """ + work_items = [] + for i in range(0, len(work_item_ids), MAX_WORK_ITEMS_PER_REQUEST): + batch_ids = work_item_ids[i : i + MAX_WORK_ITEMS_PER_REQUEST] + work_items_url = f"{self._organization_base_url}/{project_id}/{API_URL_PREFIX}/wit/workitems" + params = { + "ids": ",".join(map(str, batch_ids)), + "api-version": "7.1-preview.3", + } + work_items_response = await self.send_request( + "GET", work_items_url, params=params + ) + work_items_response.raise_for_status() + work_items.extend(work_items_response.json()["value"]) + return work_items + + def _add_project_id_to_work_items( + self, work_items: list[dict[str, Any]], project_id: str + ) -> None: + """ + Adds the project ID to each work item in the list. + + :param work_items: List of work items to modify. + :param project_id: The project ID to add to each work item. + """ + for work_item in work_items: + work_item["__projectId"] = project_id + async def get_pull_request(self, pull_request_id: str) -> dict[Any, Any]: get_single_pull_request_url = f"{self._organization_base_url}/{API_URL_PREFIX}/git/pullrequests/{pull_request_id}" response = await self.send_request("GET", get_single_pull_request_url) diff --git a/integrations/azure-devops/azure_devops/misc.py b/integrations/azure-devops/azure_devops/misc.py index b600a238f6..a873231e70 100644 --- a/integrations/azure-devops/azure_devops/misc.py +++ b/integrations/azure-devops/azure_devops/misc.py @@ -18,6 +18,7 @@ class Kind(StrEnum): MEMBER = "member" TEAM = "team" PROJECT = "project" + WORK_ITEM = "work-item" PULL_REQUEST_SEARCH_CRITERIA: list[dict[str, Any]] = [ @@ -46,12 +47,27 @@ class AzureDevopsSelector(Selector): selector: AzureDevopsSelector +class AzureDevopsWorkItemResourceConfig(ResourceConfig): + class AzureDevopsSelector(Selector): + query: str + wiql: str | None = Field( + default=None, + description="WIQL query to filter work items. 
If not provided, all work items will be fetched.", + alias="wiql", + ) + + kind: Literal["work-item"] + selector: AzureDevopsSelector + + class GitPortAppConfig(PortAppConfig): spec_path: List[str] | str = Field(alias="specPath", default="port.yml") branch: str = "main" - resources: list[AzureDevopsProjectResourceConfig | ResourceConfig] = Field( - default_factory=list - ) + resources: list[ + AzureDevopsProjectResourceConfig + | AzureDevopsWorkItemResourceConfig + | ResourceConfig + ] = Field(default_factory=list) def extract_branch_name_from_ref(ref: str) -> str: diff --git a/integrations/azure-devops/main.py b/integrations/azure-devops/main.py index e7958b26d4..89ca64a858 100644 --- a/integrations/azure-devops/main.py +++ b/integrations/azure-devops/main.py @@ -103,6 +103,14 @@ async def resync_repository_policies(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: yield policies +@ocean.on_resync(Kind.WORK_ITEM) +async def resync_workitems(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: + azure_devops_client = AzureDevopsClient.create_from_ocean_config() + async for work_items in azure_devops_client.generate_work_items(): + logger.info(f"Resyncing {len(work_items)} work items") + yield work_items + + @ocean.router.post("/webhook") async def webhook(request: Request) -> dict[str, Any]: body = await request.json() diff --git a/integrations/azure-devops/pyproject.toml b/integrations/azure-devops/pyproject.toml index d616772f70..3479388fba 100644 --- a/integrations/azure-devops/pyproject.toml +++ b/integrations/azure-devops/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "azure-devops" -version = "0.1.53" +version = "0.1.54" description = "An Azure Devops Ocean integration" authors = ["Matan Geva "] From f96ea28db347a186b9f8e13d7dba8be3cb2479f0 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Wed, 21 Aug 2024 18:34:01 +0300 Subject: [PATCH 2/4] Configure Renovate (#922) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [![Mend Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com) Welcome to [Renovate](https://togithub.com/renovatebot/renovate)! This is an onboarding PR to help you understand and configure settings before regular Pull Requests begin. 🚦 To activate Renovate, merge this Pull Request. To disable Renovate, simply close this Pull Request unmerged. 
---

### Detected Package Files

* `integrations/_infra/Dockerfile` (dockerfile)
* `port_ocean/cli/cookiecutter/{{cookiecutter.integration_slug}}/Dockerfile` (dockerfile)
* `.github/workflows/apply-release.yml` (github-actions)
* `.github/workflows/ci.yml` (github-actions)
* `.github/workflows/combine-dependabot-prs.yml` (github-actions)
* `.github/workflows/create-new-sonarcloud-project.yml` (github-actions)
* `.github/workflows/lint.yml` (github-actions)
* `.github/workflows/pr-labeler.yml` (github-actions)
* `.github/workflows/release-framework.yml` (github-actions)
* `.github/workflows/release-integrations.yml` (github-actions)
* `.github/workflows/sonarcloud-framework.yml` (github-actions)
* `.github/workflows/sonarcloud-integrations.yml` (github-actions)
* `.github/workflows/test.yml` (github-actions)
* `.github/workflows/upgrade-integrations.yml` (github-actions)
* `.github/workflows/validate-integration-files.yml` (github-actions)
* `.github/workflows/verify-docs-build.yml` (github-actions)
* `docs/framework-guides/package.json` (npm)
* `integrations/argocd/pyproject.toml` (pep621)
* `integrations/aws/pyproject.toml` (pep621)
* `integrations/azure-devops/pyproject.toml` (pep621)
* `integrations/azure/pyproject.toml` (pep621)
* `integrations/datadog/pyproject.toml` (pep621)
* `integrations/dynatrace/pyproject.toml` (pep621)
* `integrations/firehydrant/pyproject.toml` (pep621)
* `integrations/gcp/pyproject.toml` (pep621)
* `integrations/gitlab/pyproject.toml` (pep621)
* `integrations/jenkins/pyproject.toml` (pep621)
* `integrations/jira/pyproject.toml` (pep621)
* `integrations/kafka/pyproject.toml` (pep621)
* `integrations/kubecost/pyproject.toml` (pep621)
* `integrations/launchdarkly/pyproject.toml` (pep621)
* `integrations/linear/pyproject.toml` (pep621)
* `integrations/newrelic/pyproject.toml` (pep621)
* `integrations/opencost/pyproject.toml` (pep621)
* `integrations/opsgenie/pyproject.toml` (pep621)
* `integrations/pagerduty/pyproject.toml` (pep621)
* `integrations/sentry/pyproject.toml` (pep621)
* `integrations/servicenow/pyproject.toml` (pep621)
* `integrations/snyk/pyproject.toml` (pep621)
* `integrations/sonarqube/pyproject.toml` (pep621)
* `integrations/statuspage/pyproject.toml` (pep621)
* `integrations/terraform-cloud/pyproject.toml` (pep621)
* `integrations/wiz/pyproject.toml` (pep621)
* `port_ocean/cli/cookiecutter/{{cookiecutter.integration_slug}}/pyproject.toml` (pep621)
* `pyproject.toml` (pep621)
* `integrations/argocd/pyproject.toml` (poetry)
* `integrations/aws/pyproject.toml` (poetry)
* `integrations/azure-devops/pyproject.toml` (poetry)
* `integrations/azure/pyproject.toml` (poetry)
* `integrations/datadog/pyproject.toml` (poetry)
* `integrations/dynatrace/pyproject.toml` (poetry)
* `integrations/firehydrant/pyproject.toml` (poetry)
* `integrations/gcp/pyproject.toml` (poetry)
* `integrations/gitlab/pyproject.toml` (poetry)
* `integrations/jenkins/pyproject.toml` (poetry)
* `integrations/jira/pyproject.toml` (poetry)
* `integrations/kafka/pyproject.toml` (poetry)
* `integrations/kubecost/pyproject.toml` (poetry)
* `integrations/launchdarkly/pyproject.toml` (poetry)
* `integrations/linear/pyproject.toml` (poetry)
* `integrations/newrelic/pyproject.toml` (poetry)
* `integrations/opencost/pyproject.toml` (poetry)
* `integrations/opsgenie/pyproject.toml` (poetry)
* `integrations/pagerduty/pyproject.toml` (poetry)
* `integrations/sentry/pyproject.toml` (poetry)
* `integrations/servicenow/pyproject.toml` (poetry)
* `integrations/snyk/pyproject.toml` (poetry)
* `integrations/sonarqube/pyproject.toml` (poetry)
* `integrations/statuspage/pyproject.toml` (poetry)
* `integrations/terraform-cloud/pyproject.toml` (poetry)
* `integrations/wiz/pyproject.toml` (poetry)
* `port_ocean/cli/cookiecutter/{{cookiecutter.integration_slug}}/pyproject.toml` (poetry)
* `pyproject.toml` (poetry)
* `.python-version` (pyenv)
* `deployment/terraform/aws/ecs/main.tf` (terraform)
* `deployment/terraform/aws/ecs/providers.tf` (terraform)

### Configuration Summary

Based on the default config's presets, Renovate will:

- Start dependency updates only once this onboarding PR is merged
- Show all Merge Confidence badges for pull requests.
- Enable Renovate Dependency Dashboard creation.
- Use semantic commit type `fix` for dependencies and `chore` for all others if semantic commits are in use.
- Ignore `node_modules`, `bower_components`, `vendor` and various test/tests directories.
- Group known monorepo packages together.
- Use curated list of recommended non-monorepo package groupings.
- Apply crowd-sourced package replacement rules.
- Apply crowd-sourced workarounds for known problems with packages.

🔡 Do you want to change how Renovate upgrades your dependencies? Add your custom config to `renovate.json` in this branch. Renovate will update the Pull Request description the next time it runs.

---

### What to Expect

With your current configuration, Renovate will create 36 Pull Requests:
**Update dependency @mdx-js/react to v3.0.1**

- Schedule: ["at any time"]
- Branch name: `renovate/mdx-monorepo`
- Merge into: `main`
- Upgrade [@mdx-js/react](https://togithub.com/mdx-js/mdx) to `3.0.1`

**Update dependency aiohttp to v3.10.5**

- Schedule: ["at any time"]
- Branch name: `renovate/aiohttp-3.x-lockfile`
- Merge into: `main`
- Upgrade [aiohttp](https://togithub.com/aio-libs/aiohttp) to `3.10.5`

**Update dependency prettier to v2.8.8**

- Schedule: ["at any time"]
- Branch name: `renovate/prettier-2.x-lockfile`
- Merge into: `main`
- Upgrade [prettier](https://togithub.com/prettier/prettier) to `2.8.8`

**Update dependency python-dateutil to ^2.9.0-post.0**

- Schedule: ["at any time"]
- Branch name: `renovate/python-dateutil-2.x`
- Merge into: `main`
- Upgrade [python-dateutil](https://togithub.com/dateutil/dateutil) to `^2.9.0-post.0`

**Update dependency stream to ^0.0.3**

- Schedule: ["at any time"]
- Branch name: `renovate/stream-0.x`
- Merge into: `main`
- Upgrade [stream](https://togithub.com/juliangruber/stream) to `^0.0.3`

**Update tj-actions/changed-files action to v44.5.7**

- Schedule: ["at any time"]
- Branch name: `renovate/tj-actions-changed-files-44.x`
- Merge into: `main`
- Upgrade [tj-actions/changed-files](https://togithub.com/tj-actions/changed-files) to `v44.5.7`

**Update dependency @easyops-cn/docusaurus-search-local to ^0.44.0**

- Schedule: ["at any time"]
- Branch name: `renovate/easyops-cn-docusaurus-search-local-0.x`
- Merge into: `main`
- Upgrade [@easyops-cn/docusaurus-search-local](https://togithub.com/easyops-cn/docusaurus-search-local) to `^0.44.0`

**Update dependency @stackql/docusaurus-plugin-hubspot to v1.1.0**

- Schedule: ["at any time"]
- Branch name: `renovate/stackql-docusaurus-plugin-hubspot-1.x-lockfile`
- Merge into: `main`
- Upgrade [@stackql/docusaurus-plugin-hubspot](https://togithub.com/stackql/docusaurus-plugin-hubspot) to `1.1.0`

**Update dependency aiofiles to ^0.8.0**

- Schedule: ["at any time"]
- Branch name: `renovate/aiofiles-0.x`
- Merge into: `main`
- Upgrade [aiofiles](https://togithub.com/Tinche/aiofiles#history) to `^0.8.0`

**Update dependency aiostream to ^0.6.0**

- Schedule: ["at any time"]
- Branch name: `renovate/aiostream-0.x`
- Merge into: `main`
- Upgrade [aiostream](https://togithub.com/vxgmichel/aiostream) to `^0.6.0`

**Update dependency azure-mgmt-resource to v23.1.1**

- Schedule: ["at any time"]
- Branch name: `renovate/azure-mgmt-resource-23.x`
- Merge into: `main`
- Upgrade [azure-mgmt-resource](https://togithub.com/Azure/azure-sdk-for-python) to `23.1.1`

**Update dependency boto3-stubs to v1.35.2**

- Schedule: ["at any time"]
- Branch name: `renovate/boto3-stubs-1.x`
- Merge into: `main`
- Upgrade [boto3-stubs](https://togithub.com/youtype/mypy_boto3_builder) to `1.35.2`

**Update dependency fastapi to >=0.112,<0.113**

- Schedule: ["at any time"]
- Branch name: `renovate/fastapi-0.x`
- Merge into: `main`
- Upgrade [fastapi](https://togithub.com/fastapi/fastapi) to `>=0.112,<0.113`

**Update dependency httpx to ^0.27.0**

- Schedule: ["at any time"]
- Branch name: `renovate/httpx-0.x`
- Merge into: `main`
- Upgrade [httpx](https://togithub.com/encode/httpx) to `^0.27.0`

**Update dependency pretty-quick to v3.3.1**

- Schedule: ["at any time"]
- Branch name: `renovate/pretty-quick-3.x-lockfile`
- Merge into: `main`
- Upgrade [pretty-quick](https://togithub.com/prettier/pretty-quick) to `3.3.1`

**Update dependency redocusaurus to v2.1.1**

- Schedule: ["at any time"]
- Branch name: `renovate/redocusaurus-2.x-lockfile`
- Merge into: `main`
- Upgrade [redocusaurus](https://togithub.com/rohit-gohri/redocusaurus) to `2.1.1`

**Update dependency ruff to v0.6.1**

- Schedule: ["at any time"]
- Branch name: `renovate/ruff-0.x`
- Merge into: `main`
- Upgrade [ruff](https://togithub.com/astral-sh/ruff) to `>=0.6.1,<0.7.0`
- Upgrade [ruff](https://togithub.com/astral-sh/ruff) to `^0.6.0`

**Update dependency typescript to v5.5.4**

- Schedule: ["at any time"]
- Branch name: `renovate/typescript-5.x-lockfile`
- Merge into: `main`
- Upgrade [typescript](https://togithub.com/Microsoft/TypeScript) to `5.5.4`

**Update docusaurus monorepo to v3.5.2**

- Schedule: ["at any time"]
- Branch name: `renovate/docusaurus-monorepo`
- Merge into: `main`
- Upgrade [@docusaurus/core](https://togithub.com/facebook/docusaurus) to `3.5.2`
- Upgrade [@docusaurus/module-type-aliases](https://togithub.com/facebook/docusaurus) to `3.5.2`
- Upgrade [@docusaurus/plugin-client-redirects](https://togithub.com/facebook/docusaurus) to `3.5.2`
- Upgrade [@docusaurus/plugin-content-blog](https://togithub.com/facebook/docusaurus) to `3.5.2`
- Upgrade [@docusaurus/plugin-ideal-image](https://togithub.com/facebook/docusaurus) to `3.5.2`
- Upgrade [@docusaurus/preset-classic](https://togithub.com/facebook/docusaurus) to `3.5.2`
- Upgrade [@docusaurus/theme-common](https://togithub.com/facebook/docusaurus) to `3.5.2`
- Upgrade [@docusaurus/theme-live-codeblock](https://togithub.com/facebook/docusaurus) to `3.5.2`
- Upgrade [@docusaurus/tsconfig](https://togithub.com/facebook/docusaurus) to `3.5.2`
- Upgrade [@docusaurus/types](https://togithub.com/facebook/docusaurus) to `3.5.2`

**Update python Docker tag to v3.12**

- Schedule: ["at any time"]
- Branch name: `renovate/python-3.x`
- Merge into: `main`
- Upgrade python to `3.12`
- Upgrade python to `3.12-slim-bookworm`

**Update react monorepo to v18.3.1**

- Schedule: ["at any time"]
- Branch name: `renovate/react-monorepo`
- Merge into: `main`
- Upgrade [react](https://togithub.com/facebook/react) to `18.3.1`
- Upgrade [react-dom](https://togithub.com/facebook/react) to `18.3.1`

**Update dependency aioboto3 to v13**

- Schedule: ["at any time"]
- Branch name: `renovate/aioboto3-13.x`
- Merge into: `main`
- Upgrade [aioboto3](https://togithub.com/terrycain/aioboto3) to `^13.0.0`

**Update dependency aiofiles to v24**

- Schedule: ["at any time"]
- Branch name: `renovate/aiofiles-24.x`
- Merge into: `main`
- Upgrade [aiofiles](https://togithub.com/Tinche/aiofiles#history) to `^24.0.0`

**Update dependency black to v24**

- Schedule: ["at any time"]
- Branch name: `renovate/black-24.x`
- Merge into: `main`
- Upgrade [black](https://togithub.com/psf/black) to `^24.0.0`

**Update dependency clsx to v2**

- Schedule: ["at any time"]
- Branch name: `renovate/clsx-2.x`
- Merge into: `main`
- Upgrade [clsx](https://togithub.com/lukeed/clsx) to `^2.0.0`

**Update dependency pip to v24**

- Schedule: ["at any time"]
- Branch name: `renovate/pip-24.x-lockfile`
- Merge into: `main`
- Upgrade [pip](https://togithub.com/pypa/pip) to `24.2`

**Update dependency prettier to v3**

- Schedule: ["at any time"]
- Branch name: `renovate/prettier-3.x`
- Merge into: `main`
- Upgrade [prettier](https://togithub.com/prettier/prettier) to `^3.0.0`

**Update dependency pretty-quick to v4**

- Schedule: ["at any time"]
- Branch name: `renovate/pretty-quick-4.x`
- Merge into: `main`
- Upgrade [pretty-quick](https://togithub.com/prettier/pretty-quick) to `^4.0.0`

**Update dependency pydantic to v2**

- Schedule: ["at any time"]
- Branch name: `renovate/pydantic-2.x`
- Merge into: `main`
- Upgrade [pydantic](https://togithub.com/pydantic/pydantic) to `^2.0.0`

**Update dependency pylint to v3**

- Schedule: ["at any time"]
- Branch name: `renovate/pylint-3.x`
- Merge into: `main`
- Upgrade [pylint](https://togithub.com/pylint-dev/pylint) to `^3.0.0`

**Update dependency pytest to v8**

- Schedule: ["at any time"]
- Branch name: `renovate/pytest-8.x`
- Merge into: `main`
- Upgrade [pytest](https://togithub.com/pytest-dev/pytest) to `^8.0.0`

**Update dependency python-gitlab to v4**

- Schedule: ["at any time"]
- Branch name: `renovate/python-gitlab-4.x`
- Merge into: `main`
- Upgrade [python-gitlab](https://togithub.com/python-gitlab/python-gitlab) to `^4.0.0`

**Update dependency remark-math to v6**

- Schedule: ["at any time"]
- Branch name: `renovate/major-remark`
- Merge into: `main`
- Upgrade [remark-math](https://togithub.com/remarkjs/remark-math) to `^6.0.0`

**Update dependency towncrier to v24**

- Schedule: ["at any time"]
- Branch name: `renovate/towncrier-24.x`
- Merge into: `main`
- Upgrade [towncrier](https://togithub.com/twisted/towncrier) to `^24.0.0`

**Update dependency types-aioboto3 to v13**

- Schedule: ["at any time"]
- Branch name: `renovate/types-aioboto3-13.x`
- Merge into: `main`
- Upgrade [types-aioboto3](https://togithub.com/youtype/mypy_boto3_builder) to `^13.0.0`

**Update tj-actions/changed-files action to v45**

- Schedule: ["at any time"]
- Branch name: `renovate/tj-actions-changed-files-45.x`
- Merge into: `main`
- Upgrade [tj-actions/changed-files](https://togithub.com/tj-actions/changed-files) to `v45.0.0`
- Upgrade [tj-actions/changed-files](https://togithub.com/tj-actions/changed-files) to `v45`
🚸 Branch creation will be limited to maximum 2 per hour, so it doesn't swamp any CI resources or overwhelm the project. See docs for `prhourlylimit` for details. --- ❓ Got questions? Check out Renovate's [Docs](https://docs.renovatebot.com/), particularly the Getting Started section. If you need any further assistance then you can also [request help here](https://togithub.com/renovatebot/renovate/discussions). --- This PR was generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View the [repository job log](https://developer.mend.io/github/port-labs/ocean). Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- renovate.json | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 renovate.json diff --git a/renovate.json b/renovate.json new file mode 100644 index 0000000000..5db72dd6a9 --- /dev/null +++ b/renovate.json @@ -0,0 +1,6 @@ +{ + "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "extends": [ + "config:recommended" + ] +} From d54b97b9ebd1b187fdf8c8ae337e569c14cf0788 Mon Sep 17 00:00:00 2001 From: Shalev Avhar <51760613+shalev007@users.noreply.github.com> Date: Wed, 21 Aug 2024 18:37:47 +0300 Subject: [PATCH 3/4] [Core] ocean core next resync (#835) # Description **What:** - Integrated functionality to send the state of the integration on each sync. - Implemented prediction of the next sync date. **Why:** - To ensure that the integration state is consistently updated and monitored. - To enhance the accuracy and reliability of the next sync date prediction by utilizing server data for SaaS applications, rather than relying solely on environment variables. **How:** - Updated the sync logic to include the current integration state in the payload sent to our monitoring system. - Modified the sync prediction mechanism for SaaS applications to use data from our servers, providing more accurate and context-aware predictions. 
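To make the prediction step concrete, here is a minimal sketch of the interval arithmetic described above. It mirrors the `get_next_occurrence` helper this patch adds in `port_ocean/utils/time.py`; the `predict_next_resync` name is illustrative, and `start_time` is assumed to be timezone-aware:

```python
import datetime


def predict_next_resync(
    interval_seconds: int, start_time: datetime.datetime
) -> datetime.datetime:
    # Count the full intervals that have elapsed since the configured start,
    # then step one interval further to land on the next future occurrence.
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    intervals_passed = int((now - start_time).total_seconds() // interval_seconds)
    return start_time + datetime.timedelta(
        seconds=(intervals_passed + 1) * interval_seconds
    )
```

For SaaS installations the interval and start time come from the server-side integration config (the scheduled resync interval and the config's `updatedAt` timestamp), so the predicted date stays stable across restarts instead of drifting with each boot.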
## Type of change Please leave one option from the following and delete the rest: - [ ] Bug fix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation) --------- Co-authored-by: Shalev Avhar Co-authored-by: Tom Tankilevitch <59158507+Tankilevitch@users.noreply.github.com> --- CHANGELOG.md | 7 ++ port_ocean/clients/port/client.py | 17 ++++ port_ocean/config/settings.py | 6 +- port_ocean/core/event_listener/base.py | 37 +++++++ port_ocean/core/event_listener/http.py | 2 +- port_ocean/core/event_listener/kafka.py | 2 +- port_ocean/core/event_listener/once.py | 97 ++++++++++++++++++- port_ocean/core/event_listener/polling.py | 24 +++-- .../handlers/resync_state_updater/__init__.py | 5 + .../handlers/resync_state_updater/updater.py | 84 ++++++++++++++++ port_ocean/core/models.py | 7 +- port_ocean/core/utils.py | 5 +- port_ocean/ocean.py | 32 ++++-- port_ocean/utils/misc.py | 8 +- port_ocean/utils/time.py | 54 +++++++++++ pyproject.toml | 2 +- 16 files changed, 360 insertions(+), 29 deletions(-) create mode 100644 port_ocean/core/handlers/resync_state_updater/__init__.py create mode 100644 port_ocean/core/handlers/resync_state_updater/updater.py create mode 100644 port_ocean/utils/time.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 5926a9c8af..d640ceabea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.10.0 (2024-08-19) + +### Improvements + +- Add support for reporting the integration resync state to expose more information about the integration state in the portal + + ## 0.9.14 (2024-08-19) diff --git a/port_ocean/clients/port/client.py b/port_ocean/clients/port/client.py index 1a805bfd79..2b323f139a 100644 --- a/port_ocean/clients/port/client.py +++ b/port_ocean/clients/port/client.py @@ -13,6 +13,7 @@ get_internal_http_client, ) from port_ocean.exceptions.clients import KafkaCredentialsNotFound +from typing import Any class PortClient( @@ -75,3 +76,19 @@ async def get_org_id(self) -> str: handle_status_code(response) return response.json()["organization"]["id"] + + async def update_integration_state( + self, state: dict[str, Any], should_raise: bool = True, should_log: bool = True + ) -> dict[str, Any]: + if should_log: + logger.debug(f"Updating integration resync state with: {state}") + response = await self.client.patch( + f"{self.api_url}/integration/{self.integration_identifier}/resync-state", + headers=await self.auth.headers(), + json=state, + ) + handle_status_code(response, should_raise, should_log) + if response.is_success and should_log: + logger.info("Integration resync state updated successfully") + + return response.json().get("integration", {}) diff --git a/port_ocean/config/settings.py b/port_ocean/config/settings.py index 1955ed7364..1de4829211 100644 --- a/port_ocean/config/settings.py +++ b/port_ocean/config/settings.py @@ -76,7 +76,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow): integration: IntegrationSettings = Field( default_factory=lambda: IntegrationSettings(type="", identifier="") ) - runtime: Runtime = "OnPrem" + runtime: Runtime = Runtime.OnPrem @root_validator() def 
validate_integration_config(cls, values: dict[str, Any]) -> dict[str, Any]: @@ -100,8 +100,8 @@ def parse_config(model: Type[BaseModel], config: Any) -> BaseModel: return values @validator("runtime") - def validate_runtime(cls, runtime: Literal["OnPrem", "Saas"]) -> Runtime: - if runtime == "Saas": + def validate_runtime(cls, runtime: Runtime) -> Runtime: + if runtime == Runtime.Saas: spec = get_spec_file() if spec is None: raise ValueError( diff --git a/port_ocean/core/event_listener/base.py b/port_ocean/core/event_listener/base.py index cf3934764e..04c43d87f1 100644 --- a/port_ocean/core/event_listener/base.py +++ b/port_ocean/core/event_listener/base.py @@ -5,6 +5,8 @@ from port_ocean.config.base import BaseOceanModel from port_ocean.utils.signal import signal_handler +from port_ocean.context.ocean import ocean +from port_ocean.utils.misc import IntegrationStateStatus class EventListenerEvents(TypedDict): @@ -36,6 +38,41 @@ def _stop(self) -> None: """ pass + async def _before_resync(self) -> None: + """ + Can be used for event listeners that need to perform some action before resync. + """ + await ocean.app.resync_state_updater.update_before_resync() + + async def _after_resync(self) -> None: + """ + Can be used for event listeners that need to perform some action after resync. + """ + await ocean.app.resync_state_updater.update_after_resync() + + async def _on_resync_failure(self, e: Exception) -> None: + """ + Can be used for event listeners that need to handle resync failures. + """ + await ocean.app.resync_state_updater.update_after_resync( + IntegrationStateStatus.Failed + ) + + async def _resync( + self, + resync_args: dict[Any, Any], + ) -> None: + """ + Triggers the "on_resync" event. + """ + await self._before_resync() + try: + await self.events["on_resync"](resync_args) + await self._after_resync() + except Exception as e: + await self._on_resync_failure(e) + raise e + class EventListenerSettings(BaseOceanModel, extra=Extra.allow): type: str diff --git a/port_ocean/core/event_listener/http.py b/port_ocean/core/event_listener/http.py index 69083daa00..ca9bd4a3cb 100644 --- a/port_ocean/core/event_listener/http.py +++ b/port_ocean/core/event_listener/http.py @@ -64,6 +64,6 @@ async def _start(self) -> None: @target_channel_router.post("/resync") async def resync() -> None: - await self.events["on_resync"]({}) + await self._resync({}) ocean.app.fast_api_app.include_router(target_channel_router) diff --git a/port_ocean/core/event_listener/kafka.py b/port_ocean/core/event_listener/kafka.py index ba93915266..f9c749e767 100644 --- a/port_ocean/core/event_listener/kafka.py +++ b/port_ocean/core/event_listener/kafka.py @@ -122,7 +122,7 @@ async def _handle_message(self, raw_msg: Message) -> None: if "change.log" in topic and message is not None: try: - await self.events["on_resync"](message) + await self._resync(message) except Exception as e: _type, _, tb = sys.exc_info() logger.opt(exception=(_type, None, tb)).error( diff --git a/port_ocean/core/event_listener/once.py b/port_ocean/core/event_listener/once.py index 15154a4c35..9952b19f32 100644 --- a/port_ocean/core/event_listener/once.py +++ b/port_ocean/core/event_listener/once.py @@ -1,3 +1,4 @@ +import datetime import signal from typing import Literal, Any @@ -9,6 +10,9 @@ EventListenerSettings, ) from port_ocean.utils.repeat import repeat_every +from port_ocean.context.ocean import ocean +from port_ocean.utils.time import convert_str_to_utc_datetime, convert_to_minutes +from port_ocean.utils.misc import IntegrationStateStatus 
class OnceEventListenerSettings(EventListenerSettings): @@ -41,6 +45,97 @@ def __init__( ): super().__init__(events) self.event_listener_config = event_listener_config + self.cached_integration: dict[str, Any] | None = None + + async def get_current_integration_cached(self) -> dict[str, Any]: + if self.cached_integration: + return self.cached_integration + + self.cached_integration = await ocean.port_client.get_current_integration() + return self.cached_integration + + async def get_saas_resync_initialization_and_interval( + self, + ) -> tuple[int | None, datetime.datetime | None]: + """ + Get the scheduled resync interval and the last updated time of the integration config for the saas application. + interval is the saas configured resync interval time. + start_time is the last updated time of the integration config. + return: (interval, start_time) + """ + if not ocean.app.is_saas(): + return (None, None) + + try: + integration = await self.get_current_integration_cached() + except Exception as e: + logger.exception(f"Error occurred while getting current integration {e}") + return (None, None) + + interval_str = ( + integration.get("spec", {}) + .get("appSpec", {}) + .get("scheduledResyncInterval") + ) + + if not interval_str: + logger.error( + "Unexpected: scheduledResyncInterval not found for Saas integration, Cannot predict the next resync" + ) + return (None, None) + + last_updated_saas_integration_config_str = integration.get( + "statusInfo", {} + ).get("updatedAt") + + # we use the last updated time of the integration config as the start time since in saas application the interval is configured by the user from the portal + if not last_updated_saas_integration_config_str: + logger.error( + "Unexpected: updatedAt not found for Saas integration, Cannot predict the next resync" + ) + return (None, None) + + return ( + convert_to_minutes(interval_str), + convert_str_to_utc_datetime(last_updated_saas_integration_config_str), + ) + + async def _before_resync(self) -> None: + if not ocean.app.is_saas(): + # in case of non-saas, we still want to update the state before and after the resync + await super()._before_resync() + return + + (interval, start_time) = ( + await self.get_saas_resync_initialization_and_interval() + ) + await ocean.app.resync_state_updater.update_before_resync(interval, start_time) + + async def _after_resync(self) -> None: + if not ocean.app.is_saas(): + # in case of non-saas, we still want to update the state before and after the resync + await super()._after_resync() + return + + (interval, start_time) = ( + await self.get_saas_resync_initialization_and_interval() + ) + await ocean.app.resync_state_updater.update_after_resync( + IntegrationStateStatus.Completed, interval, start_time + ) + + async def _on_resync_failure(self, e: Exception) -> None: + if not ocean.app.is_saas(): + # in case of non-saas, we still want to update the state before and after the resync + await super()._after_resync() + return + + (interval, start_time) = ( + await self.get_saas_resync_initialization_and_interval() + ) + await ocean.app.resync_state_updater.update_after_resync( + IntegrationStateStatus.Failed, interval, start_time + ) async def _start(self) -> None: """ @@ -53,7 +148,7 @@ async def _start(self) -> None: async def resync_and_exit() -> None: logger.info("Once event listener started") try: - await self.events["on_resync"]({}) + await self._resync({}) except Exception: # we catch all exceptions here to make sure the application will exit gracefully logger.exception("Error 
occurred while resyncing") diff --git a/port_ocean/core/event_listener/polling.py b/port_ocean/core/event_listener/polling.py index 867c1aa439..c7fef4b8e0 100644 --- a/port_ocean/core/event_listener/polling.py +++ b/port_ocean/core/event_listener/polling.py @@ -49,7 +49,16 @@ def __init__( ): super().__init__(events) self.event_listener_config = event_listener_config - self._last_updated_at = None + + def should_resync(self, last_updated_at: str) -> bool: + _last_updated_at = ( + ocean.app.resync_state_updater.last_integration_state_updated_at + ) + + if _last_updated_at is None: + return self.event_listener_config.resync_on_start + + return _last_updated_at != last_updated_at async def _start(self) -> None: """ @@ -69,17 +78,12 @@ async def resync() -> None: integration = await ocean.app.port_client.get_current_integration() last_updated_at = integration["updatedAt"] - should_resync = ( - self._last_updated_at is not None - or self.event_listener_config.resync_on_start - ) and self._last_updated_at != last_updated_at - - if should_resync: + if self.should_resync(last_updated_at): logger.info("Detected change in integration, resyncing") - self._last_updated_at = last_updated_at - running_task: Task[Any] = get_event_loop().create_task( - self.events["on_resync"]({}) # type: ignore + ocean.app.resync_state_updater.last_integration_state_updated_at = ( + last_updated_at ) + running_task: Task[Any] = get_event_loop().create_task(self._resync({})) signal_handler.register(running_task.cancel) await running_task diff --git a/port_ocean/core/handlers/resync_state_updater/__init__.py b/port_ocean/core/handlers/resync_state_updater/__init__.py new file mode 100644 index 0000000000..162ff8a61b --- /dev/null +++ b/port_ocean/core/handlers/resync_state_updater/__init__.py @@ -0,0 +1,5 @@ +from .updater import ResyncStateUpdater + +__all__ = [ + "ResyncStateUpdater", +] diff --git a/port_ocean/core/handlers/resync_state_updater/updater.py b/port_ocean/core/handlers/resync_state_updater/updater.py new file mode 100644 index 0000000000..0e4c8b011c --- /dev/null +++ b/port_ocean/core/handlers/resync_state_updater/updater.py @@ -0,0 +1,84 @@ +import datetime +from typing import Any, Literal +from port_ocean.clients.port.client import PortClient +from port_ocean.utils.misc import IntegrationStateStatus +from port_ocean.utils.time import get_next_occurrence + + +class ResyncStateUpdater: + def __init__(self, port_client: PortClient, scheduled_resync_interval: int | None): + self.port_client = port_client + self.initiated_at = datetime.datetime.now(tz=datetime.timezone.utc) + self.scheduled_resync_interval = scheduled_resync_interval + + # This is used to differ between integration changes that require a full resync and state changes + # So that the polling event-listener can decide whether to perform a full resync or not + # TODO: remove this once we separate the state from the integration + self.last_integration_state_updated_at: str = "" + + def _calculate_next_scheduled_resync( + self, + interval: int | None = None, + custom_start_time: datetime.datetime | None = None, + ) -> str | None: + if interval is None: + return None + return get_next_occurrence( + interval * 60, custom_start_time or self.initiated_at + ).isoformat() + + async def update_before_resync( + self, + interval: int | None = None, + custom_start_time: datetime.datetime | None = None, + ) -> None: + _interval = interval or self.scheduled_resync_interval + nest_resync = self._calculate_next_scheduled_resync( + _interval, custom_start_time + 
) + state: dict[str, Any] = { + "status": IntegrationStateStatus.Running.value, + "lastResyncEnd": None, + "lastResyncStart": datetime.datetime.now( + tz=datetime.timezone.utc + ).isoformat(), + "nextResync": nest_resync, + "intervalInMinuets": _interval, + } + + integration = await self.port_client.update_integration_state( + state, should_raise=False + ) + if integration: + self.last_integration_state_updated_at = integration["resyncState"][ + "updatedAt" + ] + + async def update_after_resync( + self, + status: Literal[ + IntegrationStateStatus.Completed, IntegrationStateStatus.Failed + ] = IntegrationStateStatus.Completed, + interval: int | None = None, + custom_start_time: datetime.datetime | None = None, + ) -> None: + _interval = interval or self.scheduled_resync_interval + nest_resync = self._calculate_next_scheduled_resync( + _interval, custom_start_time + ) + state: dict[str, Any] = { + "status": status.value, + "lastResyncEnd": datetime.datetime.now( + tz=datetime.timezone.utc + ).isoformat(), + "nextResync": nest_resync, + "intervalInMinuets": _interval, + } + + integration = await self.port_client.update_integration_state( + state, should_raise=False + ) + if integration: + self.last_integration_state_updated_at = integration["resyncState"][ + "updatedAt" + ] diff --git a/port_ocean/core/models.py b/port_ocean/core/models.py index 9535fa6c9d..8da040c912 100644 --- a/port_ocean/core/models.py +++ b/port_ocean/core/models.py @@ -1,11 +1,14 @@ from dataclasses import dataclass, field -from typing import Any, Literal +from enum import Enum +from typing import Any from pydantic import BaseModel from pydantic.fields import Field -Runtime = Literal["OnPrem", "Saas"] +class Runtime(Enum): + Saas = "Saas" + OnPrem = "OnPrem" class Entity(BaseModel): diff --git a/port_ocean/core/utils.py b/port_ocean/core/utils.py index ce26318d97..4adc576412 100644 --- a/port_ocean/core/utils.py +++ b/port_ocean/core/utils.py @@ -35,12 +35,13 @@ def is_same_entity(first_entity: Entity, second_entity: Entity) -> bool: async def validate_integration_runtime( - port_client: PortClient, requested_runtime: Runtime + port_client: PortClient, + requested_runtime: Runtime, ) -> None: logger.debug("Validating integration runtime") current_integration = await port_client.get_current_integration(should_raise=False) current_runtime = current_integration.get("installationType", "OnPrem") - if current_integration and current_runtime != requested_runtime: + if current_integration and current_runtime != requested_runtime.value: raise IntegrationRuntimeException( f"Invalid Runtime! Requested to run existing {current_runtime} integration in {requested_runtime} runtime." 
) diff --git a/port_ocean/ocean.py b/port_ocean/ocean.py index 6c157a7b89..3c6827552a 100644 --- a/port_ocean/ocean.py +++ b/port_ocean/ocean.py @@ -9,6 +9,8 @@ from pydantic import BaseModel from starlette.types import Scope, Receive, Send +from port_ocean.core.handlers.resync_state_updater import ResyncStateUpdater +from port_ocean.core.models import Runtime from port_ocean.clients.port.client import PortClient from port_ocean.config.settings import ( IntegrationConfiguration, @@ -24,6 +26,7 @@ from port_ocean.utils.repeat import repeat_every from port_ocean.utils.signal import signal_handler from port_ocean.version import __integration_version__ +from port_ocean.utils.misc import IntegrationStateStatus class Ocean: @@ -63,16 +66,27 @@ def __init__( integration_class(ocean) if integration_class else BaseIntegration(ocean) ) + self.resync_state_updater = ResyncStateUpdater( + self.port_client, self.config.scheduled_resync_interval + ) + + def is_saas(self) -> bool: + return self.config.runtime == Runtime.Saas + async def _setup_scheduled_resync( self, ) -> None: - def execute_resync_all() -> None: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - + async def execute_resync_all() -> None: + await self.resync_state_updater.update_before_resync() logger.info("Starting a new scheduled resync") - loop.run_until_complete(self.integration.sync_raw_all()) - loop.close() + try: + await self.integration.sync_raw_all() + await self.resync_state_updater.update_after_resync() + except Exception as e: + await self.resync_state_updater.update_after_resync( + IntegrationStateStatus.Failed + ) + raise e interval = self.config.scheduled_resync_interval if interval is not None: @@ -83,7 +97,11 @@ def execute_resync_all() -> None: seconds=interval * 60, # Not running the resync immediately because the event listener should run resync on startup wait_first=True, - )(lambda: threading.Thread(target=execute_resync_all).start()) + )( + lambda: threading.Thread( + target=lambda: asyncio.run(execute_resync_all()) + ).start() + ) await repeated_function() async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: diff --git a/port_ocean/utils/misc.py b/port_ocean/utils/misc.py index c21cccc577..59029144b1 100644 --- a/port_ocean/utils/misc.py +++ b/port_ocean/utils/misc.py @@ -1,3 +1,4 @@ +from enum import Enum import inspect from importlib.util import spec_from_file_location, module_from_spec from pathlib import Path @@ -5,11 +6,16 @@ from types import ModuleType from typing import Callable, Any from uuid import uuid4 - import tomli import yaml +class IntegrationStateStatus(Enum): + Running = "running" + Failed = "failed" + Completed = "completed" + + def get_time(seconds_precision: bool = True) -> float: """Return current time as Unix/Epoch timestamp, in seconds. :param seconds_precision: if True, return with seconds precision as integer (default). diff --git a/port_ocean/utils/time.py b/port_ocean/utils/time.py new file mode 100644 index 0000000000..4b3553aeb4 --- /dev/null +++ b/port_ocean/utils/time.py @@ -0,0 +1,54 @@ +import datetime +from loguru import logger + + +def convert_str_to_utc_datetime(time_str: str) -> datetime.datetime | None: + """ + Convert a string representing time to a datetime object. 
+    :param time_str: a string representing time in the format "2021-09-01T12:00:00Z"
+    """
+    aware_date = datetime.datetime.fromisoformat(time_str)
+    if time_str.endswith("Z"):
+        aware_date = datetime.datetime.fromisoformat(time_str.replace("Z", "+00:00"))
+    return aware_date.astimezone(datetime.timezone.utc)
+
+
+def convert_to_minutes(s: str) -> int:
+    minutes_per_unit = {"s": 1 / 60, "m": 1, "h": 60, "d": 1440, "w": 10080}
+    try:
+        return int(int(s[:-1]) * minutes_per_unit[s[-1]])
+    except Exception:
+        logger.error(f"Failed converting string to minutes, {s}")
+        raise ValueError(
+            f"Invalid format. Expected a string ending with {minutes_per_unit.keys()}"
+        )
+
+
+def get_next_occurrence(
+    interval_seconds: int,
+    start_time: datetime.datetime,
+    now: datetime.datetime | None = None,
+) -> datetime.datetime:
+    """
+    Predict the next occurrence of an event based on interval, start time, and current time.
+
+    :param interval_seconds: Interval between occurrences in seconds.
+    :param start_time: Start time of the event as a datetime object.
+    :param now: Current time as a datetime object.
+    :return: The next occurrence time as a datetime object.
+    """
+
+    if now is None:
+        now = datetime.datetime.now(tz=datetime.timezone.utc)
+    # Calculate the total seconds elapsed since the start time
+    elapsed_seconds = (now - start_time).total_seconds()
+
+    # Calculate the number of intervals that have passed
+    intervals_passed = int(elapsed_seconds // interval_seconds)
+
+    # Calculate the next occurrence time
+    next_occurrence = start_time + datetime.timedelta(
+        seconds=(intervals_passed + 1) * interval_seconds
+    )
+
+    return next_occurrence
diff --git a/pyproject.toml b/pyproject.toml
index 638b265ea1..c0bf6bab0f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "port-ocean"
-version = "0.9.14"
+version = "0.10.0"
 description = "Port Ocean is a CLI tool for managing your Port projects."
 readme = "README.md"
 homepage = "https://app.getport.io"

From 750dbe94b7ce5fd0e0b4a3f2ac2ed278625bd66c Mon Sep 17 00:00:00 2001
From: Shalev Avhar <51760613+shalev007@users.noreply.github.com>
Date: Wed, 21 Aug 2024 21:00:38 +0300
Subject: [PATCH 4/4] fix resync-state kafka listener bug (#931)

- **fix: kafka listener constant resync loop**
- **feat: bump version**

# Description

*Bugfix*

```diff
! What
- Fix kafka listener never ending loop of resyncs bug
- Why
- This bug was created when we started to update the resyncState along with integration data,
- therefore each update would create a new audit-log change which triggered the Kafka listener to start a new resync
+ How
- we validate that it was not an update created by the resync-state by comparing them
```

## Type of change

Please leave one option from the following and delete the rest:

- [X] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not change current behavior)
- [ ] Documentation (added/updated documentation)

## Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.
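For clarity, the guard this patch adds boils down to a single comparison; the sketch below is illustrative (the field names follow the `kafka.py` hunk that follows, while the helper name is made up for this example):

```python
from typing import Any


def is_resync_state_only_update(after: dict[str, Any]) -> bool:
    # A changelog event whose integration "updatedAt" equals the nested
    # resyncState "updatedAt" was produced by the resync-state report itself,
    # so the listener should skip it rather than start another resync.
    return after.get("updatedAt") == after.get("resyncState", {}).get("updatedAt")
```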
--------- Co-authored-by: Shalev Avhar --- CHANGELOG.md | 1 + port_ocean/core/event_listener/kafka.py | 10 +++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d640ceabea..380c41d6bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Improvements - Add support for reporting the integration resync state to expose more information about the integration state in the portal +- Fix kafka listener never ending resync loop due to resyncState updates ## 0.9.14 (2024-08-19) diff --git a/port_ocean/core/event_listener/kafka.py b/port_ocean/core/event_listener/kafka.py index f9c749e767..9efb7c9874 100644 --- a/port_ocean/core/event_listener/kafka.py +++ b/port_ocean/core/event_listener/kafka.py @@ -99,9 +99,13 @@ def _should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool: return False integration_identifier = after.get("identifier") - if integration_identifier == self.integration_identifier and ( - "change.log" in topic - ): + if integration_identifier != self.integration_identifier: + return False + + if after.get("updatedAt") == after.get("resyncState", {}).get("updatedAt"): + return False + + if "change.log" in topic: return msg_value.get("changelogDestination", {}).get("type", "") == "KAFKA" return False