From 45c6c61c75de67124870933e855b45859188dbfa Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 14 May 2024 14:23:43 +0200 Subject: [PATCH 01/29] Add an example for post paginators --- .../docs/general-usage/http/rest-client.md | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index 556dbfcac6..efeeb9e4f0 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -311,7 +311,7 @@ When working with APIs that use non-standard pagination schemes, or when you nee - `update_request(request: Request) -> None`: Before making the next API call in `RESTClient.paginate` method, `update_request` is used to modify the request with the necessary parameters to fetch the next page (based on the current state of the paginator). For example, you can add query parameters to the request, or modify the URL. -#### Example: creating a query parameter paginator +#### Example 1: creating a query parameter paginator Suppose an API uses query parameters for pagination, incrementing an page parameter for each subsequent page, without providing direct links to next pages in its responses. E.g. `https://api.example.com/posts?page=1`, `https://api.example.com/posts?page=2`, etc. Here's how you could implement a paginator for this scheme: @@ -354,6 +354,38 @@ def get_data(): yield page ``` +:::tip +[`PageNumberPaginator`](#pagenumberpaginator) that ships with dlt does the same thing, but with more flexibility and error handling. This example is meant to demonstrate how to implement a custom paginator. For most use cases, you should use the [built-in paginators](#paginators). +::: + +#### Example 2: creating a paginator for POST requests + +Some APIs use POST requests for pagination, where the next page is fetched by sending a POST request with a cursor or other parameters in the request body. This is frequently used in "search" API endpoints or other endpoints with big payloads. Here's how you could implement a paginator for a case like this: + +```py +from dlt.sources.helpers.rest_client.paginators import BasePaginator +from dlt.sources.helpers.requests import Response, Request + +class PostBodyPaginator(BasePaginator): + def __init__(self): + super().__init__() + self.cursor = None + + def update_state(self, response: Response) -> None: + # Assuming the API returns an empty list when no more data is available + if not response.json(): + self._has_next_page = False + else: + self.cursor = response.json().get("cursor") + + def update_request(self, request: Request) -> None: + if request.json is None: + request.json = {} + + # Add the cursor to the request body + request.json["cursor"] = self.cursor +``` + ## Authentication The RESTClient supports various authentication strategies, such as bearer tokens, API keys, and HTTP basic auth, configured through the `auth` parameter of both the `RESTClient` and the `paginate()` method. 
From be03338a74f9ca1a238eed82c0034903b6b22333 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 14 May 2024 14:26:26 +0200 Subject: [PATCH 02/29] update the example cursor key --- docs/website/docs/general-usage/http/rest-client.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index efeeb9e4f0..d3f4f379f1 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -376,7 +376,7 @@ class PostBodyPaginator(BasePaginator): if not response.json(): self._has_next_page = False else: - self.cursor = response.json().get("cursor") + self.cursor = response.json().get("next_page_cursor") def update_request(self, request: Request) -> None: if request.json is None: From 7f3f0f0808363fc0df3651555a0bb1cdd3c994b9 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 14 May 2024 14:43:31 +0200 Subject: [PATCH 03/29] Update docs/website/docs/general-usage/http/rest-client.md Co-authored-by: VioletM --- docs/website/docs/general-usage/http/rest-client.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index d3f4f379f1..fea8c189de 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -384,6 +384,16 @@ class PostBodyPaginator(BasePaginator): # Add the cursor to the request body request.json["cursor"] = self.cursor + +client = RESTClient( + base_url="https://api.example.com", + paginator= PostBodyPaginator() +) + +@dlt.resource +def get_data(): + for page in client.paginate("/data"): + yield page ``` ## Authentication From c8f413d73d6d360069481028e012afe9ef424a00 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 14 May 2024 14:46:01 +0200 Subject: [PATCH 04/29] Update docs/website/docs/general-usage/http/rest-client.md Co-authored-by: VioletM --- docs/website/docs/general-usage/http/rest-client.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index fea8c189de..c32a864759 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -364,6 +364,7 @@ Some APIs use POST requests for pagination, where the next page is fetched by se ```py from dlt.sources.helpers.rest_client.paginators import BasePaginator +from dlt.sources.helpers.rest_client import RESTClient from dlt.sources.helpers.requests import Response, Request class PostBodyPaginator(BasePaginator): From 4c7fee7736ec703e4b8287ef27e8fb4d06124a70 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 14 May 2024 14:48:36 +0200 Subject: [PATCH 05/29] Update docs/website/docs/general-usage/http/rest-client.md --- docs/website/docs/general-usage/http/rest-client.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index c32a864759..ca39046d35 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -388,7 +388,7 @@ class PostBodyPaginator(BasePaginator): client = RESTClient( base_url="https://api.example.com", - paginator= PostBodyPaginator() + paginator=PostBodyPaginator() ) @dlt.resource From 
c825759e07dc44e7e704c296a759f6438fd4b713 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 14 May 2024 17:05:00 +0200 Subject: [PATCH 06/29] Add rest_api verified source documentation (#1308) * Add rest_api source docs * Expand rest_api documentation * Update snippets * Update string aliases * Link dlt source * Reordered code in the example and added a new section * Mention auto detection * Reorder the sentence about paginator types and instances * Elaborate on dependent resources; link the transformer docs * Link incremental loading * Update the example to use rest_api_resources * Rename github_config --- .../verified-sources/rest_api.md | 578 ++++++++++++++++++ docs/website/sidebars.js | 1 + 2 files changed, 579 insertions(+) create mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md new file mode 100644 index 0000000000..1f79055d06 --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md @@ -0,0 +1,578 @@ +--- +title: REST API generic source +description: dlt verified source for REST APIs +keywords: [rest api, restful api] +--- +import Header from './_source-info-header.md'; + +
+ +This is a generic dlt source you can use to extract data from any REST API. It uses [declarative configuration](#source-configuration) to define the API endpoints, their [relationships](#define-resource-relationships), how to handle [pagination](#pagination), and [authentication](#authentication). + +## Setup guide + +### Initialize the verified source + +Enter the following command in your terminal: + +```sh +dlt init rest_api duckdb +``` + +[dlt init](../../reference/command-line-interface) will initialize the pipeline examples for REST API as the [source](../../general-usage/source) and [duckdb](../destinations/duckdb.md) as the [destination](../destinations). + +Running `dlt init` creates the following in the current folder: +- `rest_api_pipeline.py` file with a sample pipelines definition: + - GitHub API example + - Pokemon API example +- `.dlt` folder with: + - `secrets.toml` file to store your access tokens and other sensitive information + - `config.toml` file to store the configuration settings +- `requirements.txt` file with the required dependencies + +Change the REST API source to your needs by modifying the `rest_api_pipeline.py` file. See the detailed [source configuration](#source-configuration) section below. + +:::note +For the rest of the guide, we will use the [GitHub API](https://docs.github.com/en/rest?apiVersion=2022-11-28) and [Pokemon API](https://pokeapi.co/) as example sources. +::: + +This source is based on the [RESTClient class](../../general-usage/http/rest-client.md). + +### Add credentials + +In the `.dlt` folder, you'll find a file called `secrets.toml`, where you can securely store your access tokens and other sensitive information. It's important to handle this file with care and keep it safe. + +The GitHub API [requires an access token](https://docs.github.com/en/rest/authentication/authenticating-to-the-rest-api?apiVersion=2022-11-28) to access some of its endpoints and to increase the rate limit for the API calls. To get a GitHub token, follow the GitHub documentation on [managing your personal access tokens](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens). + +After you get the token, add it to the `secrets.toml` file: + +```toml +[sources.rest_api.github] +github_token = "your_github_token" +``` + +## Run the pipeline + +1. Install the required dependencies by running the following command: + + ```sh + pip install -r requirements.txt + ``` + +2. Run the pipeline: + + ```sh + python rest_api_pipeline.py + ``` + +3. 
Verify that everything loaded correctly by using the following command:
+
+   ```sh
+   dlt pipeline rest_api show
+   ```
+
+## Source configuration
+
+### Quick example
+
+Let's take a look at the GitHub example in the `rest_api_pipeline.py` file:
+
+```py
+from rest_api import RESTAPIConfig, rest_api_resources
+
+@dlt.source
+def github_source(github_token=dlt.secrets.value):
+    config: RESTAPIConfig = {
+        "client": {
+            "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
+            "auth": {
+                "token": github_token,
+            },
+        },
+        "resource_defaults": {
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "endpoint": {
+                "params": {
+                    "per_page": 100,
+                },
+            },
+        },
+        "resources": [
+            {
+                "name": "issues",
+                "endpoint": {
+                    "path": "issues",
+                    "params": {
+                        "sort": "updated",
+                        "direction": "desc",
+                        "state": "open",
+                        "since": {
+                            "type": "incremental",
+                            "cursor_path": "updated_at",
+                            "initial_value": "2024-01-25T11:21:28Z",
+                        },
+                    },
+                },
+            },
+            {
+                "name": "issue_comments",
+                "endpoint": {
+                    "path": "issues/{issue_number}/comments",
+                    "params": {
+                        "issue_number": {
+                            "type": "resolve",
+                            "resource": "issues",
+                            "field": "number",
+                        }
+                    },
+                },
+                "include_from_parent": ["id"],
+            },
+        ],
+    }
+
+    yield from rest_api_resources(config)
+
+def load_github() -> None:
+    pipeline = dlt.pipeline(
+        pipeline_name="rest_api_github",
+        destination="duckdb",
+        dataset_name="rest_api_data",
+    )
+
+    load_info = pipeline.run(github_source())
+    print(load_info)
+```
+
+The declarative resource configuration is defined in the `config` dictionary. It contains the following key components:
+
+1. `client`: Defines the base URL and authentication method for the API. In this case, it uses token-based authentication. The token is stored in the `secrets.toml` file.
+
+2. `resource_defaults`: Contains default settings for all [resources](#resource-configuration). In this example, we define that all resources:
+  - Have `id` as the [primary key](../../general-usage/resource#define-schema)
+  - Use the `merge` [write disposition](../../general-usage/incremental-loading#choosing-a-write-disposition) to merge the data with the existing data in the destination.
+  - Send a `per_page` query parameter set to 100 with each request to get more results per page.
+
+3. `resources`: A list of [resources](#resource-configuration) to be loaded. Here, we have two resources: `issues` and `issue_comments`, which correspond to the GitHub API endpoints for [repository issues](https://docs.github.com/en/rest/issues/issues?apiVersion=2022-11-28#list-repository-issues) and [issue comments](https://docs.github.com/en/rest/issues/comments?apiVersion=2022-11-28#list-issue-comments). Note that we need an issue number to fetch comments for each issue. This number is taken from the `issues` resource. More on this in the [resource relationships](#define-resource-relationships) section.
+
+Let's break down the configuration in more detail.
+
+### Configuration structure
+
+:::tip
+Import the `RESTAPIConfig` type from the `rest_api` module to have convenient hints in your editor/IDE and use it to define the configuration object.
+
+```py
+from rest_api import RESTAPIConfig
+```
+:::
+
+
+The configuration object passed to the REST API Generic Source has three main elements:
+
+```py
+config: RESTAPIConfig = {
+    "client": {
+        ...
+    },
+    "resource_defaults": {
+        ...
+    },
+    "resources": [
+        ...
+    ],
+}
+```
+
+#### `client`
+
+`client` contains the configuration to connect to the API's endpoints. 
It includes the following fields:
+
+- `base_url` (str): The base URL of the API. This string is prepended to all endpoint paths. For example, if the base URL is `https://api.example.com/v1/`, and the endpoint path is `users`, the full URL will be `https://api.example.com/v1/users`.
+- `headers` (dict, optional): Additional headers to be sent with each request.
+- `auth` (optional): Authentication configuration. It can be a simple token, an `AuthConfigBase` object, or a more complex authentication method.
+- `paginator` (optional): Configuration for the default pagination to be used for resources that support pagination. See the [pagination](#pagination) section for more details.
+
+#### `resource_defaults` (optional)
+
+`resource_defaults` contains the default values to [configure the dlt resources](#resource-configuration). This configuration is applied to all resources unless overridden by the resource-specific configuration.
+
+For example, you can set the primary key, write disposition, and other default settings here:
+
+```py
+config = {
+    "client": {
+        ...
+    },
+    "resource_defaults": {
+        "primary_key": "id",
+        "write_disposition": "merge",
+        "endpoint": {
+            "params": {
+                "per_page": 100,
+            },
+        },
+    },
+    "resources": [
+        "resource1",
+        {
+            "name": "resource2",
+            "write_disposition": "append",
+            "endpoint": {
+                "params": {
+                    "param1": "value1",
+                },
+            },
+        },
+    ],
+}
+```
+
+Above, all resources will have `primary_key` set to `id`, `resource1` will have `write_disposition` set to `merge`, and `resource2` will override the default `write_disposition` with `append`.
+Both `resource1` and `resource2` will have the `per_page` parameter set to 100.
+
+#### `resources`
+
+This is a list of resource configurations that define the API endpoints to be loaded. Each resource configuration can be:
+- a dictionary with the [resource configuration](#resource-configuration).
+- a string. In this case, the string is used both as the endpoint path and the resource name, and the resource configuration is taken from the `resource_defaults` configuration if it exists.
+
+### Resource configuration
+
+A resource configuration is used to define a [dlt resource](../../general-usage/resource.md) for the data to be loaded from an API endpoint. It contains the following key fields:
+
+- `endpoint`: The endpoint configuration for the resource. It can be a string or a dict representing the endpoint settings. See the [endpoint configuration](#endpoint-configuration) section for more details.
+- `write_disposition`: The write disposition for the resource.
+- `primary_key`: The primary key for the resource.
+- `include_from_parent`: A list of fields from the parent resource to be included in the resource output. See the [resource relationships](#include-fields-from-the-parent-resource) section for more details.
+- `selected`: A flag to indicate if the resource is selected for loading. This could be useful when you want to load data only from child resources and not from the parent resource.
+
+You can also pass additional resource parameters that will be used to configure the dlt resource. See [dlt resource API reference](../../api_reference/extract/decorators.md#resource) for more details.
+
+### Endpoint configuration
+
+The endpoint configuration defines how to query the API endpoint. 
Quick example:
+
+```py
+{
+    "path": "issues",
+    "method": "GET",
+    "params": {
+        "sort": "updated",
+        "direction": "desc",
+        "state": "open",
+        "since": {
+            "type": "incremental",
+            "cursor_path": "updated_at",
+            "initial_value": "2024-01-25T11:21:28Z",
+        },
+    },
+    "data_selector": "results",
+}
+```
+
+The fields in the endpoint configuration are:
+
+- `path`: The path to the API endpoint.
+- `method`: The HTTP method to be used. Default is `GET`.
+- `params`: Query parameters to be sent with each request. For example, `sort` to order the results or `since` to specify [incremental loading](#incremental-loading). This is also used to define [resource relationships](#define-resource-relationships).
+- `json`: The JSON payload to be sent with the request (for POST and PUT requests).
+- `paginator`: Pagination configuration for the endpoint. See the [pagination](#pagination) section for more details.
+- `data_selector`: A JSONPath to select the data from the response. See the [data selection](#data-selection) section for more details.
+- `response_actions`: A list of actions that define how to process the response data.
+- `incremental`: Configuration for [incremental loading](#incremental-loading).
+
+### Pagination
+
+The REST API source will try to automatically handle pagination for you. This works by detecting the pagination details from the first API response.
+
+In some special cases, you may need to specify the pagination configuration explicitly.
+
+:::note
+Currently, pagination is supported only for GET requests. To handle POST requests with pagination, you need to implement a [custom paginator](../../general-usage/http/rest-client.md#custom-paginator).
+:::
+
+These are the available paginators:
+
+| Paginator class | String Alias (`type`) | Description |
+| -------------- | ------------ | ----------- |
+| [JSONResponsePaginator](../../general-usage/http/rest-client.md#jsonresponsepaginator) | `json_response` | The links to the next page are in the body (JSON) of the response. |
+| [HeaderLinkPaginator](../../general-usage/http/rest-client.md#headerlinkpaginator) | `header_link` | The links to the next page are in the response headers. |
+| [OffsetPaginator](../../general-usage/http/rest-client.md#offsetpaginator) | `offset` | The pagination is based on an offset parameter. The total item count is taken from the response body or provided explicitly. |
+| [PageNumberPaginator](../../general-usage/http/rest-client.md#pagenumberpaginator) | `page_number` | The pagination is based on a page number parameter. The total page count is taken from the response body or provided explicitly. |
+| [JSONResponseCursorPaginator](../../general-usage/http/rest-client.md#jsonresponsecursorpaginator) | `cursor` | The pagination is based on a cursor parameter. The value of the cursor is in the response body (JSON). |
+| SinglePagePaginator | `single_page` | The response will be interpreted as a single-page response, ignoring possible pagination metadata. |
+| `None` | `auto` | Explicitly specify that the source should automatically detect the pagination method. |
+
+To specify the pagination configuration, use the `paginator` field in the [client](#client) or [endpoint](#endpoint-configuration) configurations. You may either use a dictionary with a string alias in the `type` field along with the required parameters, or use the paginator instance directly:
+
+```py
+{
+    ...
+    "paginator": {
+        "type": "json_response",
+        "next_url_path": "paging.next",
+    }
+}
+```
+
+Or using the paginator instance:
+
+```py
+{
+    ...
+ "paginator": JSONResponsePaginator( + next_url_path="paging.next" + ), +} +``` + +This is useful when you're [implementing and using a custom paginator](../../general-usage/http/rest-client.md#custom-paginator). + +### Data selection + +The `data_selector` field in the endpoint configuration allows you to specify a JSONPath to select the data from the response. By default, the source will try to detect locations of the data automatically. + +Use this field when you need to specify the location of the data in the response explicitly. + +For example, if the API response looks like this: + +```json +{ + "posts": [ + {"id": 1, "title": "Post 1"}, + {"id": 2, "title": "Post 2"}, + {"id": 3, "title": "Post 3"} + ] +} +``` + +You can use the following endpoint configuration: + +```py +{ + "path": "posts", + "data_selector": "posts", +} +``` + +For a nested structure like this: + +```json +{ + "results": { + "posts": [ + {"id": 1, "title": "Post 1"}, + {"id": 2, "title": "Post 2"}, + {"id": 3, "title": "Post 3"} + ] + } +} +``` + +You can use the following endpoint configuration: + +```py +{ + "path": "posts", + "data_selector": "results.posts", +} +``` + +Read more about [JSONPath syntax](https://github.com/h2non/jsonpath-ng?tab=readme-ov-file#jsonpath-syntax) to learn how to write selectors. + + +### Authentication + +Many APIs require authentication to access their endpoints. The REST API source supports various authentication methods, such as token-based, query parameters, basic auth, etc. + +#### Quick example + +One of the most common method is token-based authentication. To authenticate with a token, you can use the `token` field in the `auth` configuration: + +```py +{ + "client": { + ... + "auth": { + "token": dlt.secrets["your_api_token"], + }, + ... + }, +} +``` + +:::warning +Make sure to store your access tokens and other sensitive information in the `secrets.toml` file and never commit it to the version control system. +::: + +Available authentication types: + +| Authentication class | String Alias (`type`) | Description | +| ------------------- | ----------- | ----------- | +| [BearTokenAuth](../../general-usage/http/rest-client.md#bearer-token-authentication) | `bearer` | Bearer token authentication. | +| [HTTPBasicAuth](../../general-usage/http/rest-client.md#http-basic-authentication) | `api_key` | Basic HTTP authentication. | +| [APIKeyAuth](../../general-usage/http/rest-client.md#api-key-authentication) | `http_basic` | API key authentication with key defined in the query parameters or in the headers. | + +To specify the authentication configuration, use the `auth` field in the [client](#client) configuration: + +```py +{ + "client": { + "auth": { + "type": "bearer", + "token": dlt.secrets["your_api_token"], + }, + ... + }, +} +``` + +Alternatively, you can use the authentication class directly: + +```py +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth + +config = { + "client": { + "auth": BearTokenAuth(dlt.secrets["your_api_token"]), + }, + ... +} +``` + +### Define resource relationships + +When you have a resource that depends on another resource, you can define the relationship using the `resolve` configuration. With it you link a path parameter in the child resource to a field in the parent resource's data. + +In the GitHub example, the `issue_comments` resource depends on the `issues` resource. 
The `issue_number` parameter in the `issue_comments` endpoint configuration is resolved from the `number` field of the `issues` resource: + +```py +{ + "resources": [ + { + "name": "issues", + "endpoint": { + "path": "issues", + ... + }, + }, + { + "name": "issue_comments", + "endpoint": { + "path": "issues/{issue_number}/comments", + "params": { + "issue_number": { + "type": "resolve", + "resource": "issues", + "field": "number", + } + }, + }, + "include_from_parent": ["id"], + }, + ], +} +``` + +This configuration tells the source to get issue numbers from the `issues` resource and use them to fetch comments for each issue. So if the `issues` resource yields the following data: + +```json +[ + {"id": 1, "number": 123}, + {"id": 2, "number": 124}, + {"id": 3, "number": 125} +] +``` + +The `issue_comments` resource will make requests to the following endpoints: + +- `issues/123/comments` +- `issues/124/comments` +- `issues/125/comments` + +The syntax for the `resolve` field in parameter configuration is: + +```py +"": { + "type": "resolve", + "resource": "", + "field": "", +} +``` + +Under the hood, dlt handles this by using a [transformer resource](../../general-usage/resource.md#process-resources-with-dlttransformer). + +#### Include fields from the parent resource + +You can include data from the parent resource in the child resource by using the `include_from_parent` field in the resource configuration. For example: + +```py +{ + "name": "issue_comments", + "endpoint": { + ... + }, + "include_from_parent": ["id", "title", "created_at"], +} +``` + +This will include the `id`, `title`, and `created_at` fields from the `issues` resource in the `issue_comments` resource data. The name of the included fields will be prefixed with the parent resource name and an underscore (`_`) like so: `_issues_id`, `_issues_title`, `_issues_created_at`. + +## Incremental loading + +Some APIs provide a way to fetch only new or changed data (most often by using a timestamp field like `updated_at`, `created_at`, or incremental IDs). +This is called [incremental loading](../../general-usage/incremental-loading.md) and is very useful as it allows you to reduce the load time and the amount of data transferred. + +When the API endpoint supports incremental loading, you can configure the source to load only the new or changed data using these two methods: + +1. Defining a special parameter in the `params` section of the [endpoint configuration](#endpoint-configuration): + + ```py + "": { + "type": "incremental", + "cursor_path": "", + "initial_value": "", + }, + ``` + + For example, in the `issues` resource configuration in the GitHub example, we have: + + ```py + "since": { + "type": "incremental", + "cursor_path": "updated_at", + "initial_value": "2024-01-25T11:21:28Z", + }, + ``` + + This configuration tells the source to create an incremental object that will keep track of the `updated_at` field in the response and use it as a value for the `since` parameter in subsequent requests. + +2. Specifying the `incremental` field in the [endpoint configuration](#endpoint-configuration): + + ```py + "incremental": { + "start_param": "", + "end_param": "", + "cursor_path": "", + "initial_value": "", + "end_value": "", + }, + ``` + + This configuration is more flexible and allows you to specify the start and end conditions for the incremental loading. + +See the [incremental loading](../../general-usage/incremental-loading.md#incremental-loading-with-a-cursor-field) guide for more details. 
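+
+To make the second method more concrete, here is a sketch of an endpoint configuration that uses the `incremental` field. The `posts` path and the `since`/`until` parameter names are assumptions for illustration; replace them with the path and parameters your API actually accepts:
+
+```py
+{
+    "path": "posts",
+    "incremental": {
+        "start_param": "since",    # query parameter name is an assumption
+        "end_param": "until",      # query parameter name is an assumption
+        "cursor_path": "updated_at",
+        "initial_value": "2024-01-25T11:21:28Z",
+        "end_value": "2024-02-25T11:21:28Z",
+    },
+}
+```
+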
+ +## Advanced configuration + +`rest_api_source()` function creates the [dlt source](../../general-usage/source.md) and lets you configure the following parameters: + +- `config`: The REST API configuration dictionary. +- `name`: An optional name for the source. +- `section`: An optional section name in the configuration file. +- `max_table_nesting`: Sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON. +- `root_key` (bool): Enables merging on all resources by propagating root foreign key to child tables. This option is most useful if you plan to change write disposition of a resource to disable/enable merge. Defaults to False. +- `schema_contract`: Schema contract settings that will be applied to this resource. +- `spec`: A specification of configuration and secret values required by the source. diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index 728c3b6593..a3fe12c8fb 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -83,6 +83,7 @@ const sidebars = { 'dlt-ecosystem/verified-sources/notion', 'dlt-ecosystem/verified-sources/personio', 'dlt-ecosystem/verified-sources/pipedrive', + 'dlt-ecosystem/verified-sources/rest_api', 'dlt-ecosystem/verified-sources/salesforce', 'dlt-ecosystem/verified-sources/scrapy', 'dlt-ecosystem/verified-sources/shopify', From 511df6ee6b97bcff8e605b1bac0098814faf2dfe Mon Sep 17 00:00:00 2001 From: Maxime Lemaitre Date: Thu, 16 May 2024 10:46:59 +0200 Subject: [PATCH 07/29] Fix typo in Slack Docs (#1369) --- docs/website/docs/dlt-ecosystem/verified-sources/slack.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/slack.md b/docs/website/docs/dlt-ecosystem/verified-sources/slack.md index 970a891e60..38eda15c94 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/slack.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/slack.md @@ -70,7 +70,7 @@ To get started with your data pipeline, follow these steps: [This command](../../reference/command-line-interface) will initialize [the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/slack_pipeline.py) - with Google Sheets as the [source](../../general-usage/source) and + with Slack as the [source](../../general-usage/source) and [duckdb](../destinations/duckdb.md) as the [destination](../destinations). 1. 
If you'd like to use a different destination, simply replace `duckdb` with the name of your From 80e78204d6a81e3c02cdd481a5caa50e30c88bf6 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Thu, 16 May 2024 10:52:25 +0200 Subject: [PATCH 08/29] Add the troubleshooting section (#1367) --- .../docs/general-usage/http/rest-client.md | 70 ++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index ca39046d35..481670ae4b 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -385,7 +385,7 @@ class PostBodyPaginator(BasePaginator): # Add the cursor to the request body request.json["cursor"] = self.cursor - + client = RESTClient( base_url="https://api.example.com", paginator=PostBodyPaginator() @@ -527,4 +527,70 @@ from dlt.sources.helpers.rest_client import paginate for page in paginate("https://api.example.com/posts"): print(page) -``` \ No newline at end of file +``` + +## Troubleshooting + +### `RESTClient.get()` and `RESTClient.post()` methods + +These methods work similarly to the [get()](https://docs.python-requests.org/en/latest/api/#requests.get) and [post()](https://docs.python-requests.org/en/latest/api/#requests.post) functions +from the Requests library. They return a [Response](https://docs.python-requests.org/en/latest/api/#requests.Response) object that contains the response data. +You can inspect the `Response` object to get the `response.status_code`, `response.headers`, and `response.content`. For example: + +```py +from dlt.sources.helpers.rest_client import RESTClient +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth + +client = RESTClient(base_url="https://api.example.com") +response = client.get("/posts", auth=BearerTokenAuth(token="your_access_token")) + +print(response.status_code) +print(response.headers) +print(response.content) +``` + +### `RESTClient.paginate()` + +Debugging `paginate()` is trickier because it's a generator function that yields [`PageData`](#pagedata) objects. Here's several ways to debug the `paginate()` method: + +1. Enable [logging](../../running-in-production/running.md#set-the-log-level-and-format) to see detailed information about the HTTP requests: + +```bash +RUNTIME__LOG_LEVEL=INFO python my_script.py +``` + +2. Use the [`PageData`](#pagedata) instance to inspect the [request](https://docs.python-requests.org/en/latest/api/#requests.Request) +and [response](https://docs.python-requests.org/en/latest/api/#requests.Response) objects: + +```py +from dlt.sources.helpers.rest_client import RESTClient +from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator + +client = RESTClient( + base_url="https://api.example.com", + paginator=JSONResponsePaginator(next_url_path="pagination.next") +) + +for page in client.paginate("/posts"): + print(page.request) + print(page.response) +``` + +3. 
Use the `hooks` parameter to add custom response handlers to the `paginate()` method: + +```py +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth + +def response_hook(response, **kwargs): + print(response.status_code) + print(f"Content: {response.content}") + print(f"Request: {response.request.body}") + # Or import pdb; pdb.set_trace() to debug + +for page in client.paginate( + "/posts", + auth=BearerTokenAuth(token="your_access_token") + hooks={"response": [response_hook]} +): + print(page) +``` From 314e7a026619c5fc793ca7408581baa2d71d7e13 Mon Sep 17 00:00:00 2001 From: Sultan Iman <354868+sultaniman@users.noreply.github.com> Date: Thu, 16 May 2024 14:53:53 +0200 Subject: [PATCH 09/29] Replace weather api example with github in create a pipeline walkthrough (#1351) Co-authored-by: AstrakhantsevaAA Co-authored-by: Anton Burnashev --- .../docs/walkthroughs/create-a-pipeline.md | 142 +++++++++++------- 1 file changed, 85 insertions(+), 57 deletions(-) diff --git a/docs/website/docs/walkthroughs/create-a-pipeline.md b/docs/website/docs/walkthroughs/create-a-pipeline.md index 1d5974efbe..bba78dc6cb 100644 --- a/docs/website/docs/walkthroughs/create-a-pipeline.md +++ b/docs/website/docs/walkthroughs/create-a-pipeline.md @@ -1,31 +1,46 @@ --- title: Create a pipeline description: How to create a pipeline -keywords: [how to, create a pipeline] +keywords: [how to, create a pipeline, rest client] --- # Create a pipeline -Follow the steps below to create a [pipeline](../general-usage/glossary.md#pipeline) from the -WeatherAPI.com API to DuckDB from scratch. The same steps can be repeated for any source and -destination of your choice—use `dlt init ` and then build the pipeline for -that API instead. +This guide walks you through creating a pipeline that uses our [REST API Client](../general-usage/http/rest-client) +to connect to [DuckDB](../dlt-ecosystem/destinations/duckdb). +:::tip +We're using DuckDB as a destination here, but you can adapt the steps to any [source](https://dlthub.com/docs/dlt-ecosystem/verified-sources/) and [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) by +using the [command](../reference/command-line-interface#dlt-init) `dlt init ` and tweaking the pipeline accordingly. +::: -Please make sure you have [installed `dlt`](../reference/installation.md) before following the +Please make sure you have [installed `dlt`](../reference/installation) before following the steps below. +## Task overview + +Imagine you want to analyze issues from a GitHub project locally. +To achieve this, you need to write code that accomplishes the following: + +1. Constructs a correct request. +2. Authenticates your request. +3. Fetches and handles paginated issue data. +4. Stores the data for analysis. + +This may sound complicated, but dlt provides a [REST API Client](../general-usage/http/rest-client) that allows you to focus more on your data rather than on managing API interactions. + + ## 1. Initialize project Create a new empty directory for your `dlt` project by running: ```sh -mkdir weatherapi_duckdb && cd weatherapi_duckdb +mkdir github_api_duckdb && cd github_api_duckdb ``` Start a `dlt` project with a pipeline template that loads data to DuckDB by running: ```sh -dlt init weatherapi duckdb +dlt init github_api duckdb ``` Install the dependencies necessary for DuckDB: @@ -34,114 +49,127 @@ Install the dependencies necessary for DuckDB: pip install -r requirements.txt ``` -## 2. Add WeatherAPI.com API credentials +## 2. 
Obtain and add API credentials from GitHub -You will need to [sign up for the WeatherAPI.com API](https://www.weatherapi.com/signup.aspx). +You will need to [sign in](https://github.com/login) to your GitHub account and create your access token via [Personal access tokens page](https://github.com/settings/tokens). -Once you do this, you should see your `API Key` at the top of your -[user page](https://www.weatherapi.com/my/). - -Copy the value of the API key into `.dlt/secrets.toml`: +Copy your new access token over to `.dlt/secrets.toml`: ```toml [sources] api_secret_key = '' ``` -The **secret name** corresponds to the **argument name** in the source function. Below `api_secret_key` [will get its value](../general-usage/credentials/configuration.md#general-usage-and-an-example) from `secrets.toml` when `weatherapi_source()` is called. + +This token will be used by `github_api_source()` to authenticate requests. + +The **secret name** corresponds to the **argument name** in the source function. +Below `api_secret_key` [will get its value](../general-usage/credentials/configuration#allow-dlt-to-pass-the-config-and-secrets-automatically) +from `secrets.toml` when `github_api_source()` is called. + ```py @dlt.source -def weatherapi_source(api_secret_key=dlt.secrets.value): - ... +def github_api_source(api_secret_key: str = dlt.secrets.value): + return github_api_resource(api_secret_key=api_secret_key) ``` -Run the `weatherapi.py` pipeline script to test that authentication headers look fine: +Run the `github_api.py` pipeline script to test that authentication headers look fine: ```sh -python3 weatherapi.py +python github_api.py ``` Your API key should be printed out to stdout along with some test data. -## 3. Request data from the WeatherAPI.com API +## 3. Request project issues from then GitHub API -Replace the definition of the `weatherapi_resource` function definition in the `weatherapi.py` -pipeline script with a call to the WeatherAPI.com API: -```py -@dlt.resource(write_disposition="append") -def weatherapi_resource(api_secret_key=dlt.secrets.value): - url = "https://api.weatherapi.com/v1/current.json" - params = { - "q": "NYC", - "key": api_secret_key - } - response = requests.get(url, params=params) - response.raise_for_status() - yield response.json() -``` +:::tip +We will use `dlt` repository as an example GitHub project https://github.com/dlt-hub/dlt, feel free to replace it with your own repository. +::: -Run the `weatherapi.py` pipeline script to test that the API call works: +Modify `github_api_resource` in `github_api.py` to request issues data from your GitHub project's API: -```sh -python3 weatherapi.py +```py +from dlt.sources.helpers.rest_client import paginate +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth +from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator + +@dlt.resource(write_disposition="replace") +def github_api_resource(api_secret_key: str = dlt.secrets.value): + url = "https://api.github.com/repos/dlt-hub/dlt/issues" + + for page in paginate( + url, + auth=BearerTokenAuth(api_secret_key), + paginator=HeaderLinkPaginator(), + params={"state": "open"} + ): + yield page ``` -This should print out the weather in New York City right now. - ## 4. 
Load the data -Remove the `exit()` call from the `main` function in `weatherapi.py`, so that running the -`python3 weatherapi.py` command will now also run the pipeline: +Uncomment the commented out code in `main` function in `github_api.py`, so that running the +`python github_api.py` command will now also run the pipeline: ```py if __name__=='__main__': - # configure the pipeline with your destination details pipeline = dlt.pipeline( - pipeline_name='weatherapi', + pipeline_name='github_api_pipeline', destination='duckdb', - dataset_name='weatherapi_data' + dataset_name='github_api_data' ) # print credentials by running the resource - data = list(weatherapi_resource()) + data = list(github_api_resource()) # print the data yielded from resource print(data) # run the pipeline with your parameters - load_info = pipeline.run(weatherapi_source()) + load_info = pipeline.run(github_api_source()) # pretty print the information on data that was loaded print(load_info) ``` -Run the `weatherapi.py` pipeline script to load data into DuckDB: + +Run the `github_api.py` pipeline script to test that the API call works: ```sh -python3 weatherapi.py +python github_api.py ``` -Then this command to see that the data loaded: +This should print out JSON data containing the issues in the GitHub project. + +It also prints `load_info` object. + +Let's explore the loaded data with the [command](../reference/command-line-interface#show-tables-and-data-in-the-destination) `dlt pipeline show`. + +:::info +Make sure you have `streamlit` installed `pip install streamlit` +::: ```sh -dlt pipeline weatherapi show +dlt pipeline github_api_pipeline show ``` This will open a Streamlit app that gives you an overview of the data loaded. ## 5. Next steps -Now that you have a working pipeline, you have options for what to learn next: +With a functioning pipeline, consider exploring: +- Our [REST Client](../general-usage/http/rest-client). - [Deploy this pipeline with GitHub Actions](deploy-a-pipeline/deploy-with-github-actions), so that the data is automatically loaded on a schedule. - Transform the [loaded data](../dlt-ecosystem/transformations) with dbt or in Pandas DataFrames. -- Learn how to [run](../running-in-production/running.md), - [monitor](../running-in-production/monitoring.md), and - [alert](../running-in-production/alerting.md) when you put your pipeline in production. +- Learn how to [run](../running-in-production/running), + [monitor](../running-in-production/monitoring), and + [alert](../running-in-production/alerting) when you put your pipeline in production. - Try loading data to a different destination like - [Google BigQuery](../dlt-ecosystem/destinations/bigquery.md), - [Amazon Redshift](../dlt-ecosystem/destinations/redshift.md), or - [Postgres](../dlt-ecosystem/destinations/postgres.md). + [Google BigQuery](../dlt-ecosystem/destinations/bigquery), + [Amazon Redshift](../dlt-ecosystem/destinations/redshift), or + [Postgres](../dlt-ecosystem/destinations/postgres). 
From ca154015d8cdde3d0a0922389839463467d50ee9 Mon Sep 17 00:00:00 2001 From: Ilya Gurov Date: Thu, 16 May 2024 20:25:46 +0400 Subject: [PATCH 10/29] feat(pipeline): add an ability to auto truncate (#1292) * feat(pipeline): add an ability to auto truncate staging destination after load * lint fix * fix typo * improve tests * truncate dataset * do truncation after all the load is finished * fix the test, which already expects warnings * add docs, tests * lint fix * lint fix * fixes * fix typo * delete excess comment * fix the test * additional conditions for assert * use qualified name * lint fix * lint fix * fix tests * fix the test * fix test * if staging is not used, don't test it * test fix for clickhouse * test fix * uses with_staging_dataset correctly --------- Co-authored-by: Marcin Rudolf --- dlt/load/configuration.py | 3 ++ dlt/load/load.py | 35 ++++++++++++++++++- dlt/pipeline/pipeline.py | 1 + .../docs/running-in-production/running.md | 6 ++++ .../airflow_tests/test_airflow_wrapper.py | 12 ++++++- tests/load/pipeline/test_pipelines.py | 17 +++++++++ tests/pipeline/test_pipeline.py | 32 ++++++++++++++++- 7 files changed, 103 insertions(+), 3 deletions(-) diff --git a/dlt/load/configuration.py b/dlt/load/configuration.py index 97cf23fdfc..b3fc2fbcd4 100644 --- a/dlt/load/configuration.py +++ b/dlt/load/configuration.py @@ -15,6 +15,9 @@ class LoaderConfiguration(PoolRunnerConfiguration): raise_on_max_retries: int = 5 """When gt 0 will raise when job reaches raise_on_max_retries""" _load_storage_config: LoadStorageConfiguration = None + # if set to `True`, the staging dataset will be + # truncated after loading the data + truncate_staging_dataset: bool = False def on_resolved(self) -> None: self.pool_type = "none" if self.workers == 1 else "thread" diff --git a/dlt/load/load.py b/dlt/load/load.py index 66ddb1c308..9d898bc54d 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -53,7 +53,7 @@ LoadClientUnsupportedWriteDisposition, LoadClientUnsupportedFileFormats, ) -from dlt.load.utils import get_completed_table_chain, init_client +from dlt.load.utils import _extend_tables_with_table_chain, get_completed_table_chain, init_client class Load(Runnable[Executor], WithStepInfo[LoadMetrics, LoadInfo]): @@ -348,6 +348,8 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False) ) ): job_client.complete_load(load_id) + self._maybe_trancate_staging_dataset(schema, job_client) + self.load_storage.complete_load_package(load_id, aborted) # collect package info self._loaded_packages.append(self.load_storage.get_load_package_info(load_id)) @@ -490,6 +492,37 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics: return TRunMetrics(False, len(self.load_storage.list_normalized_packages())) + def _maybe_trancate_staging_dataset(self, schema: Schema, job_client: JobClientBase) -> None: + """ + Truncate the staging dataset if one used, + and configuration requests truncation. + + Args: + schema (Schema): Schema to use for the staging dataset. + job_client (JobClientBase): + Job client to use for the staging dataset. 
+ """ + if not ( + isinstance(job_client, WithStagingDataset) and self.config.truncate_staging_dataset + ): + return + + data_tables = schema.data_table_names() + tables = _extend_tables_with_table_chain( + schema, data_tables, data_tables, job_client.should_load_data_to_staging_dataset + ) + + try: + with self.get_destination_client(schema) as client: + with client.with_staging_dataset(): # type: ignore + client.initialize_storage(truncate_tables=tables) + + except Exception as exc: + logger.warn( + f"Staging dataset truncate failed due to the following error: {exc}" + " However, it didn't affect the data integrity." + ) + def get_step_info( self, pipeline: SupportsPipeline, diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index a2ea1936a9..53770f332d 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -554,6 +554,7 @@ def load( with signals.delayed_signals(): runner.run_pool(load_step.config, load_step) info: LoadInfo = self._get_step_info(load_step) + self.first_run = False return info except Exception as l_ex: diff --git a/docs/website/docs/running-in-production/running.md b/docs/website/docs/running-in-production/running.md index 253a27d942..9c52f58caa 100644 --- a/docs/website/docs/running-in-production/running.md +++ b/docs/website/docs/running-in-production/running.md @@ -108,6 +108,12 @@ behind. In `config.toml`: load.delete_completed_jobs=true ``` +Also, by default, `dlt` leaves data in staging dataset, used during merge and replace load for deduplication. In order to clear it, put the following line in `config.toml`: + +```toml +load.truncate_staging_dataset=true +``` + ## Using slack to send messages `dlt` provides basic support for sending slack messages. You can configure Slack incoming hook via diff --git a/tests/helpers/airflow_tests/test_airflow_wrapper.py b/tests/helpers/airflow_tests/test_airflow_wrapper.py index 845800e47f..533d16c998 100644 --- a/tests/helpers/airflow_tests/test_airflow_wrapper.py +++ b/tests/helpers/airflow_tests/test_airflow_wrapper.py @@ -384,7 +384,17 @@ def dag_parallel(): with mock.patch("dlt.helpers.airflow_helper.logger.warn") as warn_mock: dag_def = dag_parallel() dag_def.test() - warn_mock.assert_called_once() + warn_mock.assert_has_calls( + [ + mock.call( + "The resource resource2 in task" + " mock_data_incremental_source_resource1-resource2 is using incremental loading" + " and may modify the state. Resources that modify the state should not run in" + " parallel within the single pipeline as the state will not be correctly" + " merged. Please use 'serialize' or 'parallel-isolated' modes instead." 
+ ) + ] + ) def test_parallel_isolated_run(): diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index a498b570a0..d98f335d16 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -10,6 +10,7 @@ from dlt.common.pipeline import SupportsPipeline from dlt.common.destination import Destination from dlt.common.destination.exceptions import DestinationHasFailedJobs +from dlt.common.destination.reference import WithStagingDataset from dlt.common.schema.exceptions import CannotCoerceColumnException from dlt.common.schema.schema import Schema from dlt.common.schema.typing import VERSION_TABLE_NAME @@ -896,6 +897,7 @@ def test_pipeline_upfront_tables_two_loads( # use staging tables for replace os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy + os.environ["TRUNCATE_STAGING_DATASET"] = "True" pipeline = destination_config.setup_pipeline( "test_pipeline_upfront_tables_two_loads", @@ -1001,6 +1003,21 @@ def table_3(make_data=False): is True ) + job_client, _ = pipeline._get_destination_clients(schema) + + if destination_config.staging and isinstance(job_client, WithStagingDataset): + for i in range(1, 4): + with pipeline.sql_client() as client: + table_name = f"table_{i}" + + if job_client.should_load_data_to_staging_dataset( + job_client.schema.tables[table_name] + ): + with client.with_staging_dataset(staging=True): + tab_name = client.make_qualified_table_name(table_name) + with client.execute_query(f"SELECT * FROM {tab_name}") as cur: + assert len(cur.fetchall()) == 0 + # @pytest.mark.skip(reason="Finalize the test: compare some_data values to values from database") # @pytest.mark.parametrize( diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index a828de40fd..1c4383405b 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -5,9 +5,9 @@ import logging import os import random +import threading from time import sleep from typing import Any, Tuple, cast -import threading from tenacity import retry_if_exception, Retrying, stop_after_attempt import pytest @@ -2230,3 +2230,33 @@ def stateful_resource(): assert len(fs_client.list_table_files("_dlt_loads")) == 2 assert len(fs_client.list_table_files("_dlt_version")) == 1 assert len(fs_client.list_table_files("_dlt_pipeline_state")) == 1 + + +@pytest.mark.parametrize("truncate", (True, False)) +def test_staging_dataset_truncate(truncate) -> None: + dlt.config["truncate_staging_dataset"] = truncate + + @dlt.resource(write_disposition="merge", merge_key="id") + def test_data(): + yield [{"field": 1, "id": 1}, {"field": 2, "id": 2}, {"field": 3, "id": 3}] + + pipeline = dlt.pipeline( + pipeline_name="test_staging_cleared", + destination="duckdb", + full_refresh=True, + ) + + info = pipeline.run(test_data, table_name="staging_cleared") + assert_load_info(info) + + with pipeline.sql_client() as client: + with client.execute_query( + f"SELECT * FROM {pipeline.dataset_name}_staging.staging_cleared" + ) as cur: + if truncate: + assert len(cur.fetchall()) == 0 + else: + assert len(cur.fetchall()) == 3 + + with client.execute_query(f"SELECT * FROM {pipeline.dataset_name}.staging_cleared") as cur: + assert len(cur.fetchall()) == 3 From 920d41a773879acd26c24445ee8fd127385c434f Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Thu, 16 May 2024 12:39:07 -0400 Subject: [PATCH 11/29] Add recommended_file_size cap to limit data writer file size (#1368) --- dlt/common/data_writers/buffered.py | 3 ++ 
dlt/common/destination/capabilities.py | 2 + dlt/destinations/impl/bigquery/__init__.py | 2 + tests/common/data_writers/utils.py | 6 +-- .../data_writers/test_buffered_writer.py | 38 ++++++++++++++++++- 5 files changed, 47 insertions(+), 4 deletions(-) diff --git a/dlt/common/data_writers/buffered.py b/dlt/common/data_writers/buffered.py index fdd5b50111..bd32c68c49 100644 --- a/dlt/common/data_writers/buffered.py +++ b/dlt/common/data_writers/buffered.py @@ -55,7 +55,10 @@ def __init__( self.closed_files: List[DataWriterMetrics] = [] # all fully processed files # buffered items must be less than max items in file self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items) + # Explicitly configured max size supersedes destination limit self.file_max_bytes = file_max_bytes + if self.file_max_bytes is None and _caps: + self.file_max_bytes = _caps.recommended_file_size self.file_max_items = file_max_items # the open function is either gzip.open or open self.open = ( diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index e74f5a980d..089b4a1d5e 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -29,6 +29,8 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): preferred_loader_file_format: TLoaderFileFormat = None supported_loader_file_formats: Sequence[TLoaderFileFormat] = None + recommended_file_size: Optional[int] = None + """Recommended file size in bytes when writing extract/load files""" preferred_staging_file_format: Optional[TLoaderFileFormat] = None supported_staging_file_formats: Sequence[TLoaderFileFormat] = None escape_identifier: Callable[[str], str] = None diff --git a/dlt/destinations/impl/bigquery/__init__.py b/dlt/destinations/impl/bigquery/__init__.py index d33466ed5e..39322b43a0 100644 --- a/dlt/destinations/impl/bigquery/__init__.py +++ b/dlt/destinations/impl/bigquery/__init__.py @@ -12,6 +12,8 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supported_loader_file_formats = ["jsonl", "parquet"] caps.preferred_staging_file_format = "parquet" caps.supported_staging_file_formats = ["parquet", "jsonl"] + # BQ limit is 4GB but leave a large headroom since buffered writer does not preemptively check size + caps.recommended_file_size = int(1024 * 1024 * 1024) caps.escape_identifier = escape_bigquery_identifier caps.escape_literal = None caps.format_datetime_literal = format_bigquery_datetime_literal diff --git a/tests/common/data_writers/utils.py b/tests/common/data_writers/utils.py index 2cb440bde1..e6e377b7d0 100644 --- a/tests/common/data_writers/utils.py +++ b/tests/common/data_writers/utils.py @@ -1,5 +1,5 @@ import os -from typing import Type +from typing import Type, Optional from dlt.common.data_writers.buffered import BufferedDataWriter from dlt.common.data_writers.writers import TWriter, ALL_WRITERS @@ -18,8 +18,8 @@ def get_writer( writer: Type[TWriter], buffer_max_items: int = 10, - file_max_items: int = 10, - file_max_bytes: int = None, + file_max_items: Optional[int] = 10, + file_max_bytes: Optional[int] = None, disable_compression: bool = False, caps: DestinationCapabilitiesContext = None, ) -> BufferedDataWriter[TWriter]: diff --git a/tests/extract/data_writers/test_buffered_writer.py b/tests/extract/data_writers/test_buffered_writer.py index 82b81a1cd7..b6da132de9 100644 --- a/tests/extract/data_writers/test_buffered_writer.py +++ b/tests/extract/data_writers/test_buffered_writer.py @@ -2,6 +2,7 @@ import pytest import time 
from typing import Iterator, Type +from uuid import uuid4 from dlt.common.data_writers.exceptions import BufferedDataWriterClosed from dlt.common.data_writers.writers import ( @@ -11,7 +12,7 @@ JsonlWriter, ALL_WRITERS, ) -from dlt.common.destination.capabilities import TLoaderFileFormat +from dlt.common.destination.capabilities import TLoaderFileFormat, DestinationCapabilitiesContext from dlt.common.schema.utils import new_column from dlt.common.storages.file_storage import FileStorage @@ -330,3 +331,38 @@ def test_special_write_rotates(disable_compression: bool, writer_type: Type[Data metrics = writer.import_file( "tests/extract/cases/imported.any", DataWriterMetrics("", 1, 231, 0, 0) ) + + +@pytest.mark.parametrize( + "disable_compression", [True, False], ids=["no_compression", "compression"] +) +@pytest.mark.parametrize("writer_type", ALL_OBJECT_WRITERS) +def test_rotation_on_destination_caps_recommended_file_size( + disable_compression: bool, writer_type: Type[DataWriter] +) -> None: + caps = DestinationCapabilitiesContext.generic_capabilities() + caps.recommended_file_size = int(250 * 1024) + columns = {"id": new_column("id", "text")} + with get_writer( + writer_type, + disable_compression=disable_compression, + buffer_max_items=100, + file_max_items=None, + file_max_bytes=None, + caps=caps, + ) as writer: + for i in range(8): + # Data chunk approximately 40kb serialized + items = [{"id": str(uuid4())} for _ in range(1000)] + writer.write_data_item(items, columns) + if i < 5: + assert not writer.closed_files + + if i > 5: + # We should have written atleast 250kb by now and have rotated the file + assert len(writer.closed_files) == 1 + + # Check the files that were written are all within the recommended size + 1 chunk + assert len(writer.closed_files) == 2 + for file in writer.closed_files: + assert file.file_size < caps.recommended_file_size + 1024 * 50 From 5b0afa490112e842baa497583138bac3ce169699 Mon Sep 17 00:00:00 2001 From: rudolfix Date: Thu, 16 May 2024 21:14:29 +0200 Subject: [PATCH 12/29] limits mssql query size to fit network buffer (#1372) --- dlt/destinations/impl/mssql/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dlt/destinations/impl/mssql/__init__.py b/dlt/destinations/impl/mssql/__init__.py index e9d9fe24fd..f7768d9238 100644 --- a/dlt/destinations/impl/mssql/__init__.py +++ b/dlt/destinations/impl/mssql/__init__.py @@ -17,7 +17,8 @@ def capabilities() -> DestinationCapabilitiesContext: # https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN caps.max_identifier_length = 128 caps.max_column_identifier_length = 128 - caps.max_query_length = 4 * 1024 * 64 * 1024 + # A SQL Query can be a varchar(max) but is shown as limited to 65,536 * Network Packet + caps.max_query_length = 65536 * 10 caps.is_max_query_length_in_bytes = True caps.max_text_data_type_length = 2**30 - 1 caps.is_max_text_data_type_length_in_bytes = False From cb38702398bd8ea76de1770acdc3561dc3a6bd29 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Fri, 17 May 2024 10:07:25 +0200 Subject: [PATCH 13/29] Link REST API generic source from the docs intro (#1376) --- docs/website/docs/intro.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/website/docs/intro.md b/docs/website/docs/intro.md index 776329bcf4..0374802b7d 100644 --- a/docs/website/docs/intro.md +++ b/docs/website/docs/intro.md @@ -32,6 +32,10 @@ The library will create or update tables, infer data types, and handle 
nested da ]}> +:::tip +Looking to use a REST API as a source? Explore our new [REST API generic source](dlt-ecosystem/verified-sources/rest_api) for a declarative way to load data. +::: + From 359ec72c61628cb24b1e401a34f0528de1a657d3 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Fri, 17 May 2024 10:08:07 +0200 Subject: [PATCH 14/29] RESTClient: docs: Fixed snippet definition (#1373) --- docs/website/docs/general-usage/http/rest-client.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index 481670ae4b..3f29182044 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -555,7 +555,7 @@ Debugging `paginate()` is trickier because it's a generator function that yields 1. Enable [logging](../../running-in-production/running.md#set-the-log-level-and-format) to see detailed information about the HTTP requests: -```bash +```sh RUNTIME__LOG_LEVEL=INFO python my_script.py ``` @@ -589,7 +589,7 @@ def response_hook(response, **kwargs): for page in client.paginate( "/posts", - auth=BearerTokenAuth(token="your_access_token") + auth=BearerTokenAuth(token="your_access_token"), hooks={"response": [response_hook]} ): print(page) From e789093f94d6fea17e372bce3bae0533ee09ad9f Mon Sep 17 00:00:00 2001 From: rudolfix Date: Fri, 17 May 2024 20:06:41 +0200 Subject: [PATCH 15/29] allows to bubble up exceptions when standalone resource returns (#1374) --- dlt/extract/decorators.py | 15 ++++++--------- tests/extract/test_decorators.py | 11 +++++++++++ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index fac6391e01..9c4076cfa7 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -567,16 +567,13 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltResourceImpl: compat_wrapper(actual_resource_name, conf_f, sig, *args, **kwargs), incremental, ) - except InvalidResourceDataTypeFunctionNotAGenerator as gen_ex: + except InvalidResourceDataTypeFunctionNotAGenerator: # we allow an edge case: resource can return another resource - try: - # actually call the function to see if it contains DltResource - data_ = conf_f(*args, **kwargs) - if not isinstance(data_, DltResource): - raise - r = data_ # type: ignore[assignment] - except Exception: - raise gen_ex from None + # actually call the function to see if it contains DltResource + data_ = conf_f(*args, **kwargs) + if not isinstance(data_, DltResource): + raise + r = data_ # type: ignore[assignment] # consider transformer arguments bound r._args_bound = True # keep explicit args passed diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 5e85552d73..c6a675a8d3 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -880,6 +880,17 @@ def rv_resource(name: str): assert list(r) == [1, 2, 3] +def test_standalone_resource_returning_resource_exception() -> None: + @dlt.resource(standalone=True) + def rv_resource(uniq_name: str = dlt.config.value): + return dlt.resource([1, 2, 3], name=uniq_name, primary_key="value") + + # pass through of the exception in `rv_resource` when it returns, not yields + with pytest.raises(ConfigFieldMissingException) as conf_ex: + rv_resource() + assert conf_ex.value.fields == ["uniq_name"] + + def test_resource_rename_credentials_separation(): 
os.environ["SOURCES__TEST_DECORATORS__STANDALONE_SIGNATURE__SECRET_END"] = "5" assert list(standalone_signature(1)) == [1, 2, 3, 4] From 4c6f928c491fcb46b7c03a14d9f4fae4dd32c2c3 Mon Sep 17 00:00:00 2001 From: Daniel-Vetter-Coverwhale <120594412+Daniel-Vetter-Coverwhale@users.noreply.github.com> Date: Fri, 17 May 2024 15:31:51 -0400 Subject: [PATCH 16/29] fix: use .get on column in mssql destination for cases where the yaml does not contain the nullable property, like other sql destinations (#1380) --- dlt/destinations/impl/mssql/mssql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/destinations/impl/mssql/mssql.py b/dlt/destinations/impl/mssql/mssql.py index 8de15e2bd9..6f364c8af1 100644 --- a/dlt/destinations/impl/mssql/mssql.py +++ b/dlt/destinations/impl/mssql/mssql.py @@ -181,7 +181,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non if c.get(h, False) is True ) column_name = self.capabilities.escape_identifier(c["name"]) - return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c['nullable'])}" + return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c.get('nullable', True))}" def _create_replace_followup_jobs( self, table_chain: Sequence[TTableSchema] From 7a996337021032bac4e74e5f659f2fdc45da5b64 Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink <47451109+jorritsandbrink@users.noreply.github.com> Date: Mon, 20 May 2024 20:55:41 +0400 Subject: [PATCH 17/29] Make path tests Windows compatible (#1384) * make path tests windows compatible --------- Co-authored-by: Jorrit Sandbrink --- .../load/filesystem/test_filesystem_client.py | 51 ++++++++++--------- .../load/pipeline/test_filesystem_pipeline.py | 4 +- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py index ca962adb16..4519f1ea83 100644 --- a/tests/load/filesystem/test_filesystem_client.py +++ b/tests/load/filesystem/test_filesystem_client.py @@ -1,6 +1,7 @@ import posixpath import os from unittest import mock +from pathlib import Path import pytest @@ -117,16 +118,18 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non client, _, root_path, load_id1 = load_info layout = client.config.layout # this path will be kept after replace - job_2_load_1_path = posixpath.join( - root_path, - create_path( - layout, - NORMALIZED_FILES[1], - client.schema.name, - load_id1, - load_package_timestamp=timestamp, - extra_placeholders=client.config.extra_placeholders, - ), + job_2_load_1_path = Path( + posixpath.join( + root_path, + create_path( + layout, + NORMALIZED_FILES[1], + client.schema.name, + load_id1, + load_package_timestamp=timestamp, + extra_placeholders=client.config.extra_placeholders, + ), + ) ) with perform_load( @@ -135,16 +138,18 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non client, _, root_path, load_id2 = load_info # this one we expect to be replaced with - job_1_load_2_path = posixpath.join( - root_path, - create_path( - layout, - NORMALIZED_FILES[0], - client.schema.name, - load_id2, - load_package_timestamp=timestamp, - extra_placeholders=client.config.extra_placeholders, - ), + job_1_load_2_path = Path( + posixpath.join( + root_path, + create_path( + layout, + NORMALIZED_FILES[0], + client.schema.name, + load_id2, + load_package_timestamp=timestamp, + extra_placeholders=client.config.extra_placeholders, + ), + ) ) # First file from load1 remains, second file is replaced by 
load2 @@ -159,7 +164,7 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non for f in files: if f == INIT_FILE_NAME: continue - paths.append(posixpath.join(basedir, f)) + paths.append(Path(posixpath.join(basedir, f))) ls = set(paths) assert ls == {job_2_load_1_path, job_1_load_2_path} @@ -210,7 +215,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None ) for job in jobs2 ] - expected_files = sorted([posixpath.join(root_path, fn) for fn in expected_files]) + expected_files = sorted([Path(posixpath.join(root_path, fn)) for fn in expected_files]) # type: ignore[misc] paths = [] for basedir, _dirs, files in client.fs_client.walk( @@ -222,5 +227,5 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None for f in files: if f == INIT_FILE_NAME: continue - paths.append(posixpath.join(basedir, f)) + paths.append(Path(posixpath.join(basedir, f))) assert list(sorted(paths)) == expected_files diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 7680bc6e90..5f24daf57f 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -301,7 +301,7 @@ def count(*args, **kwargs) -> Any: for file in files: if ".jsonl" in file: - expected_files.add(posixpath.join(basedir, file)) + expected_files.add(Path(posixpath.join(basedir, file))) for load_package in load_info.load_packages: for load_info in load_package.jobs["completed_jobs"]: # type: ignore[assignment] @@ -321,7 +321,7 @@ def count(*args, **kwargs) -> Any: full_path = posixpath.join(client.dataset_path, path) # type: ignore[attr-defined] assert client.fs_client.exists(full_path) # type: ignore[attr-defined] if ".jsonl" in full_path: - known_files.add(full_path) + known_files.add(Path(full_path)) assert expected_files == known_files assert known_files From db23c7123bec1f3b831b88314d354600b1bc8189 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 21 May 2024 12:31:04 +0200 Subject: [PATCH 18/29] Update destination-tables.md (#1386) --- docs/website/docs/general-usage/destination-tables.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/website/docs/general-usage/destination-tables.md b/docs/website/docs/general-usage/destination-tables.md index 8e1f771e47..4d31b8440b 100644 --- a/docs/website/docs/general-usage/destination-tables.md +++ b/docs/website/docs/general-usage/destination-tables.md @@ -74,7 +74,8 @@ pipeline = dlt.pipeline( load_info = pipeline.run(users) ``` -The result will be the same, but the table is implicitly named `users` based on the resource name. +The result will be the same, note that we do not explicitly pass `table_name="users"` to `pipeline.run`, +and the table is implicitly named `users` based on the resource name (e.g. `users()` decorated with `@dlt.resource`). 
:::note From 5b1f5adb3de4572b8dd44305c8ffd38ae002611a Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 21 May 2024 13:18:41 +0200 Subject: [PATCH 19/29] Fix typos in docs: destination tables (#1389) --- .../docs/general-usage/destination-tables.md | 56 ++++++------------- 1 file changed, 18 insertions(+), 38 deletions(-) diff --git a/docs/website/docs/general-usage/destination-tables.md b/docs/website/docs/general-usage/destination-tables.md index 4d31b8440b..4780d4be20 100644 --- a/docs/website/docs/general-usage/destination-tables.md +++ b/docs/website/docs/general-usage/destination-tables.md @@ -74,8 +74,7 @@ pipeline = dlt.pipeline( load_info = pipeline.run(users) ``` -The result will be the same, note that we do not explicitly pass `table_name="users"` to `pipeline.run`, -and the table is implicitly named `users` based on the resource name (e.g. `users()` decorated with `@dlt.resource`). +The result will be the same; note that we do not explicitly pass `table_name="users"` to `pipeline.run`, and the table is implicitly named `users` based on the resource name (e.g., `users()` decorated with `@dlt.resource`). :::note @@ -118,9 +117,7 @@ pipeline = dlt.pipeline( load_info = pipeline.run(data, table_name="users") ``` -Running this pipeline will create two tables in the destination, `users` and `users__pets`. The -`users` table will contain the top level data, and the `users__pets` table will contain the child -data. Here is what the tables may look like: +Running this pipeline will create two tables in the destination, `users` and `users__pets`. The `users` table will contain the top-level data, and the `users__pets` table will contain the child data. Here is what the tables may look like: **mydata.users** @@ -142,21 +139,14 @@ creating and linking children and parent tables. This is how it works: -1. Each row in all (top level and child) data tables created by `dlt` contains UNIQUE column named - `_dlt_id`. -1. Each child table contains FOREIGN KEY column `_dlt_parent_id` linking to a particular row - (`_dlt_id`) of a parent table. -1. Rows in child tables come from the lists: `dlt` stores the position of each item in the list in - `_dlt_list_idx`. -1. For tables that are loaded with the `merge` write disposition, we add a ROOT KEY column - `_dlt_root_id`, which links child table to a row in top level table. - +1. Each row in all (top level and child) data tables created by `dlt` contains a `UNIQUE` column named `_dlt_id`. +1. Each child table contains a `FOREIGN KEY` column `_dlt_parent_id` linking to a particular row (`_dlt_id`) of a parent table. +1. Rows in child tables come from the lists: `dlt` stores the position of each item in the list in `_dlt_list_idx`. +1. For tables that are loaded with the `merge` write disposition, we add a root key column `_dlt_root_id`, which links the child table to a row in the top-level table. :::note -If you define your own primary key in a child table, it will be used to link to parent table -and the `_dlt_parent_id` and `_dlt_list_idx` will not be added. `_dlt_id` is always added even in -case the primary key or other unique columns are defined. +If you define your own primary key in a child table, it will be used to link to the parent table, and the `_dlt_parent_id` and `_dlt_list_idx` will not be added. `_dlt_id` is always added even if the primary key or other unique columns are defined. ::: @@ -165,17 +155,15 @@ case the primary key or other unique columns are defined. 
During a pipeline run, dlt [normalizes both table and column names](schema.md#naming-convention) to ensure compatibility with the destination database's accepted format. All names from your source data will be transformed into snake_case and will only include alphanumeric characters. Please be aware that the names in the destination database may differ somewhat from those in your original input. ### Variant columns -If your data has inconsistent types, `dlt` will dispatch the data to several **variant columns**. For example, if you have a resource (ie json file) with a filed with name **answer** and your data contains boolean values, you will get get a column with name **answer** of type **BOOLEAN** in your destination. If for some reason, on next load you get integer value and string value in **answer**, the inconsistent data will go to **answer__v_bigint** and **answer__v_text** columns respectively. -The general naming rule for variant columns is `__v_` where `original_name` is the existing column name (with data type clash) and `type` is the name of data type stored in the variant. - +If your data has inconsistent types, `dlt` will dispatch the data to several **variant columns**. For example, if you have a resource (i.e., JSON file) with a field with name `answer` and your data contains boolean values, you will get a column with name `answer` of type `BOOLEAN` in your destination. If for some reason, on the next load, you get integer and string values in `answer`, the inconsistent data will go to `answer__v_bigint` and `answer__v_text` columns respectively. +The general naming rule for variant columns is `__v_` where `original_name` is the existing column name (with data type clash) and `type` is the name of the data type stored in the variant. ## Load Packages and Load IDs Each execution of the pipeline generates one or more load packages. A load package typically contains data retrieved from all the [resources](glossary.md#resource) of a particular [source](glossary.md#source). These packages are uniquely identified by a `load_id`. The `load_id` of a particular package is added to the top data tables -(referenced as `_dlt_load_id` column in the example above) and to the special `_dlt_loads` table with a status 0 -(when the load process is fully completed). +(referenced as `_dlt_load_id` column in the example above) and to the special `_dlt_loads` table with a status of 0 (when the load process is fully completed). To illustrate this, let's load more data into the same destination: @@ -190,8 +178,7 @@ data = [ ``` The rest of the pipeline definition remains the same. Running this pipeline will create a new load -package with a new `load_id` and add the data to the existing tables. The `users` table will now -look like this: +package with a new `load_id` and add the data to the existing tables. The `users` table will now look like this: **mydata.users** @@ -211,12 +198,12 @@ The `_dlt_loads` table will look like this: | **1234563456.12345** | quick_start | 0 | 2023-09-12 16:46:03.10662+00 | aOEb...Qekd/58= | The `_dlt_loads` table tracks complete loads and allows chaining transformations on top of them. -Many destinations do not support distributed and long-running transactions (e.g. Amazon Redshift). +Many destinations do not support distributed and long-running transactions (e.g., Amazon Redshift). In that case, the user may see the partially loaded data. It is possible to filter such data out: any row with a `load_id` that does not exist in `_dlt_loads` is not yet completed. 
The same procedure may be used to identify and delete data for packages that never got completed. -For each load, you can test and [alert](../running-in-production/alerting.md) on anomalies (e.g. +For each load, you can test and [alert](../running-in-production/alerting.md) on anomalies (e.g., no data, too much loaded to a table). There are also some useful load stats in the `Load info` tab of the [Streamlit app](../dlt-ecosystem/visualizations/exploring-the-data.md#exploring-the-data) mentioned above. @@ -232,8 +219,7 @@ Data lineage can be super relevant for architectures like the [data vault architecture](https://www.data-vault.co.uk/what-is-data-vault/) or when troubleshooting. The data vault architecture is a data warehouse that large organizations use when representing the same process across multiple systems, which adds data lineage requirements. Using the pipeline name -and `load_id` provided out of the box by `dlt`, you are able to identify the source and time of -data. +and `load_id` provided out of the box by `dlt`, you are able to identify the source and time of data. You can [save](../running-in-production/running.md#inspect-and-save-the-load-info-and-trace) complete lineage info for a particular `load_id` including a list of loaded files, error messages @@ -243,11 +229,7 @@ problems. ## Staging dataset So far we've been using the `append` write disposition in our example pipeline. This means that -each time we run the pipeline, the data is appended to the existing tables. When you use [the -merge write disposition](incremental-loading.md), dlt creates a staging database schema for -staging data. This schema is named `_staging` and contains the same tables as the -destination schema. When you run the pipeline, the data from the staging tables is loaded into the -destination tables in a single atomic transaction. +each time we run the pipeline, the data is appended to the existing tables. When you use the [merge write disposition](incremental-loading.md), dlt creates a staging database schema for staging data. This schema is named `_staging` and contains the same tables as the destination schema. When you run the pipeline, the data from the staging tables is loaded into the destination tables in a single atomic transaction. Let's illustrate this with an example. We change our pipeline to use the `merge` write disposition: @@ -271,8 +253,7 @@ load_info = pipeline.run(users) ``` Running this pipeline will create a schema in the destination database with the name `mydata_staging`. -If you inspect the tables in this schema, you will find `mydata_staging.users` table identical to the -`mydata.users` table in the previous example. +If you inspect the tables in this schema, you will find the `mydata_staging.users` table identical to the`mydata.users` table in the previous example. Here is what the tables may look like after running the pipeline: @@ -291,8 +272,7 @@ Here is what the tables may look like after running the pipeline: | 2 | Bob 2 | rX8ybgTeEmAmmA | 2345672350.98417 | | 3 | Charlie | h8lehZEvT3fASQ | 1234563456.12345 | -Notice that the `mydata.users` table now contains the data from both the previous pipeline run and -the current one. +Notice that the `mydata.users` table now contains the data from both the previous pipeline run and the current one. ## Versioned datasets @@ -323,4 +303,4 @@ load_info = pipeline.run(data, table_name="users") Every time you run this pipeline, a new schema will be created in the destination database with a datetime-based suffix. 
The data will be loaded into tables in this schema. For example, the first time you run the pipeline, the schema will be named -`mydata_20230912064403`, the second time it will be named `mydata_20230912064407`, and so on. +`mydata_20230912064403`, the second time it will be named `mydata_20230912064407`, and so on. \ No newline at end of file From 1ce556ae8503bbf567d36d92221b2a77878a5d7e Mon Sep 17 00:00:00 2001 From: David Scharf Date: Tue, 21 May 2024 17:30:03 +0200 Subject: [PATCH 20/29] add naming rules to contributing (#1291) * branch naming rules in contributing * add motivation to branch naming rule * formatting and typo --- CONTRIBUTING.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a8a8cc37ae..85dbf37c97 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -52,6 +52,29 @@ We use **master** branch for hot fixes (including documentation) that needs to b On the release day, **devel** branch is merged into **master**. All releases of `dlt` happen only from the **master**. +### Branch naming rules + +We want to make sure that our git history explains in a human readable way what has been changed with which Branch or PR. To this end, we are using the following branch naming pattern (all lowercase and dashes, no underscores): + +```sh +{category}/{ticket-id}-description-of-the-branch +# example: +feat/4922-add-avro-support +``` + +#### Branch categories + +* **feat** - a new feature that is being implemented (ticket required) +* **fix** - a change that fixes a bug (ticket required) +* **exp** - an experiment where we are testing a new idea or want to demonstrate something to the team, might turn into a `feat` later (ticket encouraged) +* **test** - anything related to the tests (ticket encouraged) +* **blogs** - a new entry to our blog (ticket optional) +* **docs** - a change to our docs (ticket optional) + +#### Ticket Numbers + +We encourage you to attach your branches to a ticket, if none exists, create one and explain what you are doing. For `feat` and `fix` branches, tickets are mandatory, for `exp` and `test` branches encouraged and for `blogs` and `docs` branches optional. + ### Submitting a hotfix We'll fix critical bugs and release `dlt` out of the schedule. Follow the regular procedure, but make your PR against **master** branch. Please ping us on Slack if you do it. @@ -166,3 +189,4 @@ Once the version has been bumped, follow these steps to publish the new release - [Poetry Documentation](https://python-poetry.org/docs/) If you have any questions or need help, don't hesitate to reach out to us. We're here to help you succeed in contributing to `dlt`. Happy coding! 
+**** \ No newline at end of file From b1e0f7760920355050b9c1dd6324ece695830434 Mon Sep 17 00:00:00 2001 From: David Scharf Date: Tue, 21 May 2024 21:28:30 +0200 Subject: [PATCH 21/29] Fix snippet linting errors (#1392) * fix snippets * fix additional mypy errors * fix another auth type checker error --- .../verified-sources/rest_api.md | 84 +++++++++++-------- .../docs/general-usage/http/rest-client.md | 4 +- .../docs/walkthroughs/create-a-pipeline.md | 2 +- 3 files changed, 50 insertions(+), 40 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md index 1f79055d06..0022850987 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md @@ -203,7 +203,7 @@ For example, you can set the primary key, write disposition, and other default s ```py config = { "client": { - ... + # ... }, "resource_defaults": { "primary_key": "id", @@ -216,15 +216,17 @@ config = { }, "resources": [ "resource1", - "resource2": { - "name": "resource2_name", - "write_disposition": "append", - "endpoint": { - "params": { - "param1": "value1", + { + "resource2": { + "name": "resource2_name", + "write_disposition": "append", + "endpoint": { + "params": { + "param1": "value1", + }, }, - }, - }, + } + } ], } ``` @@ -309,7 +311,7 @@ To specify the pagination configuration, use the `paginator` field in the [clien ```py { - ... + # ... "paginator": { "type": "json_links", "next_url_path": "paging.next", @@ -321,7 +323,7 @@ Or using the paginator instance: ```py { - ... + # ... "paginator": JSONResponsePaginator( next_url_path="paging.next" ), @@ -394,11 +396,11 @@ One of the most common method is token-based authentication. To authenticate wit ```py { "client": { - ... + # ... "auth": { "token": dlt.secrets["your_api_token"], }, - ... + # ... }, } ``` @@ -424,7 +426,7 @@ To specify the authentication configuration, use the `auth` field in the [client "type": "bearer", "token": dlt.secrets["your_api_token"], }, - ... + # ... }, } ``` @@ -438,7 +440,7 @@ config = { "client": { "auth": BearTokenAuth(dlt.secrets["your_api_token"]), }, - ... + # ... } ``` @@ -455,7 +457,7 @@ In the GitHub example, the `issue_comments` resource depends on the `issues` res "name": "issues", "endpoint": { "path": "issues", - ... + # ... }, }, { @@ -495,10 +497,12 @@ The `issue_comments` resource will make requests to the following endpoints: The syntax for the `resolve` field in parameter configuration is: ```py -"": { - "type": "resolve", - "resource": "", - "field": "", +{ + "": { + "type": "resolve", + "resource": "", + "field": "", + } } ``` @@ -530,21 +534,25 @@ When the API endpoint supports incremental loading, you can configure the source 1. 
Defining a special parameter in the `params` section of the [endpoint configuration](#endpoint-configuration): ```py - "": { - "type": "incremental", - "cursor_path": "", - "initial_value": "", - }, + { + "": { + "type": "incremental", + "cursor_path": "", + "initial_value": "", + }, + } ``` For example, in the `issues` resource configuration in the GitHub example, we have: ```py - "since": { - "type": "incremental", - "cursor_path": "updated_at", - "initial_value": "2024-01-25T11:21:28Z", - }, + { + "since": { + "type": "incremental", + "cursor_path": "updated_at", + "initial_value": "2024-01-25T11:21:28Z", + }, + } ``` This configuration tells the source to create an incremental object that will keep track of the `updated_at` field in the response and use it as a value for the `since` parameter in subsequent requests. @@ -552,13 +560,15 @@ When the API endpoint supports incremental loading, you can configure the source 2. Specifying the `incremental` field in the [endpoint configuration](#endpoint-configuration): ```py - "incremental": { - "start_param": "", - "end_param": "", - "cursor_path": "", - "initial_value": "", - "end_value": "", - }, + { + "incremental": { + "start_param": "", + "end_param": "", + "cursor_path": "", + "initial_value": "", + "end_value": "", + } + } ``` This configuration is more flexible and allows you to specify the start and end conditions for the incremental loading. diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index 3f29182044..19cc95bf78 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -542,7 +542,7 @@ from dlt.sources.helpers.rest_client import RESTClient from dlt.sources.helpers.rest_client.auth import BearerTokenAuth client = RESTClient(base_url="https://api.example.com") -response = client.get("/posts", auth=BearerTokenAuth(token="your_access_token")) +response = client.get("/posts", auth=BearerTokenAuth(token="your_access_token")) # type: ignore print(response.status_code) print(response.headers) @@ -589,7 +589,7 @@ def response_hook(response, **kwargs): for page in client.paginate( "/posts", - auth=BearerTokenAuth(token="your_access_token"), + auth=BearerTokenAuth(token="your_access_token"), # type: ignore hooks={"response": [response_hook]} ): print(page) diff --git a/docs/website/docs/walkthroughs/create-a-pipeline.md b/docs/website/docs/walkthroughs/create-a-pipeline.md index bba78dc6cb..cbbbd73fc3 100644 --- a/docs/website/docs/walkthroughs/create-a-pipeline.md +++ b/docs/website/docs/walkthroughs/create-a-pipeline.md @@ -100,7 +100,7 @@ def github_api_resource(api_secret_key: str = dlt.secrets.value): for page in paginate( url, - auth=BearerTokenAuth(api_secret_key), + auth=BearerTokenAuth(api_secret_key), # type: ignore paginator=HeaderLinkPaginator(), params={"state": "open"} ): From bf92adaaa56e99d4a9a591789bf3763d1f321545 Mon Sep 17 00:00:00 2001 From: mucio Date: Thu, 23 May 2024 10:27:18 +0200 Subject: [PATCH 22/29] Added values to the data pattern of the rest_api helper (#1399) --- dlt/sources/helpers/rest_client/detector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dlt/sources/helpers/rest_client/detector.py b/dlt/sources/helpers/rest_client/detector.py index 857f6bbb4e..d004ca173c 100644 --- a/dlt/sources/helpers/rest_client/detector.py +++ b/dlt/sources/helpers/rest_client/detector.py @@ -25,6 +25,7 @@ "payload", "content", "objects", + "values", ] ) From 
bab9e90306446e6ae4238b8753bab59b19c87955 Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Thu, 23 May 2024 14:47:08 +0530 Subject: [PATCH 23/29] Added info about how to reorder the columns to adjust a schema (#1364) * Added info about how to reorder the columns * Updated rest_api.md with configuration examples * Update docs/website/docs/walkthroughs/adjust-a-schema.md * Updated ../website/docs/dlt-ecosystem/verified-sources/rest_api.md * fix naming convention for bigquery custom destination --------- Co-authored-by: Anton Burnashev Co-authored-by: AstrakhantsevaAA --- .../custom_destination_bigquery.py | 20 +++++++------- .../docs/walkthroughs/adjust-a-schema.md | 26 +++++++++++++++++++ 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py index ea60b9b00d..e890469263 100644 --- a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py +++ b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py @@ -5,13 +5,13 @@ keywords: [destination, credentials, example, bigquery, custom destination] --- -In this example, you'll find a Python script that demonstrates how to load to bigquey with the custom destination. +In this example, you'll find a Python script that demonstrates how to load to BigQuery with the custom destination. We'll learn how to: -- use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials) -- use the [custom destination](../dlt-ecosystem/destinations/destination.md) -- Use pyarrow tables to create complex column types on bigquery -- Use bigquery `autodetect=True` for schema inference from parquet files +- Use [built-in credentials.](../general-usage/credentials/config_specs#gcp-credentials) +- Use the [custom destination.](../dlt-ecosystem/destinations/destination.md) +- Use pyarrow tables to create complex column types on BigQuery. +- Use BigQuery `autodetect=True` for schema inference from parquet files. """ @@ -38,7 +38,7 @@ def resource(url: str): # load pyarrow table with pandas table = pa.Table.from_pandas(pd.read_csv(url)) - # we add a list type column to demontrate bigquery lists + # we add a list type column to demonstrate bigquery lists table = table.append_column( "tags", pa.array( @@ -57,12 +57,12 @@ def resource(url: str): yield table -# dlt biquery custom destination +# dlt bigquery custom destination # we can use the dlt provided credentials class # to retrieve the gcp credentials from the secrets -@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0) +@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0, naming_convention="snake_case") def bigquery_insert( - items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value + items, table=BIGQUERY_TABLE_ID, credentials: GcpServiceAccountCredentials = dlt.secrets.value ) -> None: client = bigquery.Client( credentials.project_id, credentials.to_native_credentials(), location="US" @@ -74,7 +74,7 @@ def bigquery_insert( ) # since we have set the batch_size to 0, we get a filepath and can load the file directly with open(items, "rb") as f: - load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config) + load_job = client.load_table_from_file(f, table, job_config=job_config) load_job.result() # Waits for the job to complete. 
diff --git a/docs/website/docs/walkthroughs/adjust-a-schema.md b/docs/website/docs/walkthroughs/adjust-a-schema.md index cfe2d056b0..b0a9a9ce05 100644 --- a/docs/website/docs/walkthroughs/adjust-a-schema.md +++ b/docs/website/docs/walkthroughs/adjust-a-schema.md @@ -121,6 +121,32 @@ Do not rename the tables or columns in the yaml file. `dlt` infers those from th You can [adjust the schema](../general-usage/resource.md#adjust-schema) in Python before resource is loaded. ::: +### Reorder columns +To reorder the columns in your dataset, follow these steps: + +1. Initial Run: Execute the pipeline to obtain the import and export schemas. +1. Modify Export Schema: Adjust the column order as desired in the export schema. +1. Sync Import Schema: Ensure that these changes are mirrored in the import schema to maintain consistency. +1. Delete Dataset: Remove the existing dataset to prepare for the reload. +1. Reload Data: Reload the data. The dataset should now reflect the new column order as specified in the import YAML. + +These steps ensure that the column order in your dataset matches your specifications. + +**Another approach** to reorder columns is to use the `add_map` function. For instance, to rearrange ‘column1’, ‘column2’, and ‘column3’, you can proceed as follows: + +```py +# Define the data source and reorder columns using add_map +data_source = resource().add_map(lambda row: { + 'column3': row['column3'], + 'column1': row['column1'], + 'column2': row['column2'] +}) + +# Run the pipeline +load_info = pipeline.run(data_source) +``` + +In this example, the `add_map` function reorders columns by defining a new mapping. The lambda function specifies the desired order by rearranging the key-value pairs. When the pipeline runs, the data will load with the columns in the new order. ### Load data as json instead of generating child table or columns from flattened dicts From e44984801a60beed73d32d9739b2cd5ae00cb403 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Thu, 23 May 2024 11:49:49 +0200 Subject: [PATCH 24/29] rest_api: add response_actions documentation (#1362) --- .../verified-sources/rest_api.md | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md index 0022850987..d5d29344de 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md @@ -282,7 +282,7 @@ The fields in the endpoint configuration are: - `json`: The JSON payload to be sent with the request (for POST and PUT requests). - `paginator`: Pagination configuration for the endpoint. See the [pagination](#pagination) section for more details. - `data_selector`: A JSONPath to select the data from the response. See the [data selection](#data-selection) section for more details. -- `response_actions`: A list of actions that define how to process the response data. +- `response_actions`: A list of actions that define how to process the response data. See the [response actions](#response-actions) section for more details. - `incremental`: Configuration for [incremental loading](#incremental-loading). ### Pagination @@ -586,3 +586,33 @@ See the [incremental loading](../../general-usage/incremental-loading.md#increme - `root_key` (bool): Enables merging on all resources by propagating root foreign key to child tables. 
This option is most useful if you plan to change write disposition of a resource to disable/enable merge. Defaults to False. - `schema_contract`: Schema contract settings that will be applied to this resource. - `spec`: A specification of configuration and secret values required by the source. + +### Response actions + +The `response_actions` field in the endpoint configuration allows you to specify how to handle specific responses from the API based on status codes or content substrings. This is useful for handling edge cases like ignoring responses on specific conditions. + +:::caution Experimental Feature +This is an experimental feature and may change in future releases. +::: + +#### Example + +```py +{ + "path": "issues", + "response_actions": [ + {"status_code": 404, "action": "ignore"}, + {"content": "Not found", "action": "ignore"}, + {"status_code": 200, "content": "some text", "action": "ignore"}, + ], +} +``` + +In this example, the source will ignore responses with a status code of 404, responses with the content "Not found", and responses with a status code of 200 _and_ content "some text". + +**Fields:** + +- `status_code` (int, optional): The HTTP status code to match. +- `content` (str, optional): A substring to search for in the response content. +- `action` (str): The action to take when the condition is met. Currently supported actions: + - `ignore`: Ignore the response. From 19e1462e65995291be39081dfcbc0cd0dd588b6f Mon Sep 17 00:00:00 2001 From: rudolfix Date: Thu, 23 May 2024 17:04:25 +0200 Subject: [PATCH 25/29] detects a path param in the right-most path segment (#1394) --- dlt/sources/helpers/rest_client/detector.py | 8 ++++++-- .../helpers/rest_client/test_detector.py | 17 ++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/dlt/sources/helpers/rest_client/detector.py b/dlt/sources/helpers/rest_client/detector.py index d004ca173c..19a1e83a82 100644 --- a/dlt/sources/helpers/rest_client/detector.py +++ b/dlt/sources/helpers/rest_client/detector.py @@ -1,5 +1,6 @@ import re -from typing import List, Dict, Any, Tuple, Union, Optional, Callable, Iterable +from pathlib import PurePosixPath +from typing import List, Dict, Any, Tuple, Union, Callable, Iterable from urllib.parse import urlparse from requests import Response @@ -47,7 +48,10 @@ def single_entity_path(path: str) -> bool: """Checks if path ends with path param indicating that single object is returned""" - return re.search(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}/?$", path) is not None + # get last path segment + name = PurePosixPath(path).name + # alphabet for a name taken from https://github.com/OAI/OpenAPI-Specification/blob/main/versions/3.0.3.md#fixed-fields-6 + return re.search(r"\{([a-zA-Z0-9\.\-_]+)\}", name) is not None def matches_any_pattern(key: str, patterns: Iterable[str]) -> bool: diff --git a/tests/sources/helpers/rest_client/test_detector.py b/tests/sources/helpers/rest_client/test_detector.py index f01f9409a1..6511b472fb 100644 --- a/tests/sources/helpers/rest_client/test_detector.py +++ b/tests/sources/helpers/rest_client/test_detector.py @@ -406,16 +406,20 @@ def test_find_paginator(test_case) -> None: [ "/users/{user_id}", "/api/v1/products/{product_id}/", - # those are not valid paths - # "/api/v1/products/{product_id}//", - # "/api/v1/products/{product_id}?param1=value1", - # "/api/v1/products/{product_id}#section", - # "/api/v1/products/{product_id}/#section", + "/api/v1/products/{product_id}//", + "/api/v1/products/{product_id}?param1=value1", + 
"/api/v1/products/{product_id}#section", + "/api/v1/products/{product_id}.json", + "/api/v1/products/{product_id}.json/", + "/api/v1/products/{product_id}_data", + "/api/v1/products/{product_id}_data?param=true", "/users/{user_id}/posts/{post_id}", "/users/{user_id}/posts/{post_id}/comments/{comment_id}", "{entity}", "/{entity}", "/{user_123}", + "/users/{user-id}", + "/users/{123}", ], ) def test_single_entity_path_valid(path): @@ -430,8 +434,7 @@ def test_single_entity_path_valid(path): "/users/{user_id}/details", "/", "/{}", - "/users/{123}", - "/users/{user-id}", + "/api/v1/products/{product_id}/#section", "/users/{user id}", "/users/{user_id}/{", # Invalid ending ], From d0fdfb4ca000ca57c56134bd0fb9b9f11e90b286 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Thu, 23 May 2024 18:50:26 +0200 Subject: [PATCH 26/29] Update the tutorial to use `rest_client.paginate` for pagination (#1287) --- .../docs/tutorial/grouping-resources.md | 141 ++++++------------ .../docs/tutorial/load-data-from-an-api.md | 51 ++++++- 2 files changed, 93 insertions(+), 99 deletions(-) diff --git a/docs/website/docs/tutorial/grouping-resources.md b/docs/website/docs/tutorial/grouping-resources.md index 3a05f7940c..3ba95b7971 100644 --- a/docs/website/docs/tutorial/grouping-resources.md +++ b/docs/website/docs/tutorial/grouping-resources.md @@ -14,6 +14,9 @@ This tutorial continues the [previous](load-data-from-an-api) part. We'll use th In the previous tutorial, we loaded issues from the GitHub API. Now we'll prepare to load comments from the API as well. Here's a sample [dlt resource](../general-usage/resource) that does that: ```py +import dlt +from dlt.sources.helpers.rest_client import paginate + @dlt.resource( table_name="comments", write_disposition="merge", @@ -22,17 +25,11 @@ In the previous tutorial, we loaded issues from the GitHub API. Now we'll prepar def get_comments( updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") ): - url = "https://api.github.com/repos/dlt-hub/dlt/comments?per_page=100" - - while True: - response = requests.get(url) - response.raise_for_status() - yield response.json() - - # get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + for page in paginate( + "https://api.github.com/repos/dlt-hub/dlt/comments", + params={"per_page": 100} + ): + yield page ``` We can load this resource separately from the issues resource, however loading both issues and comments in one go is more efficient. 
To do that, we'll use the `@dlt.source` decorator on a function that returns a list of resources: @@ -47,7 +44,7 @@ def github_source(): ```py import dlt -from dlt.sources.helpers import requests +from dlt.sources.helpers.rest_client import paginate @dlt.resource( table_name="issues", @@ -57,21 +54,17 @@ from dlt.sources.helpers import requests def get_issues( updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") ): - url = ( - "https://api.github.com/repos/dlt-hub/dlt/issues" - f"?since={updated_at.last_value}&per_page=100" - "&sort=updated&directions=desc&state=open" - ) - - while True: - response = requests.get(url) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + for page in paginate( + "https://api.github.com/repos/dlt-hub/dlt/issues", + params={ + "since": updated_at.last_value, + "per_page": 100, + "sort": "updated", + "directions": "desc", + "state": "open", + } + ): + yield page @dlt.resource( @@ -82,20 +75,14 @@ def get_issues( def get_comments( updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") ): - url = ( - "https://api.github.com/repos/dlt-hub/dlt/comments" - "?per_page=100" - ) - - while True: - response = requests.get(url) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + for page in paginate( + "https://api.github.com/repos/dlt-hub/dlt/comments", + params={ + "since": updated_at.last_value, + "per_page": 100, + } + ): + yield page @dlt.source @@ -124,18 +111,8 @@ from dlt.sources.helpers import requests BASE_GITHUB_URL = "https://api.github.com/repos/dlt-hub/dlt" def fetch_github_data(endpoint, params={}): - """Fetch data from GitHub API based on endpoint and params.""" url = f"{BASE_GITHUB_URL}/{endpoint}" - - while True: - response = requests.get(url, params=params) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + return paginate(url, params=params) @dlt.source def github_source(): @@ -164,21 +141,16 @@ For the next step we'd want to get the [number of repository clones](https://doc Let's handle this by changing our `fetch_github_data()` first: ```py -def fetch_github_data(endpoint, params={}, access_token=None): - """Fetch data from GitHub API based on endpoint and params.""" - headers = {"Authorization": f"Bearer {access_token}"} if access_token else {} +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth +def fetch_github_data(endpoint, params={}, access_token=None): url = f"{BASE_GITHUB_URL}/{endpoint}" + return paginate( + url, + params=params, + auth=BearerTokenAuth(token=access_token) if access_token else None, + ) - while True: - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] @dlt.source def github_source(access_token): @@ -229,28 +201,7 @@ access_token = "ghp_A...3aRY" Now we can run the script and it will load the data from the `traffic/clones` endpoint: ```py -import dlt -from dlt.sources.helpers import requests - -BASE_GITHUB_URL = "https://api.github.com/repos/dlt-hub/dlt" - - -def fetch_github_data(endpoint, params={}, access_token=None): - """Fetch data from GitHub API based on endpoint 
and params.""" - headers = {"Authorization": f"Bearer {access_token}"} if access_token else {} - - url = f"{BASE_GITHUB_URL}/{endpoint}" - - while True: - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - yield response.json() - - # get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] - +... @dlt.source def github_source( @@ -287,19 +238,12 @@ BASE_GITHUB_URL = "https://api.github.com/repos/{repo_name}" def fetch_github_data(repo_name, endpoint, params={}, access_token=None): """Fetch data from GitHub API based on repo_name, endpoint, and params.""" - headers = {"Authorization": f"Bearer {access_token}"} if access_token else {} - url = BASE_GITHUB_URL.format(repo_name=repo_name) + f"/{endpoint}" - - while True: - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + return paginate( + url, + params=params, + auth=BearerTokenAuth(token=access_token) if access_token else None, + ) @dlt.source @@ -347,5 +291,6 @@ Interested in learning more? Here are some suggestions: - [Pass config and credentials into your sources and resources](../general-usage/credentials). - [Run in production: inspecting, tracing, retry policies and cleaning up](../running-in-production/running). - [Run resources in parallel, optimize buffers and local storage](../reference/performance.md) + - [Use REST API client helpers](../general-usage/http/rest-client.md) to simplify working with REST APIs. 3. Check out our [how-to guides](../walkthroughs) to get answers to some common questions. 4. Explore the [Examples](../examples) section to see how dlt can be used in real-world scenarios diff --git a/docs/website/docs/tutorial/load-data-from-an-api.md b/docs/website/docs/tutorial/load-data-from-an-api.md index 31a2c1592d..ec6136b6d3 100644 --- a/docs/website/docs/tutorial/load-data-from-an-api.md +++ b/docs/website/docs/tutorial/load-data-from-an-api.md @@ -44,7 +44,7 @@ dlt pipeline github_issues show ## Append or replace your data -Try running the pipeline again with `python github_issues.py`. You will notice that the **issues** table contains two copies of the same data. This happens because the default load mode is `append`. It is very useful, for example, when you have a new folder created daily with `json` file logs, and you want to ingest them. +Try running the pipeline again with `python github_issues.py`. You will notice that the **issues** table contains two copies of the same data. This happens because the default load mode is `append`. It is very useful, for example, when you have daily data updates and you want to ingest them. To get the latest data, we'd need to run the script again. But how to do that without duplicating the data? One option is to tell `dlt` to replace the data in existing tables in the destination by using `replace` write disposition. Change the `github_issues.py` script to the following: @@ -148,6 +148,55 @@ and `updated_at.last_value` to tell GitHub to return issues updated only **after [Learn more about merge write disposition](../general-usage/incremental-loading#merge-incremental_loading). +## Using pagination helper + +In the previous examples, we used the `requests` library to make HTTP requests to the GitHub API and handled pagination manually. 
`dlt` has the built-in [REST client](../general-usage/http/rest-client.md) that simplifies API requests. We'll pick the `paginate()` helper from it for the next example. The `paginate` function takes a URL and optional parameters (quite similar to `requests`) and returns a generator that yields pages of data. + +Here's how the updated script looks: + +```py +import dlt +from dlt.sources.helpers.rest_client import paginate + +@dlt.resource( + table_name="issues", + write_disposition="merge", + primary_key="id", +) +def get_issues( + updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") +): + for page in paginate( + "https://api.github.com/repos/dlt-hub/dlt/issues", + params={ + "since": updated_at.last_value, + "per_page": 100, + "sort": "updated", + "direction": "desc", + "state": "open", + }, + ): + yield page + +pipeline = dlt.pipeline( + pipeline_name="github_issues_merge", + destination="duckdb", + dataset_name="github_data_merge", +) +load_info = pipeline.run(get_issues) +row_counts = pipeline.last_trace.last_normalize_info + +print(row_counts) +print("------") +print(load_info) +``` + +Let's zoom in on the changes: + +1. The `while` loop that handled pagination is replaced with reading pages from the `paginate()` generator. +2. `paginate()` takes the URL of the API endpoint and optional parameters. In this case, we pass the `since` parameter to get only issues updated after the last pipeline run. +3. We're not explicitly setting up pagination, `paginate()` handles it for us. Magic! Under the hood, `paginate()` analyzes the response and detects the pagination method used by the API. Read more about pagination in the [REST client documentation](../general-usage/http/rest-client.md#paginating-api-responses). + ## Next steps Continue your journey with the [Resource Grouping and Secrets](grouping-resources) tutorial. 
From a8a6ff7288f8639bea9e92c20573126e538215e1 Mon Sep 17 00:00:00 2001 From: Harato Daisuke <129731743+Benjamin0313@users.noreply.github.com> Date: Fri, 24 May 2024 17:51:48 +0900 Subject: [PATCH 27/29] fix command to install dlt (#1404) --- docs/website/docs/dlt-ecosystem/destinations/snowflake.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/snowflake.md b/docs/website/docs/dlt-ecosystem/destinations/snowflake.md index f144da02e6..deaaff3562 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/snowflake.md +++ b/docs/website/docs/dlt-ecosystem/destinations/snowflake.md @@ -9,7 +9,7 @@ keywords: [Snowflake, destination, data warehouse] ## Install `dlt` with Snowflake **To install the `dlt` library with Snowflake dependencies, run:** ```sh -pip install dlt[snowflake] +pip install "dlt[snowflake]" ``` ## Setup Guide From f6f583c65ef32dfa6bc4741a0db4e5e0a2b096b3 Mon Sep 17 00:00:00 2001 From: Alena Astrakhantseva Date: Fri, 24 May 2024 11:28:06 +0200 Subject: [PATCH 28/29] Update rest_api.md fix auth methods table --- docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md index d5d29344de..54edac5062 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md @@ -414,8 +414,8 @@ Available authentication types: | Authentication class | String Alias (`type`) | Description | | ------------------- | ----------- | ----------- | | [BearTokenAuth](../../general-usage/http/rest-client.md#bearer-token-authentication) | `bearer` | Bearer token authentication. | -| [HTTPBasicAuth](../../general-usage/http/rest-client.md#http-basic-authentication) | `api_key` | Basic HTTP authentication. | -| [APIKeyAuth](../../general-usage/http/rest-client.md#api-key-authentication) | `http_basic` | API key authentication with key defined in the query parameters or in the headers. | +| [HTTPBasicAuth](../../general-usage/http/rest-client.md#http-basic-authentication) | `http_basic` | Basic HTTP authentication. | +| [APIKeyAuth](../../general-usage/http/rest-client.md#api-key-authentication) | `api_key` | API key authentication with key defined in the query parameters or in the headers. | To specify the authentication configuration, use the `auth` field in the [client](#client) configuration: From 7c07c674b67c6fac86e0c1f4f1d2b00ebbe7e655 Mon Sep 17 00:00:00 2001 From: David Scharf Date: Fri, 24 May 2024 12:53:36 +0200 Subject: [PATCH 29/29] add typing classifier (#1391) update maintainers --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4bd62ce03b..1a946f5e10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "dlt" version = "0.4.11" description = "dlt is an open-source python-first scalable data loading library that does not require any backend to run." authors = ["dltHub Inc. 
"] -maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Ty Dunn "] +maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Anton Burnashev ", "David Scharf " ] readme = "README.md" license = "Apache-2.0" homepage = "https://github.com/dlt-hub" @@ -13,6 +13,7 @@ classifiers = [ "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Topic :: Software Development :: Libraries", + "Typing :: Typed", "Operating System :: MacOS :: MacOS X", "Operating System :: POSIX :: Linux", "Operating System :: Microsoft :: Windows",]