diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a8a8cc37ae..85dbf37c97 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -52,6 +52,29 @@ We use **master** branch for hot fixes (including documentation) that needs to b On the release day, **devel** branch is merged into **master**. All releases of `dlt` happen only from the **master**. +### Branch naming rules + +We want to make sure that our git history explains in a human readable way what has been changed with which Branch or PR. To this end, we are using the following branch naming pattern (all lowercase and dashes, no underscores): + +```sh +{category}/{ticket-id}-description-of-the-branch +# example: +feat/4922-add-avro-support +``` + +#### Branch categories + +* **feat** - a new feature that is being implemented (ticket required) +* **fix** - a change that fixes a bug (ticket required) +* **exp** - an experiment where we are testing a new idea or want to demonstrate something to the team, might turn into a `feat` later (ticket encouraged) +* **test** - anything related to the tests (ticket encouraged) +* **blogs** - a new entry to our blog (ticket optional) +* **docs** - a change to our docs (ticket optional) + +#### Ticket Numbers + +We encourage you to attach your branches to a ticket, if none exists, create one and explain what you are doing. For `feat` and `fix` branches, tickets are mandatory, for `exp` and `test` branches encouraged and for `blogs` and `docs` branches optional. + ### Submitting a hotfix We'll fix critical bugs and release `dlt` out of the schedule. Follow the regular procedure, but make your PR against **master** branch. Please ping us on Slack if you do it. @@ -166,3 +189,4 @@ Once the version has been bumped, follow these steps to publish the new release - [Poetry Documentation](https://python-poetry.org/docs/) If you have any questions or need help, don't hesitate to reach out to us. We're here to help you succeed in contributing to `dlt`. Happy coding! +**** \ No newline at end of file diff --git a/dlt/common/data_writers/buffered.py b/dlt/common/data_writers/buffered.py index fdd5b50111..bd32c68c49 100644 --- a/dlt/common/data_writers/buffered.py +++ b/dlt/common/data_writers/buffered.py @@ -55,7 +55,10 @@ def __init__( self.closed_files: List[DataWriterMetrics] = [] # all fully processed files # buffered items must be less than max items in file self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items) + # Explicitly configured max size supersedes destination limit self.file_max_bytes = file_max_bytes + if self.file_max_bytes is None and _caps: + self.file_max_bytes = _caps.recommended_file_size self.file_max_items = file_max_items # the open function is either gzip.open or open self.open = ( diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index e74f5a980d..089b4a1d5e 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -29,6 +29,8 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): preferred_loader_file_format: TLoaderFileFormat = None supported_loader_file_formats: Sequence[TLoaderFileFormat] = None + recommended_file_size: Optional[int] = None + """Recommended file size in bytes when writing extract/load files""" preferred_staging_file_format: Optional[TLoaderFileFormat] = None supported_staging_file_formats: Sequence[TLoaderFileFormat] = None escape_identifier: Callable[[str], str] = None diff --git a/dlt/destinations/impl/bigquery/__init__.py b/dlt/destinations/impl/bigquery/__init__.py index d33466ed5e..39322b43a0 100644 --- a/dlt/destinations/impl/bigquery/__init__.py +++ b/dlt/destinations/impl/bigquery/__init__.py @@ -12,6 +12,8 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supported_loader_file_formats = ["jsonl", "parquet"] caps.preferred_staging_file_format = "parquet" caps.supported_staging_file_formats = ["parquet", "jsonl"] + # BQ limit is 4GB but leave a large headroom since buffered writer does not preemptively check size + caps.recommended_file_size = int(1024 * 1024 * 1024) caps.escape_identifier = escape_bigquery_identifier caps.escape_literal = None caps.format_datetime_literal = format_bigquery_datetime_literal diff --git a/dlt/destinations/impl/mssql/__init__.py b/dlt/destinations/impl/mssql/__init__.py index e9d9fe24fd..f7768d9238 100644 --- a/dlt/destinations/impl/mssql/__init__.py +++ b/dlt/destinations/impl/mssql/__init__.py @@ -17,7 +17,8 @@ def capabilities() -> DestinationCapabilitiesContext: # https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN caps.max_identifier_length = 128 caps.max_column_identifier_length = 128 - caps.max_query_length = 4 * 1024 * 64 * 1024 + # A SQL Query can be a varchar(max) but is shown as limited to 65,536 * Network Packet + caps.max_query_length = 65536 * 10 caps.is_max_query_length_in_bytes = True caps.max_text_data_type_length = 2**30 - 1 caps.is_max_text_data_type_length_in_bytes = False diff --git a/dlt/destinations/impl/mssql/mssql.py b/dlt/destinations/impl/mssql/mssql.py index 8de15e2bd9..6f364c8af1 100644 --- a/dlt/destinations/impl/mssql/mssql.py +++ b/dlt/destinations/impl/mssql/mssql.py @@ -181,7 +181,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non if c.get(h, False) is True ) column_name = self.capabilities.escape_identifier(c["name"]) - return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c['nullable'])}" + return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c.get('nullable', True))}" def _create_replace_followup_jobs( self, table_chain: Sequence[TTableSchema] diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index fac6391e01..9c4076cfa7 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -567,16 +567,13 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltResourceImpl: compat_wrapper(actual_resource_name, conf_f, sig, *args, **kwargs), incremental, ) - except InvalidResourceDataTypeFunctionNotAGenerator as gen_ex: + except InvalidResourceDataTypeFunctionNotAGenerator: # we allow an edge case: resource can return another resource - try: - # actually call the function to see if it contains DltResource - data_ = conf_f(*args, **kwargs) - if not isinstance(data_, DltResource): - raise - r = data_ # type: ignore[assignment] - except Exception: - raise gen_ex from None + # actually call the function to see if it contains DltResource + data_ = conf_f(*args, **kwargs) + if not isinstance(data_, DltResource): + raise + r = data_ # type: ignore[assignment] # consider transformer arguments bound r._args_bound = True # keep explicit args passed diff --git a/dlt/load/configuration.py b/dlt/load/configuration.py index 97cf23fdfc..b3fc2fbcd4 100644 --- a/dlt/load/configuration.py +++ b/dlt/load/configuration.py @@ -15,6 +15,9 @@ class LoaderConfiguration(PoolRunnerConfiguration): raise_on_max_retries: int = 5 """When gt 0 will raise when job reaches raise_on_max_retries""" _load_storage_config: LoadStorageConfiguration = None + # if set to `True`, the staging dataset will be + # truncated after loading the data + truncate_staging_dataset: bool = False def on_resolved(self) -> None: self.pool_type = "none" if self.workers == 1 else "thread" diff --git a/dlt/load/load.py b/dlt/load/load.py index 66ddb1c308..9d898bc54d 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -53,7 +53,7 @@ LoadClientUnsupportedWriteDisposition, LoadClientUnsupportedFileFormats, ) -from dlt.load.utils import get_completed_table_chain, init_client +from dlt.load.utils import _extend_tables_with_table_chain, get_completed_table_chain, init_client class Load(Runnable[Executor], WithStepInfo[LoadMetrics, LoadInfo]): @@ -348,6 +348,8 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False) ) ): job_client.complete_load(load_id) + self._maybe_trancate_staging_dataset(schema, job_client) + self.load_storage.complete_load_package(load_id, aborted) # collect package info self._loaded_packages.append(self.load_storage.get_load_package_info(load_id)) @@ -490,6 +492,37 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics: return TRunMetrics(False, len(self.load_storage.list_normalized_packages())) + def _maybe_trancate_staging_dataset(self, schema: Schema, job_client: JobClientBase) -> None: + """ + Truncate the staging dataset if one used, + and configuration requests truncation. + + Args: + schema (Schema): Schema to use for the staging dataset. + job_client (JobClientBase): + Job client to use for the staging dataset. + """ + if not ( + isinstance(job_client, WithStagingDataset) and self.config.truncate_staging_dataset + ): + return + + data_tables = schema.data_table_names() + tables = _extend_tables_with_table_chain( + schema, data_tables, data_tables, job_client.should_load_data_to_staging_dataset + ) + + try: + with self.get_destination_client(schema) as client: + with client.with_staging_dataset(): # type: ignore + client.initialize_storage(truncate_tables=tables) + + except Exception as exc: + logger.warn( + f"Staging dataset truncate failed due to the following error: {exc}" + " However, it didn't affect the data integrity." + ) + def get_step_info( self, pipeline: SupportsPipeline, diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index a2ea1936a9..53770f332d 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -554,6 +554,7 @@ def load( with signals.delayed_signals(): runner.run_pool(load_step.config, load_step) info: LoadInfo = self._get_step_info(load_step) + self.first_run = False return info except Exception as l_ex: diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md index 1367b96bd4..0022850987 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md @@ -174,13 +174,13 @@ The configuration object passed to the REST API Generic Source has three main el ```py config: RESTAPIConfig = { "client": { - # ... + ... }, "resource_defaults": { - # ... + ... }, "resources": [ - # ... + ... ], } ``` @@ -203,9 +203,7 @@ For example, you can set the primary key, write disposition, and other default s ```py config = { "client": { - "api_key": "your_api_key_here", - "base_url": "https://api.example.com", - # Add other client configurations here + # ... }, "resource_defaults": { "primary_key": "id", @@ -219,14 +217,16 @@ config = { "resources": [ "resource1", { - "name": "resource2_name", - "write_disposition": "append", - "endpoint": { - "params": { - "param1": "value1", + "resource2": { + "name": "resource2_name", + "write_disposition": "append", + "endpoint": { + "params": { + "param1": "value1", + }, }, - }, - }, + } + } ], } ``` @@ -497,15 +497,15 @@ The `issue_comments` resource will make requests to the following endpoints: The syntax for the `resolve` field in parameter configuration is: ```py -({ - "{parameter_name}" : - { +{ + "": { "type": "resolve", - "resource": "{parent_resource_name}", - "field": "{parent_resource_field_name}", + "resource": "", + "field": "", } -}) +} ``` + Under the hood, dlt handles this by using a [transformer resource](../../general-usage/resource.md#process-resources-with-dlttransformer). #### Include fields from the parent resource @@ -516,7 +516,7 @@ You can include data from the parent resource in the child resource by using the { "name": "issue_comments", "endpoint": { - # ... + ... }, "include_from_parent": ["id", "title", "created_at"], } @@ -534,25 +534,25 @@ When the API endpoint supports incremental loading, you can configure the source 1. Defining a special parameter in the `params` section of the [endpoint configuration](#endpoint-configuration): ```py - ({ + { "": { "type": "incremental", "cursor_path": "", "initial_value": "", - } - }) + }, + } ``` For example, in the `issues` resource configuration in the GitHub example, we have: ```py - ({ + { "since": { "type": "incremental", "cursor_path": "updated_at", "initial_value": "2024-01-25T11:21:28Z", - } - }) + }, + } ``` This configuration tells the source to create an incremental object that will keep track of the `updated_at` field in the response and use it as a value for the `since` parameter in subsequent requests. @@ -560,7 +560,7 @@ When the API endpoint supports incremental loading, you can configure the source 2. Specifying the `incremental` field in the [endpoint configuration](#endpoint-configuration): ```py - ({ + { "incremental": { "start_param": "", "end_param": "", @@ -568,7 +568,7 @@ When the API endpoint supports incremental loading, you can configure the source "initial_value": "", "end_value": "", } - }) + } ``` This configuration is more flexible and allows you to specify the start and end conditions for the incremental loading. diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/slack.md b/docs/website/docs/dlt-ecosystem/verified-sources/slack.md index 970a891e60..38eda15c94 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/slack.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/slack.md @@ -70,7 +70,7 @@ To get started with your data pipeline, follow these steps: [This command](../../reference/command-line-interface) will initialize [the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/slack_pipeline.py) - with Google Sheets as the [source](../../general-usage/source) and + with Slack as the [source](../../general-usage/source) and [duckdb](../destinations/duckdb.md) as the [destination](../destinations). 1. If you'd like to use a different destination, simply replace `duckdb` with the name of your diff --git a/docs/website/docs/general-usage/destination-tables.md b/docs/website/docs/general-usage/destination-tables.md index 8e1f771e47..4780d4be20 100644 --- a/docs/website/docs/general-usage/destination-tables.md +++ b/docs/website/docs/general-usage/destination-tables.md @@ -74,7 +74,7 @@ pipeline = dlt.pipeline( load_info = pipeline.run(users) ``` -The result will be the same, but the table is implicitly named `users` based on the resource name. +The result will be the same; note that we do not explicitly pass `table_name="users"` to `pipeline.run`, and the table is implicitly named `users` based on the resource name (e.g., `users()` decorated with `@dlt.resource`). :::note @@ -117,9 +117,7 @@ pipeline = dlt.pipeline( load_info = pipeline.run(data, table_name="users") ``` -Running this pipeline will create two tables in the destination, `users` and `users__pets`. The -`users` table will contain the top level data, and the `users__pets` table will contain the child -data. Here is what the tables may look like: +Running this pipeline will create two tables in the destination, `users` and `users__pets`. The `users` table will contain the top-level data, and the `users__pets` table will contain the child data. Here is what the tables may look like: **mydata.users** @@ -141,21 +139,14 @@ creating and linking children and parent tables. This is how it works: -1. Each row in all (top level and child) data tables created by `dlt` contains UNIQUE column named - `_dlt_id`. -1. Each child table contains FOREIGN KEY column `_dlt_parent_id` linking to a particular row - (`_dlt_id`) of a parent table. -1. Rows in child tables come from the lists: `dlt` stores the position of each item in the list in - `_dlt_list_idx`. -1. For tables that are loaded with the `merge` write disposition, we add a ROOT KEY column - `_dlt_root_id`, which links child table to a row in top level table. - +1. Each row in all (top level and child) data tables created by `dlt` contains a `UNIQUE` column named `_dlt_id`. +1. Each child table contains a `FOREIGN KEY` column `_dlt_parent_id` linking to a particular row (`_dlt_id`) of a parent table. +1. Rows in child tables come from the lists: `dlt` stores the position of each item in the list in `_dlt_list_idx`. +1. For tables that are loaded with the `merge` write disposition, we add a root key column `_dlt_root_id`, which links the child table to a row in the top-level table. :::note -If you define your own primary key in a child table, it will be used to link to parent table -and the `_dlt_parent_id` and `_dlt_list_idx` will not be added. `_dlt_id` is always added even in -case the primary key or other unique columns are defined. +If you define your own primary key in a child table, it will be used to link to the parent table, and the `_dlt_parent_id` and `_dlt_list_idx` will not be added. `_dlt_id` is always added even if the primary key or other unique columns are defined. ::: @@ -164,17 +155,15 @@ case the primary key or other unique columns are defined. During a pipeline run, dlt [normalizes both table and column names](schema.md#naming-convention) to ensure compatibility with the destination database's accepted format. All names from your source data will be transformed into snake_case and will only include alphanumeric characters. Please be aware that the names in the destination database may differ somewhat from those in your original input. ### Variant columns -If your data has inconsistent types, `dlt` will dispatch the data to several **variant columns**. For example, if you have a resource (ie json file) with a filed with name **answer** and your data contains boolean values, you will get get a column with name **answer** of type **BOOLEAN** in your destination. If for some reason, on next load you get integer value and string value in **answer**, the inconsistent data will go to **answer__v_bigint** and **answer__v_text** columns respectively. -The general naming rule for variant columns is `__v_` where `original_name` is the existing column name (with data type clash) and `type` is the name of data type stored in the variant. - +If your data has inconsistent types, `dlt` will dispatch the data to several **variant columns**. For example, if you have a resource (i.e., JSON file) with a field with name `answer` and your data contains boolean values, you will get a column with name `answer` of type `BOOLEAN` in your destination. If for some reason, on the next load, you get integer and string values in `answer`, the inconsistent data will go to `answer__v_bigint` and `answer__v_text` columns respectively. +The general naming rule for variant columns is `__v_` where `original_name` is the existing column name (with data type clash) and `type` is the name of the data type stored in the variant. ## Load Packages and Load IDs Each execution of the pipeline generates one or more load packages. A load package typically contains data retrieved from all the [resources](glossary.md#resource) of a particular [source](glossary.md#source). These packages are uniquely identified by a `load_id`. The `load_id` of a particular package is added to the top data tables -(referenced as `_dlt_load_id` column in the example above) and to the special `_dlt_loads` table with a status 0 -(when the load process is fully completed). +(referenced as `_dlt_load_id` column in the example above) and to the special `_dlt_loads` table with a status of 0 (when the load process is fully completed). To illustrate this, let's load more data into the same destination: @@ -189,8 +178,7 @@ data = [ ``` The rest of the pipeline definition remains the same. Running this pipeline will create a new load -package with a new `load_id` and add the data to the existing tables. The `users` table will now -look like this: +package with a new `load_id` and add the data to the existing tables. The `users` table will now look like this: **mydata.users** @@ -210,12 +198,12 @@ The `_dlt_loads` table will look like this: | **1234563456.12345** | quick_start | 0 | 2023-09-12 16:46:03.10662+00 | aOEb...Qekd/58= | The `_dlt_loads` table tracks complete loads and allows chaining transformations on top of them. -Many destinations do not support distributed and long-running transactions (e.g. Amazon Redshift). +Many destinations do not support distributed and long-running transactions (e.g., Amazon Redshift). In that case, the user may see the partially loaded data. It is possible to filter such data out: any row with a `load_id` that does not exist in `_dlt_loads` is not yet completed. The same procedure may be used to identify and delete data for packages that never got completed. -For each load, you can test and [alert](../running-in-production/alerting.md) on anomalies (e.g. +For each load, you can test and [alert](../running-in-production/alerting.md) on anomalies (e.g., no data, too much loaded to a table). There are also some useful load stats in the `Load info` tab of the [Streamlit app](../dlt-ecosystem/visualizations/exploring-the-data.md#exploring-the-data) mentioned above. @@ -231,8 +219,7 @@ Data lineage can be super relevant for architectures like the [data vault architecture](https://www.data-vault.co.uk/what-is-data-vault/) or when troubleshooting. The data vault architecture is a data warehouse that large organizations use when representing the same process across multiple systems, which adds data lineage requirements. Using the pipeline name -and `load_id` provided out of the box by `dlt`, you are able to identify the source and time of -data. +and `load_id` provided out of the box by `dlt`, you are able to identify the source and time of data. You can [save](../running-in-production/running.md#inspect-and-save-the-load-info-and-trace) complete lineage info for a particular `load_id` including a list of loaded files, error messages @@ -242,11 +229,7 @@ problems. ## Staging dataset So far we've been using the `append` write disposition in our example pipeline. This means that -each time we run the pipeline, the data is appended to the existing tables. When you use [the -merge write disposition](incremental-loading.md), dlt creates a staging database schema for -staging data. This schema is named `_staging` and contains the same tables as the -destination schema. When you run the pipeline, the data from the staging tables is loaded into the -destination tables in a single atomic transaction. +each time we run the pipeline, the data is appended to the existing tables. When you use the [merge write disposition](incremental-loading.md), dlt creates a staging database schema for staging data. This schema is named `_staging` and contains the same tables as the destination schema. When you run the pipeline, the data from the staging tables is loaded into the destination tables in a single atomic transaction. Let's illustrate this with an example. We change our pipeline to use the `merge` write disposition: @@ -270,8 +253,7 @@ load_info = pipeline.run(users) ``` Running this pipeline will create a schema in the destination database with the name `mydata_staging`. -If you inspect the tables in this schema, you will find `mydata_staging.users` table identical to the -`mydata.users` table in the previous example. +If you inspect the tables in this schema, you will find the `mydata_staging.users` table identical to the`mydata.users` table in the previous example. Here is what the tables may look like after running the pipeline: @@ -290,8 +272,7 @@ Here is what the tables may look like after running the pipeline: | 2 | Bob 2 | rX8ybgTeEmAmmA | 2345672350.98417 | | 3 | Charlie | h8lehZEvT3fASQ | 1234563456.12345 | -Notice that the `mydata.users` table now contains the data from both the previous pipeline run and -the current one. +Notice that the `mydata.users` table now contains the data from both the previous pipeline run and the current one. ## Versioned datasets @@ -322,4 +303,4 @@ load_info = pipeline.run(data, table_name="users") Every time you run this pipeline, a new schema will be created in the destination database with a datetime-based suffix. The data will be loaded into tables in this schema. For example, the first time you run the pipeline, the schema will be named -`mydata_20230912064403`, the second time it will be named `mydata_20230912064407`, and so on. +`mydata_20230912064403`, the second time it will be named `mydata_20230912064407`, and so on. \ No newline at end of file diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index ca39046d35..19cc95bf78 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -385,7 +385,7 @@ class PostBodyPaginator(BasePaginator): # Add the cursor to the request body request.json["cursor"] = self.cursor - + client = RESTClient( base_url="https://api.example.com", paginator=PostBodyPaginator() @@ -527,4 +527,70 @@ from dlt.sources.helpers.rest_client import paginate for page in paginate("https://api.example.com/posts"): print(page) -``` \ No newline at end of file +``` + +## Troubleshooting + +### `RESTClient.get()` and `RESTClient.post()` methods + +These methods work similarly to the [get()](https://docs.python-requests.org/en/latest/api/#requests.get) and [post()](https://docs.python-requests.org/en/latest/api/#requests.post) functions +from the Requests library. They return a [Response](https://docs.python-requests.org/en/latest/api/#requests.Response) object that contains the response data. +You can inspect the `Response` object to get the `response.status_code`, `response.headers`, and `response.content`. For example: + +```py +from dlt.sources.helpers.rest_client import RESTClient +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth + +client = RESTClient(base_url="https://api.example.com") +response = client.get("/posts", auth=BearerTokenAuth(token="your_access_token")) # type: ignore + +print(response.status_code) +print(response.headers) +print(response.content) +``` + +### `RESTClient.paginate()` + +Debugging `paginate()` is trickier because it's a generator function that yields [`PageData`](#pagedata) objects. Here's several ways to debug the `paginate()` method: + +1. Enable [logging](../../running-in-production/running.md#set-the-log-level-and-format) to see detailed information about the HTTP requests: + +```sh +RUNTIME__LOG_LEVEL=INFO python my_script.py +``` + +2. Use the [`PageData`](#pagedata) instance to inspect the [request](https://docs.python-requests.org/en/latest/api/#requests.Request) +and [response](https://docs.python-requests.org/en/latest/api/#requests.Response) objects: + +```py +from dlt.sources.helpers.rest_client import RESTClient +from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator + +client = RESTClient( + base_url="https://api.example.com", + paginator=JSONResponsePaginator(next_url_path="pagination.next") +) + +for page in client.paginate("/posts"): + print(page.request) + print(page.response) +``` + +3. Use the `hooks` parameter to add custom response handlers to the `paginate()` method: + +```py +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth + +def response_hook(response, **kwargs): + print(response.status_code) + print(f"Content: {response.content}") + print(f"Request: {response.request.body}") + # Or import pdb; pdb.set_trace() to debug + +for page in client.paginate( + "/posts", + auth=BearerTokenAuth(token="your_access_token"), # type: ignore + hooks={"response": [response_hook]} +): + print(page) +``` diff --git a/docs/website/docs/intro.md b/docs/website/docs/intro.md index 776329bcf4..0374802b7d 100644 --- a/docs/website/docs/intro.md +++ b/docs/website/docs/intro.md @@ -32,6 +32,10 @@ The library will create or update tables, infer data types, and handle nested da ]}> +:::tip +Looking to use a REST API as a source? Explore our new [REST API generic source](dlt-ecosystem/verified-sources/rest_api) for a declarative way to load data. +::: + diff --git a/docs/website/docs/running-in-production/running.md b/docs/website/docs/running-in-production/running.md index 253a27d942..9c52f58caa 100644 --- a/docs/website/docs/running-in-production/running.md +++ b/docs/website/docs/running-in-production/running.md @@ -108,6 +108,12 @@ behind. In `config.toml`: load.delete_completed_jobs=true ``` +Also, by default, `dlt` leaves data in staging dataset, used during merge and replace load for deduplication. In order to clear it, put the following line in `config.toml`: + +```toml +load.truncate_staging_dataset=true +``` + ## Using slack to send messages `dlt` provides basic support for sending slack messages. You can configure Slack incoming hook via diff --git a/docs/website/docs/walkthroughs/create-a-pipeline.md b/docs/website/docs/walkthroughs/create-a-pipeline.md index 1d5974efbe..cbbbd73fc3 100644 --- a/docs/website/docs/walkthroughs/create-a-pipeline.md +++ b/docs/website/docs/walkthroughs/create-a-pipeline.md @@ -1,31 +1,46 @@ --- title: Create a pipeline description: How to create a pipeline -keywords: [how to, create a pipeline] +keywords: [how to, create a pipeline, rest client] --- # Create a pipeline -Follow the steps below to create a [pipeline](../general-usage/glossary.md#pipeline) from the -WeatherAPI.com API to DuckDB from scratch. The same steps can be repeated for any source and -destination of your choice—use `dlt init ` and then build the pipeline for -that API instead. +This guide walks you through creating a pipeline that uses our [REST API Client](../general-usage/http/rest-client) +to connect to [DuckDB](../dlt-ecosystem/destinations/duckdb). +:::tip +We're using DuckDB as a destination here, but you can adapt the steps to any [source](https://dlthub.com/docs/dlt-ecosystem/verified-sources/) and [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) by +using the [command](../reference/command-line-interface#dlt-init) `dlt init ` and tweaking the pipeline accordingly. +::: -Please make sure you have [installed `dlt`](../reference/installation.md) before following the +Please make sure you have [installed `dlt`](../reference/installation) before following the steps below. +## Task overview + +Imagine you want to analyze issues from a GitHub project locally. +To achieve this, you need to write code that accomplishes the following: + +1. Constructs a correct request. +2. Authenticates your request. +3. Fetches and handles paginated issue data. +4. Stores the data for analysis. + +This may sound complicated, but dlt provides a [REST API Client](../general-usage/http/rest-client) that allows you to focus more on your data rather than on managing API interactions. + + ## 1. Initialize project Create a new empty directory for your `dlt` project by running: ```sh -mkdir weatherapi_duckdb && cd weatherapi_duckdb +mkdir github_api_duckdb && cd github_api_duckdb ``` Start a `dlt` project with a pipeline template that loads data to DuckDB by running: ```sh -dlt init weatherapi duckdb +dlt init github_api duckdb ``` Install the dependencies necessary for DuckDB: @@ -34,114 +49,127 @@ Install the dependencies necessary for DuckDB: pip install -r requirements.txt ``` -## 2. Add WeatherAPI.com API credentials +## 2. Obtain and add API credentials from GitHub -You will need to [sign up for the WeatherAPI.com API](https://www.weatherapi.com/signup.aspx). +You will need to [sign in](https://github.com/login) to your GitHub account and create your access token via [Personal access tokens page](https://github.com/settings/tokens). -Once you do this, you should see your `API Key` at the top of your -[user page](https://www.weatherapi.com/my/). - -Copy the value of the API key into `.dlt/secrets.toml`: +Copy your new access token over to `.dlt/secrets.toml`: ```toml [sources] api_secret_key = '' ``` -The **secret name** corresponds to the **argument name** in the source function. Below `api_secret_key` [will get its value](../general-usage/credentials/configuration.md#general-usage-and-an-example) from `secrets.toml` when `weatherapi_source()` is called. + +This token will be used by `github_api_source()` to authenticate requests. + +The **secret name** corresponds to the **argument name** in the source function. +Below `api_secret_key` [will get its value](../general-usage/credentials/configuration#allow-dlt-to-pass-the-config-and-secrets-automatically) +from `secrets.toml` when `github_api_source()` is called. + ```py @dlt.source -def weatherapi_source(api_secret_key=dlt.secrets.value): - ... +def github_api_source(api_secret_key: str = dlt.secrets.value): + return github_api_resource(api_secret_key=api_secret_key) ``` -Run the `weatherapi.py` pipeline script to test that authentication headers look fine: +Run the `github_api.py` pipeline script to test that authentication headers look fine: ```sh -python3 weatherapi.py +python github_api.py ``` Your API key should be printed out to stdout along with some test data. -## 3. Request data from the WeatherAPI.com API +## 3. Request project issues from then GitHub API -Replace the definition of the `weatherapi_resource` function definition in the `weatherapi.py` -pipeline script with a call to the WeatherAPI.com API: -```py -@dlt.resource(write_disposition="append") -def weatherapi_resource(api_secret_key=dlt.secrets.value): - url = "https://api.weatherapi.com/v1/current.json" - params = { - "q": "NYC", - "key": api_secret_key - } - response = requests.get(url, params=params) - response.raise_for_status() - yield response.json() -``` +:::tip +We will use `dlt` repository as an example GitHub project https://github.com/dlt-hub/dlt, feel free to replace it with your own repository. +::: -Run the `weatherapi.py` pipeline script to test that the API call works: +Modify `github_api_resource` in `github_api.py` to request issues data from your GitHub project's API: -```sh -python3 weatherapi.py +```py +from dlt.sources.helpers.rest_client import paginate +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth +from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator + +@dlt.resource(write_disposition="replace") +def github_api_resource(api_secret_key: str = dlt.secrets.value): + url = "https://api.github.com/repos/dlt-hub/dlt/issues" + + for page in paginate( + url, + auth=BearerTokenAuth(api_secret_key), # type: ignore + paginator=HeaderLinkPaginator(), + params={"state": "open"} + ): + yield page ``` -This should print out the weather in New York City right now. - ## 4. Load the data -Remove the `exit()` call from the `main` function in `weatherapi.py`, so that running the -`python3 weatherapi.py` command will now also run the pipeline: +Uncomment the commented out code in `main` function in `github_api.py`, so that running the +`python github_api.py` command will now also run the pipeline: ```py if __name__=='__main__': - # configure the pipeline with your destination details pipeline = dlt.pipeline( - pipeline_name='weatherapi', + pipeline_name='github_api_pipeline', destination='duckdb', - dataset_name='weatherapi_data' + dataset_name='github_api_data' ) # print credentials by running the resource - data = list(weatherapi_resource()) + data = list(github_api_resource()) # print the data yielded from resource print(data) # run the pipeline with your parameters - load_info = pipeline.run(weatherapi_source()) + load_info = pipeline.run(github_api_source()) # pretty print the information on data that was loaded print(load_info) ``` -Run the `weatherapi.py` pipeline script to load data into DuckDB: + +Run the `github_api.py` pipeline script to test that the API call works: ```sh -python3 weatherapi.py +python github_api.py ``` -Then this command to see that the data loaded: +This should print out JSON data containing the issues in the GitHub project. + +It also prints `load_info` object. + +Let's explore the loaded data with the [command](../reference/command-line-interface#show-tables-and-data-in-the-destination) `dlt pipeline show`. + +:::info +Make sure you have `streamlit` installed `pip install streamlit` +::: ```sh -dlt pipeline weatherapi show +dlt pipeline github_api_pipeline show ``` This will open a Streamlit app that gives you an overview of the data loaded. ## 5. Next steps -Now that you have a working pipeline, you have options for what to learn next: +With a functioning pipeline, consider exploring: +- Our [REST Client](../general-usage/http/rest-client). - [Deploy this pipeline with GitHub Actions](deploy-a-pipeline/deploy-with-github-actions), so that the data is automatically loaded on a schedule. - Transform the [loaded data](../dlt-ecosystem/transformations) with dbt or in Pandas DataFrames. -- Learn how to [run](../running-in-production/running.md), - [monitor](../running-in-production/monitoring.md), and - [alert](../running-in-production/alerting.md) when you put your pipeline in production. +- Learn how to [run](../running-in-production/running), + [monitor](../running-in-production/monitoring), and + [alert](../running-in-production/alerting) when you put your pipeline in production. - Try loading data to a different destination like - [Google BigQuery](../dlt-ecosystem/destinations/bigquery.md), - [Amazon Redshift](../dlt-ecosystem/destinations/redshift.md), or - [Postgres](../dlt-ecosystem/destinations/postgres.md). + [Google BigQuery](../dlt-ecosystem/destinations/bigquery), + [Amazon Redshift](../dlt-ecosystem/destinations/redshift), or + [Postgres](../dlt-ecosystem/destinations/postgres). diff --git a/tests/common/data_writers/utils.py b/tests/common/data_writers/utils.py index 2cb440bde1..e6e377b7d0 100644 --- a/tests/common/data_writers/utils.py +++ b/tests/common/data_writers/utils.py @@ -1,5 +1,5 @@ import os -from typing import Type +from typing import Type, Optional from dlt.common.data_writers.buffered import BufferedDataWriter from dlt.common.data_writers.writers import TWriter, ALL_WRITERS @@ -18,8 +18,8 @@ def get_writer( writer: Type[TWriter], buffer_max_items: int = 10, - file_max_items: int = 10, - file_max_bytes: int = None, + file_max_items: Optional[int] = 10, + file_max_bytes: Optional[int] = None, disable_compression: bool = False, caps: DestinationCapabilitiesContext = None, ) -> BufferedDataWriter[TWriter]: diff --git a/tests/extract/data_writers/test_buffered_writer.py b/tests/extract/data_writers/test_buffered_writer.py index 82b81a1cd7..b6da132de9 100644 --- a/tests/extract/data_writers/test_buffered_writer.py +++ b/tests/extract/data_writers/test_buffered_writer.py @@ -2,6 +2,7 @@ import pytest import time from typing import Iterator, Type +from uuid import uuid4 from dlt.common.data_writers.exceptions import BufferedDataWriterClosed from dlt.common.data_writers.writers import ( @@ -11,7 +12,7 @@ JsonlWriter, ALL_WRITERS, ) -from dlt.common.destination.capabilities import TLoaderFileFormat +from dlt.common.destination.capabilities import TLoaderFileFormat, DestinationCapabilitiesContext from dlt.common.schema.utils import new_column from dlt.common.storages.file_storage import FileStorage @@ -330,3 +331,38 @@ def test_special_write_rotates(disable_compression: bool, writer_type: Type[Data metrics = writer.import_file( "tests/extract/cases/imported.any", DataWriterMetrics("", 1, 231, 0, 0) ) + + +@pytest.mark.parametrize( + "disable_compression", [True, False], ids=["no_compression", "compression"] +) +@pytest.mark.parametrize("writer_type", ALL_OBJECT_WRITERS) +def test_rotation_on_destination_caps_recommended_file_size( + disable_compression: bool, writer_type: Type[DataWriter] +) -> None: + caps = DestinationCapabilitiesContext.generic_capabilities() + caps.recommended_file_size = int(250 * 1024) + columns = {"id": new_column("id", "text")} + with get_writer( + writer_type, + disable_compression=disable_compression, + buffer_max_items=100, + file_max_items=None, + file_max_bytes=None, + caps=caps, + ) as writer: + for i in range(8): + # Data chunk approximately 40kb serialized + items = [{"id": str(uuid4())} for _ in range(1000)] + writer.write_data_item(items, columns) + if i < 5: + assert not writer.closed_files + + if i > 5: + # We should have written atleast 250kb by now and have rotated the file + assert len(writer.closed_files) == 1 + + # Check the files that were written are all within the recommended size + 1 chunk + assert len(writer.closed_files) == 2 + for file in writer.closed_files: + assert file.file_size < caps.recommended_file_size + 1024 * 50 diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 5e85552d73..c6a675a8d3 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -880,6 +880,17 @@ def rv_resource(name: str): assert list(r) == [1, 2, 3] +def test_standalone_resource_returning_resource_exception() -> None: + @dlt.resource(standalone=True) + def rv_resource(uniq_name: str = dlt.config.value): + return dlt.resource([1, 2, 3], name=uniq_name, primary_key="value") + + # pass through of the exception in `rv_resource` when it returns, not yields + with pytest.raises(ConfigFieldMissingException) as conf_ex: + rv_resource() + assert conf_ex.value.fields == ["uniq_name"] + + def test_resource_rename_credentials_separation(): os.environ["SOURCES__TEST_DECORATORS__STANDALONE_SIGNATURE__SECRET_END"] = "5" assert list(standalone_signature(1)) == [1, 2, 3, 4] diff --git a/tests/helpers/airflow_tests/test_airflow_wrapper.py b/tests/helpers/airflow_tests/test_airflow_wrapper.py index 845800e47f..533d16c998 100644 --- a/tests/helpers/airflow_tests/test_airflow_wrapper.py +++ b/tests/helpers/airflow_tests/test_airflow_wrapper.py @@ -384,7 +384,17 @@ def dag_parallel(): with mock.patch("dlt.helpers.airflow_helper.logger.warn") as warn_mock: dag_def = dag_parallel() dag_def.test() - warn_mock.assert_called_once() + warn_mock.assert_has_calls( + [ + mock.call( + "The resource resource2 in task" + " mock_data_incremental_source_resource1-resource2 is using incremental loading" + " and may modify the state. Resources that modify the state should not run in" + " parallel within the single pipeline as the state will not be correctly" + " merged. Please use 'serialize' or 'parallel-isolated' modes instead." + ) + ] + ) def test_parallel_isolated_run(): diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py index ca962adb16..4519f1ea83 100644 --- a/tests/load/filesystem/test_filesystem_client.py +++ b/tests/load/filesystem/test_filesystem_client.py @@ -1,6 +1,7 @@ import posixpath import os from unittest import mock +from pathlib import Path import pytest @@ -117,16 +118,18 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non client, _, root_path, load_id1 = load_info layout = client.config.layout # this path will be kept after replace - job_2_load_1_path = posixpath.join( - root_path, - create_path( - layout, - NORMALIZED_FILES[1], - client.schema.name, - load_id1, - load_package_timestamp=timestamp, - extra_placeholders=client.config.extra_placeholders, - ), + job_2_load_1_path = Path( + posixpath.join( + root_path, + create_path( + layout, + NORMALIZED_FILES[1], + client.schema.name, + load_id1, + load_package_timestamp=timestamp, + extra_placeholders=client.config.extra_placeholders, + ), + ) ) with perform_load( @@ -135,16 +138,18 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non client, _, root_path, load_id2 = load_info # this one we expect to be replaced with - job_1_load_2_path = posixpath.join( - root_path, - create_path( - layout, - NORMALIZED_FILES[0], - client.schema.name, - load_id2, - load_package_timestamp=timestamp, - extra_placeholders=client.config.extra_placeholders, - ), + job_1_load_2_path = Path( + posixpath.join( + root_path, + create_path( + layout, + NORMALIZED_FILES[0], + client.schema.name, + load_id2, + load_package_timestamp=timestamp, + extra_placeholders=client.config.extra_placeholders, + ), + ) ) # First file from load1 remains, second file is replaced by load2 @@ -159,7 +164,7 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non for f in files: if f == INIT_FILE_NAME: continue - paths.append(posixpath.join(basedir, f)) + paths.append(Path(posixpath.join(basedir, f))) ls = set(paths) assert ls == {job_2_load_1_path, job_1_load_2_path} @@ -210,7 +215,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None ) for job in jobs2 ] - expected_files = sorted([posixpath.join(root_path, fn) for fn in expected_files]) + expected_files = sorted([Path(posixpath.join(root_path, fn)) for fn in expected_files]) # type: ignore[misc] paths = [] for basedir, _dirs, files in client.fs_client.walk( @@ -222,5 +227,5 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None for f in files: if f == INIT_FILE_NAME: continue - paths.append(posixpath.join(basedir, f)) + paths.append(Path(posixpath.join(basedir, f))) assert list(sorted(paths)) == expected_files diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 7680bc6e90..5f24daf57f 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -301,7 +301,7 @@ def count(*args, **kwargs) -> Any: for file in files: if ".jsonl" in file: - expected_files.add(posixpath.join(basedir, file)) + expected_files.add(Path(posixpath.join(basedir, file))) for load_package in load_info.load_packages: for load_info in load_package.jobs["completed_jobs"]: # type: ignore[assignment] @@ -321,7 +321,7 @@ def count(*args, **kwargs) -> Any: full_path = posixpath.join(client.dataset_path, path) # type: ignore[attr-defined] assert client.fs_client.exists(full_path) # type: ignore[attr-defined] if ".jsonl" in full_path: - known_files.add(full_path) + known_files.add(Path(full_path)) assert expected_files == known_files assert known_files diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index a498b570a0..d98f335d16 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -10,6 +10,7 @@ from dlt.common.pipeline import SupportsPipeline from dlt.common.destination import Destination from dlt.common.destination.exceptions import DestinationHasFailedJobs +from dlt.common.destination.reference import WithStagingDataset from dlt.common.schema.exceptions import CannotCoerceColumnException from dlt.common.schema.schema import Schema from dlt.common.schema.typing import VERSION_TABLE_NAME @@ -896,6 +897,7 @@ def test_pipeline_upfront_tables_two_loads( # use staging tables for replace os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy + os.environ["TRUNCATE_STAGING_DATASET"] = "True" pipeline = destination_config.setup_pipeline( "test_pipeline_upfront_tables_two_loads", @@ -1001,6 +1003,21 @@ def table_3(make_data=False): is True ) + job_client, _ = pipeline._get_destination_clients(schema) + + if destination_config.staging and isinstance(job_client, WithStagingDataset): + for i in range(1, 4): + with pipeline.sql_client() as client: + table_name = f"table_{i}" + + if job_client.should_load_data_to_staging_dataset( + job_client.schema.tables[table_name] + ): + with client.with_staging_dataset(staging=True): + tab_name = client.make_qualified_table_name(table_name) + with client.execute_query(f"SELECT * FROM {tab_name}") as cur: + assert len(cur.fetchall()) == 0 + # @pytest.mark.skip(reason="Finalize the test: compare some_data values to values from database") # @pytest.mark.parametrize( diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index a828de40fd..1c4383405b 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -5,9 +5,9 @@ import logging import os import random +import threading from time import sleep from typing import Any, Tuple, cast -import threading from tenacity import retry_if_exception, Retrying, stop_after_attempt import pytest @@ -2230,3 +2230,33 @@ def stateful_resource(): assert len(fs_client.list_table_files("_dlt_loads")) == 2 assert len(fs_client.list_table_files("_dlt_version")) == 1 assert len(fs_client.list_table_files("_dlt_pipeline_state")) == 1 + + +@pytest.mark.parametrize("truncate", (True, False)) +def test_staging_dataset_truncate(truncate) -> None: + dlt.config["truncate_staging_dataset"] = truncate + + @dlt.resource(write_disposition="merge", merge_key="id") + def test_data(): + yield [{"field": 1, "id": 1}, {"field": 2, "id": 2}, {"field": 3, "id": 3}] + + pipeline = dlt.pipeline( + pipeline_name="test_staging_cleared", + destination="duckdb", + full_refresh=True, + ) + + info = pipeline.run(test_data, table_name="staging_cleared") + assert_load_info(info) + + with pipeline.sql_client() as client: + with client.execute_query( + f"SELECT * FROM {pipeline.dataset_name}_staging.staging_cleared" + ) as cur: + if truncate: + assert len(cur.fetchall()) == 0 + else: + assert len(cur.fetchall()) == 3 + + with client.execute_query(f"SELECT * FROM {pipeline.dataset_name}.staging_cleared") as cur: + assert len(cur.fetchall()) == 3