From 1c555cfca7c1fc0e97c1239912715c8e3713fcc6 Mon Sep 17 00:00:00 2001 From: Julien Pinchelimouroux Date: Mon, 28 Oct 2024 11:31:06 +0100 Subject: [PATCH] feat: add pagination configs for HTTP API Connector (TCTC-9227) (#1794) * feat: add pagination configs for HTTP API Connector * feat: add pagination type for front configuration * feat: add hyper_media pagination, options to page based pagination and apply review remarks * test: add tests for better coverage * fix: set pagination config base class as full abstract * fix: add kind const to pagination configs and hide Noop config to end-users * fix: mark pagination config kind as required * fix: remove ui hidden option and default value from kind attribute * feat(HttpAPI): use discriminator field for paginationsConfigs * feat: move pagination config from connector to data source * feat(HttpAPI): re-hide kind field from the UI * doc: add pagination config feature to changelog --------- Co-authored-by: David Nowinsky --- CHANGELOG.md | 4 + tests/http_api/test_http_api.py | 314 +++++++++++++++++- toucan_connectors/common.py | 12 +- .../http_api/http_api_connector.py | 163 +++++---- .../http_api/http_api_data_source.py | 97 ++++++ .../http_api/pagination_configs.py | 206 ++++++++++++ .../snowflake/snowflake_connector.py | 11 +- toucan_connectors/toucan_connector.py | 7 +- 8 files changed, 712 insertions(+), 102 deletions(-) create mode 100644 toucan_connectors/http_api/http_api_data_source.py create mode 100644 toucan_connectors/http_api/pagination_configs.py diff --git a/CHANGELOG.md b/CHANGELOG.md index dc67a496a..dc8c78d21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Changed + +- HTTP API: Add a `PaginationConfig` to `HttpAPIDataSource` in order to handle API pagination and fetch all data. It supports the following kinds of pagination: page-based, cursor-based, offset-limit and hypermedia. 
+ ## [7.0.3] 2024-10-04 ### Fix diff --git a/tests/http_api/test_http_api.py b/tests/http_api/test_http_api.py index ee1d34d8a..4e09c8b4f 100644 --- a/tests/http_api/test_http_api.py +++ b/tests/http_api/test_http_api.py @@ -6,7 +6,18 @@ from pytest_mock import MockFixture from toucan_connectors.common import transform_with_jq -from toucan_connectors.http_api.http_api_connector import Auth, HttpAPIConnector, HttpAPIDataSource +from toucan_connectors.http_api.http_api_connector import ( + Auth, + HttpAPIConnector, + HttpAPIConnectorError, + HttpAPIDataSource, +) +from toucan_connectors.http_api.pagination_configs import ( + CursorBasedPaginationConfig, + HyperMediaPaginationConfig, + OffsetLimitPaginationConfig, + PageBasedPaginationConfig, +) from toucan_connectors.json_wrapper import JsonWrapper @@ -49,6 +60,32 @@ def auth(): return Auth(type="basic", args=["username", "password"]) +@pytest.fixture(scope="function") +def offset_pagination() -> OffsetLimitPaginationConfig: + return OffsetLimitPaginationConfig( + kind="OffsetLimitPaginationConfig", offset_name="super_offset", limit_name="super_limit", limit=5 + ) + + +@pytest.fixture(scope="function") +def page_pagination() -> PageBasedPaginationConfig: + return PageBasedPaginationConfig( + kind="PageBasedPaginationConfig", page_name="my_page", per_page_name="my_per_page", per_page=2, page=1 + ) + + +@pytest.fixture(scope="function") +def cursor_pagination() -> CursorBasedPaginationConfig: + return CursorBasedPaginationConfig( + kind="CursorBasedPaginationConfig", cursor_name="my_cursor", cursor_filter=".metadata.next_cursor" + ) + + +@pytest.fixture(scope="function") +def hyper_media_pagination() -> HyperMediaPaginationConfig: + return HyperMediaPaginationConfig(kind="HyperMediaPaginationConfig", next_link_filter=".metadata.next_link") + + def test_transform_with_jq(): assert transform_with_jq(data=[1, 2, 3], jq_filter=".[]+1") == [2, 3, 4] assert transform_with_jq([[1, 2, 3]], ".[]") == [1, 2, 3] @@ -82,7 +119,278 
@@ def test_get_df_with_auth(connector, data_source, auth): @responses.activate -def test_get_df_with_parameters(connector, data_source, mocker): +def test_get_df_with_offset_pagination( + connector: HttpAPIConnector, data_source: HttpAPIDataSource, offset_pagination: OffsetLimitPaginationConfig +) -> None: + # first page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?super_offset=0&super_limit=5", + json=[{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}, {"a": 5}], + ) + + # second page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?super_offset=5&super_limit=5", + json=[ + {"a": 6}, + {"a": 7}, + {"a": 8}, + {"b": 9}, + {"b": 10}, + ], + ) + + # last page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?super_offset=10&super_limit=5", + json=[ + {"b": 11}, + {"b": 12}, + ], + ) + + data_source.http_pagination_config = offset_pagination + df = connector.get_df(data_source) + assert df.shape == (12, 2) + assert len(responses.calls) == 3 + + +@responses.activate +def test_get_df_with_page_pagination( + connector: HttpAPIConnector, data_source: HttpAPIDataSource, page_pagination: PageBasedPaginationConfig +) -> None: + page_pagination.max_page_filter = ".metadata.number_of_pages" + + # first page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?my_page=1&my_per_page=2", + json={ + "content": [ + {"a": 1}, + {"a": 2}, + ], + "metadata": {"number_of_pages": 2}, + }, + ) + + # next page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?my_page=2&my_per_page=2", + json={ + "content": [ + {"a": 3}, + {"a": 4}, + ], + "metadata": {"number_of_pages": 2}, + }, + ) + + data_source.filter = ".content" + data_source.http_pagination_config = page_pagination + df = connector.get_df(data_source) + assert df.shape == (4, 1) + assert len(responses.calls) == 2 + + +@responses.activate +def 
test_get_df_with_page_pagination_which_can_raise( + connector: HttpAPIConnector, data_source: HttpAPIDataSource, page_pagination: PageBasedPaginationConfig +) -> None: + page_pagination.can_raise_not_found = True + + # first page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?my_page=1&my_per_page=2", + json={ + "content": [ + {"a": 1}, + {"a": 2}, + ], + }, + ) + + # next page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?my_page=2&my_per_page=2", + json={ + "content": [ + {"a": 3}, + {"a": 4}, + ], + }, + ) + + # not found + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?my_page=3&my_per_page=2", + json={"error": "not found"}, + status=404, + ) + + data_source.filter = ".content" + data_source.http_pagination_config = page_pagination + df = connector.get_df(data_source) + assert df.shape == (4, 1) + assert len(responses.calls) == 3 + + +@responses.activate +def test_get_df_with_cursor_pagination( + connector: HttpAPIConnector, data_source: HttpAPIDataSource, cursor_pagination: CursorBasedPaginationConfig +) -> None: + # first page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments", + json={ + "content": [ + {"a": 1}, + {"a": 2}, + ], + "metadata": {"next_cursor": "super_cursor_22222", "number_of_results": 4}, + }, + ) + + # next page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?my_cursor=super_cursor_22222", + json={ + "content": [ + {"a": 3}, + {"a": 4}, + ], + "metadata": {"number_of_results": 4}, + }, + ) + data_source.http_pagination_config = cursor_pagination + data_source.filter = ".content" + df = connector.get_df(data_source) + assert df.shape == (4, 1) + assert len(responses.calls) == 2 + + +@responses.activate +def test_get_df_with_hyper_media_pagination( + connector: HttpAPIConnector, data_source: HttpAPIDataSource, hyper_media_pagination: HyperMediaPaginationConfig +) -> None: 
+ # first page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?custom=yes", + json={ + "content": [ + {"a": 1}, + {"a": 2}, + ], + "metadata": { + "next_link": "https://jsonplaceholder.typicode.com/comments/next_link?token=12341243&custom=yes", + "number_of_results": 4, + }, + }, + ) + + # next page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments/next_link?token=12341243&custom=yes", + json={ + "content": [ + {"a": 3}, + {"a": 4}, + ], + "metadata": {"number_of_results": 4}, + }, + ) + data_source.http_pagination_config = hyper_media_pagination + data_source.filter = ".content" + data_source.params = {"custom": "yes"} + df = connector.get_df(data_source) + assert df.shape == (4, 1) + assert len(responses.calls) == 2 + + +@responses.activate +def test_hyper_media_pagination_raise_if_bad_next_link( + connector: HttpAPIConnector, data_source: HttpAPIDataSource, hyper_media_pagination: HyperMediaPaginationConfig +) -> None: + # first page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments?custom=yes", + json={ + "content": [ + {"a": 1}, + {"a": 2}, + ], + "metadata": { + "next_link": {"real_link": "my_link"}, + "number_of_results": 4, + }, + }, + ) + + data_source.http_pagination_config = hyper_media_pagination + data_source.filter = ".content" + data_source.params = {"custom": "yes"} + with pytest.raises(ValueError) as exc: + connector.get_df(data_source) + assert str(exc.value) == ( + "Invalid next link value. 
Link can't be a complex value," " got: {'real_link': 'my_link'}" + ) + + +@responses.activate +def test_ignore_if_cant_parse_next_pagination_info( + connector: HttpAPIConnector, data_source: HttpAPIDataSource, hyper_media_pagination: HyperMediaPaginationConfig +) -> None: + # first page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments", + json=[ + {"a": 1}, + {"a": 2}, + ], + ) + + data_source.http_pagination_config = hyper_media_pagination # needs a 'metadata' field to retrieve the next link + # Ok even if 'metadata' is missing in the API response + df = connector.get_df(data_source) + assert df.shape == (2, 1) + assert len(responses.calls) == 1 + + +@responses.activate +def test_raises_http_error_on_too_many_requests(connector: HttpAPIConnector, data_source: HttpAPIDataSource) -> None: + # first page + responses.add( + responses.GET, + "https://jsonplaceholder.typicode.com/comments", + json=[ + {"a": 1}, + {"a": 2}, + ], + status=429, + ) + with pytest.raises(HttpAPIConnectorError) as exc: + connector.get_df(data_source) + assert str(exc.value) == ( + "Failed to retrieve data: the connector tried to perform too many requests." + " Please check your API call limitations." 
+     ) + + +@responses.activate +def test_get_df_with_parameters(connector, data_source):      data_source.parameters = {"first_name": "raphael"}      data_source.headers = {"name": "%(first_name)s"}  @@ -462,7 +770,7 @@ def test_get_cache_key(connector, auth, data_source):      data_source.parameters = {"first_name": "raphael"}      key = connector.get_cache_key(data_source) -    assert key == "f24af0b5-f745-3961-8aec-a27d44543fb9" +    assert key == "9ef95981-2aab-3f7f-89d1-b0a300d16f14"      data_source.headers = {"name": "{{ first_name }}"}  # change the templating style     key2 = connector.get_cache_key(data_source) diff --git a/toucan_connectors/common.py b/toucan_connectors/common.py index d3d4727c5..d27173368 100644 --- a/toucan_connectors/common.py +++ b/toucan_connectors/common.py @@ -259,11 +259,15 @@ def transform_with_jq(data: object, jq_filter: str) -> list:      return data   +FilterSchemaDescription: str = ( +    "You can apply filters to json response if data is nested. As we rely on a " +    "library called jq, we suggest you refer to the dedicated " +    'documentation' +) + FilterSchema = Field(     ".",     -    description="You can apply filters to json response if data is nested. 
As we rely on a " - "library called jq, we suggest the refer to the dedicated " - 'documentation', + description=FilterSchemaDescription, ) XpathSchema = Field( @@ -273,6 +277,8 @@ def transform_with_jq(data: object, jq_filter: str) -> list: 'documentation', ) +UI_HIDDEN: dict[str, Any] = {"ui.hidden": True} + def get_loop(): """Sets up event loop""" diff --git a/toucan_connectors/http_api/http_api_connector.py b/toucan_connectors/http_api/http_api_connector.py index a6dc49c54..586e7a94e 100644 --- a/toucan_connectors/http_api/http_api_connector.py +++ b/toucan_connectors/http_api/http_api_connector.py @@ -5,7 +5,13 @@ from xml.etree.ElementTree import ParseError, fromstring, tostring from pydantic import AnyHttpUrl, BaseModel, Field, FilePath -from pydantic.json_schema import DEFAULT_REF_TEMPLATE, GenerateJsonSchema, JsonSchemaMode +from requests.exceptions import HTTPError + +from toucan_connectors.http_api.http_api_data_source import HttpAPIDataSource, apply_pagination_to_data_source +from toucan_connectors.http_api.pagination_configs import ( + NoopPaginationConfig, + extract_pagination_info_from_result, +) try: import pandas as pd @@ -19,26 +25,29 @@ from toucan_connectors.auth import Auth from toucan_connectors.common import ( - FilterSchema, - XpathSchema, + HttpError, nosql_apply_parameters_to_query, transform_with_jq, ) from toucan_connectors.toucan_connector import ToucanConnector, ToucanDataSource from toucan_connectors.utils.json_to_table import json_to_table +TOO_MANY_REQUESTS = 429 + + +class HttpAPIConnectorError(Exception): + """Raised when an error occurs while fetching data from an HTTP API""" + + def __init__(self, message: str, original_exc: HttpError) -> None: + super().__init__(message) + self.original_exc = original_exc + class ResponseType(str, Enum): json = "json" xml = "xml" -class Method(str, Enum): - GET = "GET" - POST = "POST" - PUT = "PUT" - - class Template(BaseModel): headers: dict | None = Field( None, @@ -64,71 +73,6 @@ class 
Template(BaseModel): ) -class HttpAPIDataSource(ToucanDataSource): - url: str = Field( - ..., - title="Endpoint URL", - description="The URL path that will be appended to your baseroute URL. " 'For example "geo/countries"', - ) - method: Method = Field(Method.GET, title="HTTP Method") - headers: dict | None = Field( - None, - description="You can also setup headers in the Template section of your Connector see:
" - "https://docs.toucantoco.com/concepteur/tutorials/connectors/3-http-connector.html#template", - examples=['{ "content-type": "application/xml" }'], - ) - params: dict | None = Field( - None, - title="URL params", - description="JSON object of parameters to send in the query string of this HTTP request " - '(e.g. "offset" and "limit" in https://www/api-aseroute/data&offset=100&limit=50)', - examples=['{ "offset": 100, "limit": 50 }'], - ) - json_: dict | None = Field( - None, - alias="json", - title="Body", - description="JSON object of parameters to send in the body of every HTTP request", - examples=['{ "offset": 100, "limit": 50 }'], - ) - proxies: dict | None = Field( - None, - description="JSON object expressing a mapping of protocol or host to corresponding proxy", - examples=['{"http": "foo.bar:3128", "http://host.name": "foo.bar:4012"}'], - ) - data: str | dict | None = Field(None, description="JSON object to send in the body of the HTTP request") - xpath: str = XpathSchema - filter: str = FilterSchema - flatten_column: str | None = Field(None, description="Column containing nested rows") - - @classmethod - def model_json_schema( - cls, - by_alias: bool = True, - ref_template: str = DEFAULT_REF_TEMPLATE, - schema_generator: type[GenerateJsonSchema] = GenerateJsonSchema, - mode: JsonSchemaMode = "validation", - ) -> dict[str, Any]: - schema = super().model_json_schema( - by_alias=by_alias, - ref_template=ref_template, - schema_generator=schema_generator, - mode=mode, - ) - keys = schema["properties"].keys() - last_keys = [ - "proxies", - "flatten_column", - "data", - "xpath", - "filter", - "validation", - ] - new_keys = [k for k in keys if k not in last_keys] + last_keys - schema["properties"] = {k: schema["properties"][k] for k in new_keys} - return schema - - class HttpAPIConnector(ToucanConnector, data_source_model=HttpAPIDataSource): responsetype: ResponseType = Field(ResponseType.json, title="Content-type of response") baseroute: AnyHttpUrl = 
Field(..., title="Baseroute URL", description="Baseroute URL") @@ -148,7 +92,6 @@ def do_request(self, query, session): Returns: data (list): The response from the API in the form of a list of dict """ - jq_filter = query["filter"] xpath = query["xpath"] available_params = ["url", "method", "params", "data", "json", "headers", "proxies"] query = {k: v for k, v in query.items() if k in available_params} @@ -162,6 +105,8 @@ def do_request(self, query, session): res = session.request(**query) HttpAPIConnector.logger.debug(f"<< Response: status_code={res.status_code} reason={res.reason}") + res.raise_for_status() + if self.responsetype == "xml": try: data = fromstring(res.content) # noqa: S314 @@ -183,24 +128,72 @@ def do_request(self, query, session): f'Cannot decode response content from query: method={query.get("method")} url={query.get("url")} response_status_code={res.status_code} response_reason=${res.reason}' # noqa: E501 ) raise - try: - return transform_with_jq(data, jq_filter) - except ValueError: - HttpAPIConnector.logger.error(f"Could not transform {data} using {jq_filter}") - raise + return data + + def perform_requests(self, data_source: HttpAPIDataSource, session: Session) -> list[Any]: + results = [] + # Extract first http_pagination_config from data_source + pagination_config = data_source.http_pagination_config or NoopPaginationConfig() + while pagination_config is not None: + data_source = apply_pagination_to_data_source(data_source, pagination_config) + query = self._render_query(data_source) + jq_filter = query["filter"] + query.pop("http_pagination_config") + # Retrieve data + try: + raw_result = self.do_request(query, session) + except HTTPError as exc: + if whitelisted_status_codes := pagination_config.get_error_status_whitelist(): + if exc.response.status_code in whitelisted_status_codes: + # If a whitelisted error occurs, we want to stop paginated data retrieving iteration + break + else: + raise + else: + raise + # Parse retrieved data with 
JQ filter + try: + parsed_result = transform_with_jq(raw_result, jq_filter) + except ValueError: + HttpAPIConnector.logger.error(f"Could not transform {raw_result} using {jq_filter}") + raise + # Prepare next pagination config + parsed_pagination_info = None + # Extract pagination metadata from api response if needed + if jq_pagination_filter := pagination_config.get_pagination_info_filter(): + parsed_pagination_info = extract_pagination_info_from_result(raw_result, jq_pagination_filter) + pagination_config = pagination_config.get_next_pagination_config( + result=parsed_result, pagination_info=parsed_pagination_info + ) + results += parsed_result + return results def _retrieve_data(self, data_source: HttpAPIDataSource) -> "pd.DataFrame": if self.auth: session = self.auth.get_session() else: session = Session() + # Try retrieve dataset + try: + results = pd.DataFrame( + self.perform_requests( + data_source=data_source, + session=session, + ) + ) + except HTTPError as exc: + if exc.response.status_code == TOO_MANY_REQUESTS: + raise HttpAPIConnectorError( + message="Failed to retrieve data: the connector tried to perform too many requests." 
+ " Please check your API call limitations.", + original_exc=exc, + ) from exc + else: + raise - query = self._render_query(data_source) - - res = pd.DataFrame(self.do_request(query, session)) if data_source.flatten_column: - return json_to_table(res, columns=[data_source.flatten_column]) - return res + return json_to_table(results, columns=[data_source.flatten_column]) + return results def _render_query(self, data_source): query = nosql_apply_parameters_to_query( diff --git a/toucan_connectors/http_api/http_api_data_source.py b/toucan_connectors/http_api/http_api_data_source.py new file mode 100644 index 000000000..a7c214f5a --- /dev/null +++ b/toucan_connectors/http_api/http_api_data_source.py @@ -0,0 +1,97 @@ +from enum import Enum +from typing import Any + +from pydantic import Field +from pydantic.json_schema import DEFAULT_REF_TEMPLATE, GenerateJsonSchema, JsonSchemaMode + +from toucan_connectors import ToucanDataSource +from toucan_connectors.common import ( + FilterSchema, + XpathSchema, +) +from toucan_connectors.http_api.pagination_configs import ( + HttpPaginationConfig, + PaginationConfig, +) + + +class Method(str, Enum): + GET = "GET" + POST = "POST" + PUT = "PUT" + + +class HttpAPIDataSource(ToucanDataSource): + url: str = Field( + ..., + title="Endpoint URL", + description="The URL path that will be appended to your baseroute URL. " 'For example "geo/countries"', + ) + method: Method = Field(Method.GET, title="HTTP Method") + headers: dict | None = Field( + None, + description="You can also setup headers in the Template section of your Connector see:
" + "https://docs.toucantoco.com/concepteur/tutorials/connectors/3-http-connector.html#template", + examples=['{ "content-type": "application/xml" }'], + ) + params: dict | None = Field( + None, + title="URL params", + description="JSON object of parameters to send in the query string of this HTTP request " + '(e.g. "offset" and "limit" in https://www/api-aseroute/data&offset=100&limit=50)', + examples=['{ "offset": 100, "limit": 50 }'], + ) + json_: dict | None = Field( + None, + alias="json", + title="Body", + description="JSON object of parameters to send in the body of every HTTP request", + examples=['{ "offset": 100, "limit": 50 }'], + ) + proxies: dict | None = Field( + None, + description="JSON object expressing a mapping of protocol or host to corresponding proxy", + examples=['{"http": "foo.bar:3128", "http://host.name": "foo.bar:4012"}'], + ) + data: str | dict | None = Field(None, description="JSON object to send in the body of the HTTP request") + xpath: str = XpathSchema + filter: str = FilterSchema + flatten_column: str | None = Field(None, description="Column containing nested rows") + http_pagination_config: HttpPaginationConfig | None = Field( + None, title="Pagination configuration", discriminator="kind" + ) + + @classmethod + def model_json_schema( + cls, + by_alias: bool = True, + ref_template: str = DEFAULT_REF_TEMPLATE, + schema_generator: type[GenerateJsonSchema] = GenerateJsonSchema, + mode: JsonSchemaMode = "validation", + ) -> dict[str, Any]: + schema = super().model_json_schema( + by_alias=by_alias, + ref_template=ref_template, + schema_generator=schema_generator, + mode=mode, + ) + keys = schema["properties"].keys() + last_keys = [ + "proxies", + "flatten_column", + "data", + "xpath", + "filter", + "validation", + ] + new_keys = [k for k in keys if k not in last_keys] + last_keys + schema["properties"] = {k: schema["properties"].get(k) for k in new_keys} + return schema + + +def apply_pagination_to_data_source( + data_source: 
HttpAPIDataSource, pagination_config: PaginationConfig +) -> HttpAPIDataSource: + """Apply http pagination config to its parameters""" + updates = pagination_config.plan_pagination_updates_to_data_source(request_params=data_source.params) + return data_source.model_copy(update=updates) diff --git a/toucan_connectors/http_api/pagination_configs.py b/toucan_connectors/http_api/pagination_configs.py new file mode 100644 index 000000000..2278700c3 --- /dev/null +++ b/toucan_connectors/http_api/pagination_configs.py @@ -0,0 +1,206 @@ +import logging +from abc import ABC, abstractmethod +from typing import Any, Literal, Optional +from urllib.parse import parse_qs, urlparse + +from pydantic import BaseModel, Field + +from toucan_connectors.common import UI_HIDDEN, FilterSchemaDescription + +_LOGGER = logging.getLogger(__name__) + + +class PaginationConfig(BaseModel, ABC): + """Base class for pagination configs""" + + @abstractmethod + def plan_pagination_updates_to_data_source(self, request_params: dict[str, Any] | None) -> dict[str, Any]: + """Plans pagination updates for data source""" + + @abstractmethod + def get_next_pagination_config(self, result: Any, pagination_info: Any | None) -> Optional["PaginationConfig"]: + """Computes next pagination config based on the parsed API responses""" + + @abstractmethod + def get_pagination_info_filter(self) -> str | None: + """Returns the JQ filter that must be applied to the raw API result to retrieve pagination info""" + + @abstractmethod + def get_error_status_whitelist(self) -> list[str] | None: + """Returns the list of the error statuses which means the end of data fetching, and so to ignore""" + + +class NoopPaginationConfig(PaginationConfig): + """Pagination config without effects + + Config applied for connectors without pagination configured. + Useful for connectors that can return all results at once. 
+ """ + + def plan_pagination_updates_to_data_source(self, request_params: dict[str, Any] | None) -> dict[str, Any]: + return {} + + def get_next_pagination_config(self, result: Any, pagination_info: Any | None) -> Optional["PaginationConfig"]: + return None + + def get_pagination_info_filter(self) -> str | None: + return None + + def get_error_status_whitelist(self) -> list[str] | None: + return None + + +class OffsetLimitPaginationConfig(PaginationConfig): + kind: Literal["OffsetLimitPaginationConfig"] = Field(..., **UI_HIDDEN) + offset_name: str = "offset" + offset: int = Field(0, **UI_HIDDEN) + limit_name: str = "limit" + limit: int + + def plan_pagination_updates_to_data_source(self, request_params: dict[str, Any] | None) -> dict[str, Any]: + offset_limit_params = {self.offset_name: self.offset, self.limit_name: self.limit} + if request_params is None: + data_source_params = offset_limit_params + else: + data_source_params = request_params | offset_limit_params + return {"params": data_source_params} + + def get_next_pagination_config( + self, result: Any, pagination_info: Any | None + ) -> Optional["OffsetLimitPaginationConfig"]: + if len(result) < self.limit: + return None + else: + return self.model_copy(update={"offset": self.offset + self.limit}) + + def get_error_status_whitelist(self) -> list[str] | None: + return None + + def get_pagination_info_filter(self) -> str | None: + return None + + +class PageBasedPaginationConfig(PaginationConfig): + kind: Literal["PageBasedPaginationConfig"] = Field(..., **UI_HIDDEN) + page_name: str = "page" + page: int = 0 + per_page_name: str | None = None + per_page: int | None = None + max_page_filter: str | None = Field(None, description=FilterSchemaDescription) + can_raise_not_found: bool = Field( + False, + description="Some APIs can raise a not found error (404) when requesting the next page.", + ) + + def plan_pagination_updates_to_data_source(self, request_params: dict[str, Any] | None) -> dict[str, Any]: + 
page_based_params = {self.page_name: self.page} + if self.per_page_name: + page_based_params |= {self.per_page_name: self.per_page} + if request_params is None: + data_source_params = page_based_params + else: + data_source_params = request_params | page_based_params + return {"params": data_source_params} + + def get_next_pagination_config( + self, result: Any, pagination_info: Any | None + ) -> Optional["PageBasedPaginationConfig"]: + if self.max_page_filter: + if pagination_info is None: + return None + if self.page >= int(pagination_info): + return None + if self.per_page: + if len(result) < self.per_page: + return None + if len(result) < 1: + return None + else: + return self.model_copy(update={"page": self.page + 1}) + + def get_pagination_info_filter(self) -> str: + return self.max_page_filter + + def get_error_status_whitelist(self) -> list[int] | None: + if self.can_raise_not_found: + return [404] + return None + + +class CursorBasedPaginationConfig(PaginationConfig): + kind: Literal["CursorBasedPaginationConfig"] = Field(..., **UI_HIDDEN) + cursor_name: str = "cursor" + cursor: str | None = Field(None, **UI_HIDDEN) + cursor_filter: str = Field(..., description=FilterSchemaDescription) + + def plan_pagination_updates_to_data_source(self, request_params: dict[str, Any] | None) -> dict[str, Any]: + if self.cursor: + cursor_params = {self.cursor_name: self.cursor} + if request_params is None: + data_source_params = cursor_params + else: + data_source_params = request_params | cursor_params + return {"params": data_source_params} + else: + return {} + + def get_next_pagination_config( + self, result: Any, pagination_info: Any | None + ) -> Optional["CursorBasedPaginationConfig"]: + if pagination_info is None: + return None + if isinstance(pagination_info, dict) or isinstance(pagination_info, list): + raise ValueError(f"Invalid next cursor value. 
Cursor can't be a complex value, got: {pagination_info}") + else: + return self.model_copy(update={"cursor": str(pagination_info)}) + + def get_pagination_info_filter(self) -> str: + return self.cursor_filter + + def get_error_status_whitelist(self) -> list[str] | None: + return None + + +class HyperMediaPaginationConfig(PaginationConfig): + kind: Literal["HyperMediaPaginationConfig"] = Field(..., **UI_HIDDEN) + next_link_filter: str = Field(..., description=FilterSchemaDescription) + next_link: str | None = None + + def plan_pagination_updates_to_data_source(self, request_params: dict[str, Any] | None) -> dict[str, Any]: + if self.next_link: + url_chunks = urlparse(self.next_link) + url_parameters = parse_qs(url_chunks.query) | (request_params or {}) + return {"url": url_chunks.path, "params": url_parameters} + else: + return {} + + def get_next_pagination_config( + self, result: Any, pagination_info: Any | None + ) -> Optional["HyperMediaPaginationConfig"]: + if pagination_info is None: + return None + if isinstance(pagination_info, dict) or isinstance(pagination_info, list): + raise ValueError(f"Invalid next link value. 
Link can't be a complex value, got: {pagination_info}") + else: + return self.model_copy(update={"next_link": str(pagination_info)}) + + def get_pagination_info_filter(self) -> str: + return self.next_link_filter + + def get_error_status_whitelist(self) -> list[str] | None: + return None + + +HttpPaginationConfig = ( + CursorBasedPaginationConfig | PageBasedPaginationConfig | OffsetLimitPaginationConfig | HyperMediaPaginationConfig +) + + +def extract_pagination_info_from_result(api_response: dict | list, jq_pagination_filter: str): + import jq + + try: + return jq.first(jq_pagination_filter, api_response) + except ValueError: + _LOGGER.info(f"Could not extract pagination info with filter '{jq_pagination_filter}' on {api_response}") + return None diff --git a/toucan_connectors/snowflake/snowflake_connector.py b/toucan_connectors/snowflake/snowflake_connector.py index 65900666e..ed7b5d389 100644 --- a/toucan_connectors/snowflake/snowflake_connector.py +++ b/toucan_connectors/snowflake/snowflake_connector.py @@ -7,7 +7,7 @@ from pydantic import Field, create_model from pydantic.json_schema import DEFAULT_REF_TEMPLATE, GenerateJsonSchema, JsonSchemaMode -from toucan_connectors.common import ConnectorStatus +from toucan_connectors.common import UI_HIDDEN, ConnectorStatus from toucan_connectors.pagination import build_pagination_info from toucan_connectors.sql_query_helper import SqlQueryHelper from toucan_connectors.toucan_connector import ( @@ -49,9 +49,6 @@ CONNECTOR_OK = False -_UI_HIDDEN: dict[str, Any] = {"ui.hidden": True} - - class SnowflakeDataSource(ToucanDataSource["SnowflakeConnector"]): database: str = Field(..., description="The name of the database you want to query") warehouse: str | None = Field(None, description="The name of the warehouse you want to query") @@ -63,16 +60,16 @@ class SnowflakeDataSource(ToucanDataSource["SnowflakeConnector"]): widget="sql", # type:ignore[call-arg] ) - # Pydantic sees **_UI_HIDDEN as the third argument (the default 
factory) and raises an error + # Pydantic sees **UI_HIDDEN as the third argument (the default factory) and raises an error query_object: dict | None = Field( # type: ignore[pydantic-field] None, description="An object describing a simple select query" "For example " '{"schema": "SHOW_SCHEMA", "table": "MY_TABLE", "columns": ["col1", "col2"]}' "This field is used internally", - **_UI_HIDDEN, + **UI_HIDDEN, ) - language: str = Field("sql", **_UI_HIDDEN) # type: ignore[pydantic-field] + language: str = Field("sql", **UI_HIDDEN) # type: ignore[pydantic-field] @classmethod def _get_databases(cls, connector: "SnowflakeConnector"): diff --git a/toucan_connectors/toucan_connector.py b/toucan_connectors/toucan_connector.py index 71b5bcea6..5aaafd979 100644 --- a/toucan_connectors/toucan_connector.py +++ b/toucan_connectors/toucan_connector.py @@ -14,6 +14,7 @@ from pydantic.fields import ModelPrivateAttr from toucan_connectors.common import ( + UI_HIDDEN, ConnectorStatus, apply_query_parameters, nosql_apply_parameters_to_query, @@ -263,8 +264,6 @@ def get_connector_secrets_form(cls) -> ConnectorSecretsForm | None: return None -_UI_HIDDEN: dict[str, Any] = {"ui.hidden": True} - DS = TypeVar("DS", bound=ToucanDataSource) PlainJsonSecretStr = Annotated[ @@ -301,7 +300,7 @@ def __init_subclass__(cls, /, *, data_source_model: type[DS]): retry_policy: RetryPolicy | None = RetryPolicy() _retry_on: Iterable[Type[BaseException]] = () type: str | None = None - secrets_storage_version: str = Field("1", **_UI_HIDDEN) # type:ignore[pydantic-field] + secrets_storage_version: str = Field("1", **UI_HIDDEN) # type:ignore[pydantic-field] # Default ttl for all connector's queries (overridable at the data_source level) # /!\ cache ttl is used by the caching system which is not implemented in toucan_connectors. 
@@ -312,7 +311,7 @@ def __init_subclass__(cls, /, *, data_source_model: type[DS]): ) # Used to defined the connection - identifier: str | None = Field(None, **_UI_HIDDEN) # type:ignore[pydantic-field] + identifier: str | None = Field(None, **UI_HIDDEN) # type:ignore[pydantic-field] model_config = ConfigDict(extra="forbid", validate_assignment=True) @property