diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a8a8cc37ae..85dbf37c97 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -52,6 +52,29 @@ We use **master** branch for hot fixes (including documentation) that needs to b On the release day, **devel** branch is merged into **master**. All releases of `dlt` happen only from the **master**. +### Branch naming rules + +We want to make sure that our git history explains in a human-readable way what has been changed with which branch or PR. To this end, we are using the following branch naming pattern (all lowercase, with dashes and no underscores): + +```sh +{category}/{ticket-id}-description-of-the-branch +# example: +feat/4922-add-avro-support +``` + +#### Branch categories + +* **feat** - a new feature that is being implemented (ticket required) +* **fix** - a change that fixes a bug (ticket required) +* **exp** - an experiment where we are testing a new idea or want to demonstrate something to the team; it might turn into a `feat` later (ticket encouraged) +* **test** - anything related to the tests (ticket encouraged) +* **blogs** - a new entry to our blog (ticket optional) +* **docs** - a change to our docs (ticket optional) + +#### Ticket numbers + +We encourage you to attach your branches to a ticket; if none exists, create one and explain what you are doing. For `feat` and `fix` branches, tickets are mandatory; for `exp` and `test` branches they are encouraged; and for `blogs` and `docs` branches they are optional. + ### Submitting a hotfix We'll fix critical bugs and release `dlt` out of the schedule. Follow the regular procedure, but make your PR against **master** branch. Please ping us on Slack if you do it. @@ -166,3 +189,4 @@ Once the version has been bumped, follow these steps to publish the new release - [Poetry Documentation](https://python-poetry.org/docs/) If you have any questions or need help, don't hesitate to reach out to us. We're here to help you succeed in contributing to `dlt`. Happy coding! 
+**** \ No newline at end of file diff --git a/dlt/common/data_writers/buffered.py b/dlt/common/data_writers/buffered.py index fdd5b50111..bd32c68c49 100644 --- a/dlt/common/data_writers/buffered.py +++ b/dlt/common/data_writers/buffered.py @@ -55,7 +55,10 @@ def __init__( self.closed_files: List[DataWriterMetrics] = [] # all fully processed files # buffered items must be less than max items in file self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items) + # Explicitly configured max size supersedes destination limit self.file_max_bytes = file_max_bytes + if self.file_max_bytes is None and _caps: + self.file_max_bytes = _caps.recommended_file_size self.file_max_items = file_max_items # the open function is either gzip.open or open self.open = ( diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index e74f5a980d..089b4a1d5e 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -29,6 +29,8 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): preferred_loader_file_format: TLoaderFileFormat = None supported_loader_file_formats: Sequence[TLoaderFileFormat] = None + recommended_file_size: Optional[int] = None + """Recommended file size in bytes when writing extract/load files""" preferred_staging_file_format: Optional[TLoaderFileFormat] = None supported_staging_file_formats: Sequence[TLoaderFileFormat] = None escape_identifier: Callable[[str], str] = None diff --git a/dlt/destinations/impl/bigquery/__init__.py b/dlt/destinations/impl/bigquery/__init__.py index d33466ed5e..39322b43a0 100644 --- a/dlt/destinations/impl/bigquery/__init__.py +++ b/dlt/destinations/impl/bigquery/__init__.py @@ -12,6 +12,8 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supported_loader_file_formats = ["jsonl", "parquet"] caps.preferred_staging_file_format = "parquet" caps.supported_staging_file_formats = ["parquet", "jsonl"] + # BQ limit is 4GB but leave a large headroom since buffered writer does not preemptively check size + caps.recommended_file_size = int(1024 * 1024 * 1024) caps.escape_identifier = escape_bigquery_identifier caps.escape_literal = None caps.format_datetime_literal = format_bigquery_datetime_literal diff --git a/dlt/destinations/impl/mssql/__init__.py b/dlt/destinations/impl/mssql/__init__.py index e9d9fe24fd..f7768d9238 100644 --- a/dlt/destinations/impl/mssql/__init__.py +++ b/dlt/destinations/impl/mssql/__init__.py @@ -17,7 +17,8 @@ def capabilities() -> DestinationCapabilitiesContext: # https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN caps.max_identifier_length = 128 caps.max_column_identifier_length = 128 - caps.max_query_length = 4 * 1024 * 64 * 1024 + # A SQL Query can be a varchar(max) but is shown as limited to 65,536 * Network Packet + caps.max_query_length = 65536 * 10 caps.is_max_query_length_in_bytes = True caps.max_text_data_type_length = 2**30 - 1 caps.is_max_text_data_type_length_in_bytes = False diff --git a/dlt/destinations/impl/mssql/mssql.py b/dlt/destinations/impl/mssql/mssql.py index 8de15e2bd9..6f364c8af1 100644 --- a/dlt/destinations/impl/mssql/mssql.py +++ b/dlt/destinations/impl/mssql/mssql.py @@ -181,7 +181,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non if c.get(h, False) is True ) column_name = self.capabilities.escape_identifier(c["name"]) - return f"{column_name} {db_type} {hints_str} 
{self._gen_not_null(c['nullable'])}" + return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c.get('nullable', True))}" def _create_replace_followup_jobs( self, table_chain: Sequence[TTableSchema] diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index fac6391e01..9c4076cfa7 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -567,16 +567,13 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltResourceImpl: compat_wrapper(actual_resource_name, conf_f, sig, *args, **kwargs), incremental, ) - except InvalidResourceDataTypeFunctionNotAGenerator as gen_ex: + except InvalidResourceDataTypeFunctionNotAGenerator: # we allow an edge case: resource can return another resource - try: - # actually call the function to see if it contains DltResource - data_ = conf_f(*args, **kwargs) - if not isinstance(data_, DltResource): - raise - r = data_ # type: ignore[assignment] - except Exception: - raise gen_ex from None + # actually call the function to see if it contains DltResource + data_ = conf_f(*args, **kwargs) + if not isinstance(data_, DltResource): + raise + r = data_ # type: ignore[assignment] # consider transformer arguments bound r._args_bound = True # keep explicit args passed diff --git a/dlt/load/configuration.py b/dlt/load/configuration.py index 97cf23fdfc..b3fc2fbcd4 100644 --- a/dlt/load/configuration.py +++ b/dlt/load/configuration.py @@ -15,6 +15,9 @@ class LoaderConfiguration(PoolRunnerConfiguration): raise_on_max_retries: int = 5 """When gt 0 will raise when job reaches raise_on_max_retries""" _load_storage_config: LoadStorageConfiguration = None + # if set to `True`, the staging dataset will be + # truncated after loading the data + truncate_staging_dataset: bool = False def on_resolved(self) -> None: self.pool_type = "none" if self.workers == 1 else "thread" diff --git a/dlt/load/load.py b/dlt/load/load.py index 66ddb1c308..9d898bc54d 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -53,7 +53,7 @@ LoadClientUnsupportedWriteDisposition, LoadClientUnsupportedFileFormats, ) -from dlt.load.utils import get_completed_table_chain, init_client +from dlt.load.utils import _extend_tables_with_table_chain, get_completed_table_chain, init_client class Load(Runnable[Executor], WithStepInfo[LoadMetrics, LoadInfo]): @@ -348,6 +348,8 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False) ) ): job_client.complete_load(load_id) + self._maybe_trancate_staging_dataset(schema, job_client) + self.load_storage.complete_load_package(load_id, aborted) # collect package info self._loaded_packages.append(self.load_storage.get_load_package_info(load_id)) @@ -490,6 +492,37 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics: return TRunMetrics(False, len(self.load_storage.list_normalized_packages())) + def _maybe_trancate_staging_dataset(self, schema: Schema, job_client: JobClientBase) -> None: + """ + Truncate the staging dataset if one used, + and configuration requests truncation. + + Args: + schema (Schema): Schema to use for the staging dataset. + job_client (JobClientBase): + Job client to use for the staging dataset. 
+ """ + if not ( + isinstance(job_client, WithStagingDataset) and self.config.truncate_staging_dataset + ): + return + + data_tables = schema.data_table_names() + tables = _extend_tables_with_table_chain( + schema, data_tables, data_tables, job_client.should_load_data_to_staging_dataset + ) + + try: + with self.get_destination_client(schema) as client: + with client.with_staging_dataset(): # type: ignore + client.initialize_storage(truncate_tables=tables) + + except Exception as exc: + logger.warn( + f"Staging dataset truncate failed due to the following error: {exc}" + " However, it didn't affect the data integrity." + ) + def get_step_info( self, pipeline: SupportsPipeline, diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index a2ea1936a9..53770f332d 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -554,6 +554,7 @@ def load( with signals.delayed_signals(): runner.run_pool(load_step.config, load_step) info: LoadInfo = self._get_step_info(load_step) + self.first_run = False return info except Exception as l_ex: diff --git a/dlt/sources/helpers/rest_client/detector.py b/dlt/sources/helpers/rest_client/detector.py index 857f6bbb4e..19a1e83a82 100644 --- a/dlt/sources/helpers/rest_client/detector.py +++ b/dlt/sources/helpers/rest_client/detector.py @@ -1,5 +1,6 @@ import re -from typing import List, Dict, Any, Tuple, Union, Optional, Callable, Iterable +from pathlib import PurePosixPath +from typing import List, Dict, Any, Tuple, Union, Callable, Iterable from urllib.parse import urlparse from requests import Response @@ -25,6 +26,7 @@ "payload", "content", "objects", + "values", ] ) @@ -46,7 +48,10 @@ def single_entity_path(path: str) -> bool: """Checks if path ends with path param indicating that single object is returned""" - return re.search(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}/?$", path) is not None + # get last path segment + name = PurePosixPath(path).name + # alphabet for a name taken from https://github.com/OAI/OpenAPI-Specification/blob/main/versions/3.0.3.md#fixed-fields-6 + return re.search(r"\{([a-zA-Z0-9\.\-_]+)\}", name) is not None def matches_any_pattern(key: str, patterns: Iterable[str]) -> bool: diff --git a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py index 125938ace5..e890469263 100644 --- a/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py +++ b/docs/examples/custom_destination_bigquery/custom_destination_bigquery.py @@ -8,10 +8,10 @@ In this example, you'll find a Python script that demonstrates how to load to BigQuery with the custom destination. We'll learn how to: -- Use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials) -- Use the [custom destination](../dlt-ecosystem/destinations/destination.md) -- Use pyarrow tables to create complex column types on BigQuery -- Use BigQuery `autodetect=True` for schema inference from parquet files +- Use [built-in credentials.](../general-usage/credentials/config_specs#gcp-credentials) +- Use the [custom destination.](../dlt-ecosystem/destinations/destination.md) +- Use pyarrow tables to create complex column types on BigQuery. +- Use BigQuery `autodetect=True` for schema inference from parquet files. 
""" diff --git a/docs/website/docs/dlt-ecosystem/destinations/snowflake.md b/docs/website/docs/dlt-ecosystem/destinations/snowflake.md index f144da02e6..deaaff3562 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/snowflake.md +++ b/docs/website/docs/dlt-ecosystem/destinations/snowflake.md @@ -9,7 +9,7 @@ keywords: [Snowflake, destination, data warehouse] ## Install `dlt` with Snowflake **To install the `dlt` library with Snowflake dependencies, run:** ```sh -pip install dlt[snowflake] +pip install "dlt[snowflake]" ``` ## Setup Guide diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md index 0022850987..54edac5062 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md @@ -282,7 +282,7 @@ The fields in the endpoint configuration are: - `json`: The JSON payload to be sent with the request (for POST and PUT requests). - `paginator`: Pagination configuration for the endpoint. See the [pagination](#pagination) section for more details. - `data_selector`: A JSONPath to select the data from the response. See the [data selection](#data-selection) section for more details. -- `response_actions`: A list of actions that define how to process the response data. +- `response_actions`: A list of actions that define how to process the response data. See the [response actions](#response-actions) section for more details. - `incremental`: Configuration for [incremental loading](#incremental-loading). ### Pagination @@ -414,8 +414,8 @@ Available authentication types: | Authentication class | String Alias (`type`) | Description | | ------------------- | ----------- | ----------- | | [BearTokenAuth](../../general-usage/http/rest-client.md#bearer-token-authentication) | `bearer` | Bearer token authentication. | -| [HTTPBasicAuth](../../general-usage/http/rest-client.md#http-basic-authentication) | `api_key` | Basic HTTP authentication. | -| [APIKeyAuth](../../general-usage/http/rest-client.md#api-key-authentication) | `http_basic` | API key authentication with key defined in the query parameters or in the headers. | +| [HTTPBasicAuth](../../general-usage/http/rest-client.md#http-basic-authentication) | `http_basic` | Basic HTTP authentication. | +| [APIKeyAuth](../../general-usage/http/rest-client.md#api-key-authentication) | `api_key` | API key authentication with key defined in the query parameters or in the headers. | To specify the authentication configuration, use the `auth` field in the [client](#client) configuration: @@ -586,3 +586,33 @@ See the [incremental loading](../../general-usage/incremental-loading.md#increme - `root_key` (bool): Enables merging on all resources by propagating root foreign key to child tables. This option is most useful if you plan to change write disposition of a resource to disable/enable merge. Defaults to False. - `schema_contract`: Schema contract settings that will be applied to this resource. - `spec`: A specification of configuration and secret values required by the source. + +### Response actions + +The `response_actions` field in the endpoint configuration allows you to specify how to handle specific responses from the API based on status codes or content substrings. This is useful for handling edge cases like ignoring responses on specific conditions. + +:::caution Experimental Feature +This is an experimental feature and may change in future releases. 
+::: + +#### Example + +```py +{ + "path": "issues", + "response_actions": [ + {"status_code": 404, "action": "ignore"}, + {"content": "Not found", "action": "ignore"}, + {"status_code": 200, "content": "some text", "action": "ignore"}, + ], +} +``` + +In this example, the source will ignore responses with a status code of 404, responses with the content "Not found", and responses with a status code of 200 _and_ content "some text". + +**Fields:** + +- `status_code` (int, optional): The HTTP status code to match. +- `content` (str, optional): A substring to search for in the response content. +- `action` (str): The action to take when the condition is met. Currently supported actions: + - `ignore`: Ignore the response. diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/slack.md b/docs/website/docs/dlt-ecosystem/verified-sources/slack.md index 970a891e60..38eda15c94 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/slack.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/slack.md @@ -70,7 +70,7 @@ To get started with your data pipeline, follow these steps: [This command](../../reference/command-line-interface) will initialize [the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/slack_pipeline.py) - with Google Sheets as the [source](../../general-usage/source) and + with Slack as the [source](../../general-usage/source) and [duckdb](../destinations/duckdb.md) as the [destination](../destinations). 1. If you'd like to use a different destination, simply replace `duckdb` with the name of your diff --git a/docs/website/docs/general-usage/destination-tables.md b/docs/website/docs/general-usage/destination-tables.md index 8e1f771e47..4780d4be20 100644 --- a/docs/website/docs/general-usage/destination-tables.md +++ b/docs/website/docs/general-usage/destination-tables.md @@ -74,7 +74,7 @@ pipeline = dlt.pipeline( load_info = pipeline.run(users) ``` -The result will be the same, but the table is implicitly named `users` based on the resource name. +The result will be the same; note that we do not explicitly pass `table_name="users"` to `pipeline.run`, and the table is implicitly named `users` based on the resource name (e.g., `users()` decorated with `@dlt.resource`). :::note @@ -117,9 +117,7 @@ pipeline = dlt.pipeline( load_info = pipeline.run(data, table_name="users") ``` -Running this pipeline will create two tables in the destination, `users` and `users__pets`. The -`users` table will contain the top level data, and the `users__pets` table will contain the child -data. Here is what the tables may look like: +Running this pipeline will create two tables in the destination, `users` and `users__pets`. The `users` table will contain the top-level data, and the `users__pets` table will contain the child data. Here is what the tables may look like: **mydata.users** @@ -141,21 +139,14 @@ creating and linking children and parent tables. This is how it works: -1. Each row in all (top level and child) data tables created by `dlt` contains UNIQUE column named - `_dlt_id`. -1. Each child table contains FOREIGN KEY column `_dlt_parent_id` linking to a particular row - (`_dlt_id`) of a parent table. -1. Rows in child tables come from the lists: `dlt` stores the position of each item in the list in - `_dlt_list_idx`. -1. For tables that are loaded with the `merge` write disposition, we add a ROOT KEY column - `_dlt_root_id`, which links child table to a row in top level table. - +1. 
Each row in all (top level and child) data tables created by `dlt` contains a `UNIQUE` column named `_dlt_id`. +1. Each child table contains a `FOREIGN KEY` column `_dlt_parent_id` linking to a particular row (`_dlt_id`) of a parent table. +1. Rows in child tables come from the lists: `dlt` stores the position of each item in the list in `_dlt_list_idx`. +1. For tables that are loaded with the `merge` write disposition, we add a root key column `_dlt_root_id`, which links the child table to a row in the top-level table. :::note -If you define your own primary key in a child table, it will be used to link to parent table -and the `_dlt_parent_id` and `_dlt_list_idx` will not be added. `_dlt_id` is always added even in -case the primary key or other unique columns are defined. +If you define your own primary key in a child table, it will be used to link to the parent table, and the `_dlt_parent_id` and `_dlt_list_idx` will not be added. `_dlt_id` is always added even if the primary key or other unique columns are defined. ::: @@ -164,17 +155,15 @@ case the primary key or other unique columns are defined. During a pipeline run, dlt [normalizes both table and column names](schema.md#naming-convention) to ensure compatibility with the destination database's accepted format. All names from your source data will be transformed into snake_case and will only include alphanumeric characters. Please be aware that the names in the destination database may differ somewhat from those in your original input. ### Variant columns -If your data has inconsistent types, `dlt` will dispatch the data to several **variant columns**. For example, if you have a resource (ie json file) with a filed with name **answer** and your data contains boolean values, you will get get a column with name **answer** of type **BOOLEAN** in your destination. If for some reason, on next load you get integer value and string value in **answer**, the inconsistent data will go to **answer__v_bigint** and **answer__v_text** columns respectively. -The general naming rule for variant columns is `__v_` where `original_name` is the existing column name (with data type clash) and `type` is the name of data type stored in the variant. - +If your data has inconsistent types, `dlt` will dispatch the data to several **variant columns**. For example, if you have a resource (i.e., JSON file) with a field with name `answer` and your data contains boolean values, you will get a column with name `answer` of type `BOOLEAN` in your destination. If for some reason, on the next load, you get integer and string values in `answer`, the inconsistent data will go to `answer__v_bigint` and `answer__v_text` columns respectively. +The general naming rule for variant columns is `__v_` where `original_name` is the existing column name (with data type clash) and `type` is the name of the data type stored in the variant. ## Load Packages and Load IDs Each execution of the pipeline generates one or more load packages. A load package typically contains data retrieved from all the [resources](glossary.md#resource) of a particular [source](glossary.md#source). These packages are uniquely identified by a `load_id`. The `load_id` of a particular package is added to the top data tables -(referenced as `_dlt_load_id` column in the example above) and to the special `_dlt_loads` table with a status 0 -(when the load process is fully completed). 
+(referenced as `_dlt_load_id` column in the example above) and to the special `_dlt_loads` table with a status of 0 (when the load process is fully completed). To illustrate this, let's load more data into the same destination: @@ -189,8 +178,7 @@ data = [ ``` The rest of the pipeline definition remains the same. Running this pipeline will create a new load -package with a new `load_id` and add the data to the existing tables. The `users` table will now -look like this: +package with a new `load_id` and add the data to the existing tables. The `users` table will now look like this: **mydata.users** @@ -210,12 +198,12 @@ The `_dlt_loads` table will look like this: | **1234563456.12345** | quick_start | 0 | 2023-09-12 16:46:03.10662+00 | aOEb...Qekd/58= | The `_dlt_loads` table tracks complete loads and allows chaining transformations on top of them. -Many destinations do not support distributed and long-running transactions (e.g. Amazon Redshift). +Many destinations do not support distributed and long-running transactions (e.g., Amazon Redshift). In that case, the user may see the partially loaded data. It is possible to filter such data out: any row with a `load_id` that does not exist in `_dlt_loads` is not yet completed. The same procedure may be used to identify and delete data for packages that never got completed. -For each load, you can test and [alert](../running-in-production/alerting.md) on anomalies (e.g. +For each load, you can test and [alert](../running-in-production/alerting.md) on anomalies (e.g., no data, too much loaded to a table). There are also some useful load stats in the `Load info` tab of the [Streamlit app](../dlt-ecosystem/visualizations/exploring-the-data.md#exploring-the-data) mentioned above. @@ -231,8 +219,7 @@ Data lineage can be super relevant for architectures like the [data vault architecture](https://www.data-vault.co.uk/what-is-data-vault/) or when troubleshooting. The data vault architecture is a data warehouse that large organizations use when representing the same process across multiple systems, which adds data lineage requirements. Using the pipeline name -and `load_id` provided out of the box by `dlt`, you are able to identify the source and time of -data. +and `load_id` provided out of the box by `dlt`, you are able to identify the source and time of data. You can [save](../running-in-production/running.md#inspect-and-save-the-load-info-and-trace) complete lineage info for a particular `load_id` including a list of loaded files, error messages @@ -242,11 +229,7 @@ problems. ## Staging dataset So far we've been using the `append` write disposition in our example pipeline. This means that -each time we run the pipeline, the data is appended to the existing tables. When you use [the -merge write disposition](incremental-loading.md), dlt creates a staging database schema for -staging data. This schema is named `_staging` and contains the same tables as the -destination schema. When you run the pipeline, the data from the staging tables is loaded into the -destination tables in a single atomic transaction. +each time we run the pipeline, the data is appended to the existing tables. When you use the [merge write disposition](incremental-loading.md), dlt creates a staging database schema for staging data. This schema is named `_staging` and contains the same tables as the destination schema. When you run the pipeline, the data from the staging tables is loaded into the destination tables in a single atomic transaction. 
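By default, the contents of this staging schema are left in place after the load, which is what the example below relies on. If you would rather have them cleared once a load completes, the `truncate_staging_dataset` load option introduced in this change set does exactly that. Below is a minimal sketch of enabling it from Python; the resource and pipeline names (`users`, `quick_start`, `mydata`) are illustrative and mirror the examples above, and the `config.toml` equivalent is `load.truncate_staging_dataset=true` (shown in the running-in-production guide).

```py
import dlt

# clear the staging dataset after each successful load;
# equivalent to setting load.truncate_staging_dataset=true in config.toml
dlt.config["truncate_staging_dataset"] = True

@dlt.resource(write_disposition="merge", primary_key="id")
def users():
    yield [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]

pipeline = dlt.pipeline(
    pipeline_name="quick_start",
    destination="duckdb",
    dataset_name="mydata",
)

# the staging table is still created for the merge, but it is emptied after the load
load_info = pipeline.run(users)
print(load_info)
```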
Let's illustrate this with an example. We change our pipeline to use the `merge` write disposition: @@ -270,8 +253,7 @@ load_info = pipeline.run(users) ``` Running this pipeline will create a schema in the destination database with the name `mydata_staging`. -If you inspect the tables in this schema, you will find `mydata_staging.users` table identical to the -`mydata.users` table in the previous example. +If you inspect the tables in this schema, you will find the `mydata_staging.users` table identical to the`mydata.users` table in the previous example. Here is what the tables may look like after running the pipeline: @@ -290,8 +272,7 @@ Here is what the tables may look like after running the pipeline: | 2 | Bob 2 | rX8ybgTeEmAmmA | 2345672350.98417 | | 3 | Charlie | h8lehZEvT3fASQ | 1234563456.12345 | -Notice that the `mydata.users` table now contains the data from both the previous pipeline run and -the current one. +Notice that the `mydata.users` table now contains the data from both the previous pipeline run and the current one. ## Versioned datasets @@ -322,4 +303,4 @@ load_info = pipeline.run(data, table_name="users") Every time you run this pipeline, a new schema will be created in the destination database with a datetime-based suffix. The data will be loaded into tables in this schema. For example, the first time you run the pipeline, the schema will be named -`mydata_20230912064403`, the second time it will be named `mydata_20230912064407`, and so on. +`mydata_20230912064403`, the second time it will be named `mydata_20230912064407`, and so on. \ No newline at end of file diff --git a/docs/website/docs/general-usage/http/rest-client.md b/docs/website/docs/general-usage/http/rest-client.md index ca39046d35..19cc95bf78 100644 --- a/docs/website/docs/general-usage/http/rest-client.md +++ b/docs/website/docs/general-usage/http/rest-client.md @@ -385,7 +385,7 @@ class PostBodyPaginator(BasePaginator): # Add the cursor to the request body request.json["cursor"] = self.cursor - + client = RESTClient( base_url="https://api.example.com", paginator=PostBodyPaginator() @@ -527,4 +527,70 @@ from dlt.sources.helpers.rest_client import paginate for page in paginate("https://api.example.com/posts"): print(page) -``` \ No newline at end of file +``` + +## Troubleshooting + +### `RESTClient.get()` and `RESTClient.post()` methods + +These methods work similarly to the [get()](https://docs.python-requests.org/en/latest/api/#requests.get) and [post()](https://docs.python-requests.org/en/latest/api/#requests.post) functions +from the Requests library. They return a [Response](https://docs.python-requests.org/en/latest/api/#requests.Response) object that contains the response data. +You can inspect the `Response` object to get the `response.status_code`, `response.headers`, and `response.content`. For example: + +```py +from dlt.sources.helpers.rest_client import RESTClient +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth + +client = RESTClient(base_url="https://api.example.com") +response = client.get("/posts", auth=BearerTokenAuth(token="your_access_token")) # type: ignore + +print(response.status_code) +print(response.headers) +print(response.content) +``` + +### `RESTClient.paginate()` + +Debugging `paginate()` is trickier because it's a generator function that yields [`PageData`](#pagedata) objects. Here's several ways to debug the `paginate()` method: + +1. 
Enable [logging](../../running-in-production/running.md#set-the-log-level-and-format) to see detailed information about the HTTP requests: + +```sh +RUNTIME__LOG_LEVEL=INFO python my_script.py +``` + +2. Use the [`PageData`](#pagedata) instance to inspect the [request](https://docs.python-requests.org/en/latest/api/#requests.Request) +and [response](https://docs.python-requests.org/en/latest/api/#requests.Response) objects: + +```py +from dlt.sources.helpers.rest_client import RESTClient +from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator + +client = RESTClient( + base_url="https://api.example.com", + paginator=JSONResponsePaginator(next_url_path="pagination.next") +) + +for page in client.paginate("/posts"): + print(page.request) + print(page.response) +``` + +3. Use the `hooks` parameter to add custom response handlers to the `paginate()` method: + +```py +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth + +def response_hook(response, **kwargs): + print(response.status_code) + print(f"Content: {response.content}") + print(f"Request: {response.request.body}") + # Or import pdb; pdb.set_trace() to debug + +for page in client.paginate( + "/posts", + auth=BearerTokenAuth(token="your_access_token"), # type: ignore + hooks={"response": [response_hook]} +): + print(page) +``` diff --git a/docs/website/docs/intro.md b/docs/website/docs/intro.md index 776329bcf4..0374802b7d 100644 --- a/docs/website/docs/intro.md +++ b/docs/website/docs/intro.md @@ -32,6 +32,10 @@ The library will create or update tables, infer data types, and handle nested da ]}> +:::tip +Looking to use a REST API as a source? Explore our new [REST API generic source](dlt-ecosystem/verified-sources/rest_api) for a declarative way to load data. +::: + diff --git a/docs/website/docs/running-in-production/running.md b/docs/website/docs/running-in-production/running.md index 253a27d942..9c52f58caa 100644 --- a/docs/website/docs/running-in-production/running.md +++ b/docs/website/docs/running-in-production/running.md @@ -108,6 +108,12 @@ behind. In `config.toml`: load.delete_completed_jobs=true ``` +Also, by default, `dlt` leaves data in staging dataset, used during merge and replace load for deduplication. In order to clear it, put the following line in `config.toml`: + +```toml +load.truncate_staging_dataset=true +``` + ## Using slack to send messages `dlt` provides basic support for sending slack messages. You can configure Slack incoming hook via diff --git a/docs/website/docs/tutorial/grouping-resources.md b/docs/website/docs/tutorial/grouping-resources.md index 3a05f7940c..3ba95b7971 100644 --- a/docs/website/docs/tutorial/grouping-resources.md +++ b/docs/website/docs/tutorial/grouping-resources.md @@ -14,6 +14,9 @@ This tutorial continues the [previous](load-data-from-an-api) part. We'll use th In the previous tutorial, we loaded issues from the GitHub API. Now we'll prepare to load comments from the API as well. Here's a sample [dlt resource](../general-usage/resource) that does that: ```py +import dlt +from dlt.sources.helpers.rest_client import paginate + @dlt.resource( table_name="comments", write_disposition="merge", @@ -22,17 +25,11 @@ In the previous tutorial, we loaded issues from the GitHub API. 
Now we'll prepar def get_comments( updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") ): - url = "https://api.github.com/repos/dlt-hub/dlt/comments?per_page=100" - - while True: - response = requests.get(url) - response.raise_for_status() - yield response.json() - - # get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + for page in paginate( + "https://api.github.com/repos/dlt-hub/dlt/comments", + params={"per_page": 100} + ): + yield page ``` We can load this resource separately from the issues resource, however loading both issues and comments in one go is more efficient. To do that, we'll use the `@dlt.source` decorator on a function that returns a list of resources: @@ -47,7 +44,7 @@ def github_source(): ```py import dlt -from dlt.sources.helpers import requests +from dlt.sources.helpers.rest_client import paginate @dlt.resource( table_name="issues", @@ -57,21 +54,17 @@ from dlt.sources.helpers import requests def get_issues( updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") ): - url = ( - "https://api.github.com/repos/dlt-hub/dlt/issues" - f"?since={updated_at.last_value}&per_page=100" - "&sort=updated&directions=desc&state=open" - ) - - while True: - response = requests.get(url) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + for page in paginate( + "https://api.github.com/repos/dlt-hub/dlt/issues", + params={ + "since": updated_at.last_value, + "per_page": 100, + "sort": "updated", + "directions": "desc", + "state": "open", + } + ): + yield page @dlt.resource( @@ -82,20 +75,14 @@ def get_issues( def get_comments( updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") ): - url = ( - "https://api.github.com/repos/dlt-hub/dlt/comments" - "?per_page=100" - ) - - while True: - response = requests.get(url) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + for page in paginate( + "https://api.github.com/repos/dlt-hub/dlt/comments", + params={ + "since": updated_at.last_value, + "per_page": 100, + } + ): + yield page @dlt.source @@ -124,18 +111,8 @@ from dlt.sources.helpers import requests BASE_GITHUB_URL = "https://api.github.com/repos/dlt-hub/dlt" def fetch_github_data(endpoint, params={}): - """Fetch data from GitHub API based on endpoint and params.""" url = f"{BASE_GITHUB_URL}/{endpoint}" - - while True: - response = requests.get(url, params=params) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + return paginate(url, params=params) @dlt.source def github_source(): @@ -164,21 +141,16 @@ For the next step we'd want to get the [number of repository clones](https://doc Let's handle this by changing our `fetch_github_data()` first: ```py -def fetch_github_data(endpoint, params={}, access_token=None): - """Fetch data from GitHub API based on endpoint and params.""" - headers = {"Authorization": f"Bearer {access_token}"} if access_token else {} +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth +def fetch_github_data(endpoint, params={}, access_token=None): url = f"{BASE_GITHUB_URL}/{endpoint}" + return paginate( + url, + params=params, + auth=BearerTokenAuth(token=access_token) if access_token else None, + 
) - while True: - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] @dlt.source def github_source(access_token): @@ -229,28 +201,7 @@ access_token = "ghp_A...3aRY" Now we can run the script and it will load the data from the `traffic/clones` endpoint: ```py -import dlt -from dlt.sources.helpers import requests - -BASE_GITHUB_URL = "https://api.github.com/repos/dlt-hub/dlt" - - -def fetch_github_data(endpoint, params={}, access_token=None): - """Fetch data from GitHub API based on endpoint and params.""" - headers = {"Authorization": f"Bearer {access_token}"} if access_token else {} - - url = f"{BASE_GITHUB_URL}/{endpoint}" - - while True: - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - yield response.json() - - # get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] - +... @dlt.source def github_source( @@ -287,19 +238,12 @@ BASE_GITHUB_URL = "https://api.github.com/repos/{repo_name}" def fetch_github_data(repo_name, endpoint, params={}, access_token=None): """Fetch data from GitHub API based on repo_name, endpoint, and params.""" - headers = {"Authorization": f"Bearer {access_token}"} if access_token else {} - url = BASE_GITHUB_URL.format(repo_name=repo_name) + f"/{endpoint}" - - while True: - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - yield response.json() - - # Get next page - if "next" not in response.links: - break - url = response.links["next"]["url"] + return paginate( + url, + params=params, + auth=BearerTokenAuth(token=access_token) if access_token else None, + ) @dlt.source @@ -347,5 +291,6 @@ Interested in learning more? Here are some suggestions: - [Pass config and credentials into your sources and resources](../general-usage/credentials). - [Run in production: inspecting, tracing, retry policies and cleaning up](../running-in-production/running). - [Run resources in parallel, optimize buffers and local storage](../reference/performance.md) + - [Use REST API client helpers](../general-usage/http/rest-client.md) to simplify working with REST APIs. 3. Check out our [how-to guides](../walkthroughs) to get answers to some common questions. 4. Explore the [Examples](../examples) section to see how dlt can be used in real-world scenarios diff --git a/docs/website/docs/tutorial/load-data-from-an-api.md b/docs/website/docs/tutorial/load-data-from-an-api.md index 31a2c1592d..ec6136b6d3 100644 --- a/docs/website/docs/tutorial/load-data-from-an-api.md +++ b/docs/website/docs/tutorial/load-data-from-an-api.md @@ -44,7 +44,7 @@ dlt pipeline github_issues show ## Append or replace your data -Try running the pipeline again with `python github_issues.py`. You will notice that the **issues** table contains two copies of the same data. This happens because the default load mode is `append`. It is very useful, for example, when you have a new folder created daily with `json` file logs, and you want to ingest them. +Try running the pipeline again with `python github_issues.py`. You will notice that the **issues** table contains two copies of the same data. This happens because the default load mode is `append`. It is very useful, for example, when you have daily data updates and you want to ingest them. To get the latest data, we'd need to run the script again. 
But how to do that without duplicating the data? One option is to tell `dlt` to replace the data in existing tables in the destination by using `replace` write disposition. Change the `github_issues.py` script to the following: @@ -148,6 +148,55 @@ and `updated_at.last_value` to tell GitHub to return issues updated only **after [Learn more about merge write disposition](../general-usage/incremental-loading#merge-incremental_loading). +## Using pagination helper + +In the previous examples, we used the `requests` library to make HTTP requests to the GitHub API and handled pagination manually. `dlt` has the built-in [REST client](../general-usage/http/rest-client.md) that simplifies API requests. We'll pick the `paginate()` helper from it for the next example. The `paginate` function takes a URL and optional parameters (quite similar to `requests`) and returns a generator that yields pages of data. + +Here's how the updated script looks: + +```py +import dlt +from dlt.sources.helpers.rest_client import paginate + +@dlt.resource( + table_name="issues", + write_disposition="merge", + primary_key="id", +) +def get_issues( + updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") +): + for page in paginate( + "https://api.github.com/repos/dlt-hub/dlt/issues", + params={ + "since": updated_at.last_value, + "per_page": 100, + "sort": "updated", + "direction": "desc", + "state": "open", + }, + ): + yield page + +pipeline = dlt.pipeline( + pipeline_name="github_issues_merge", + destination="duckdb", + dataset_name="github_data_merge", +) +load_info = pipeline.run(get_issues) +row_counts = pipeline.last_trace.last_normalize_info + +print(row_counts) +print("------") +print(load_info) +``` + +Let's zoom in on the changes: + +1. The `while` loop that handled pagination is replaced with reading pages from the `paginate()` generator. +2. `paginate()` takes the URL of the API endpoint and optional parameters. In this case, we pass the `since` parameter to get only issues updated after the last pipeline run. +3. We're not explicitly setting up pagination, `paginate()` handles it for us. Magic! Under the hood, `paginate()` analyzes the response and detects the pagination method used by the API. Read more about pagination in the [REST client documentation](../general-usage/http/rest-client.md#paginating-api-responses). + ## Next steps Continue your journey with the [Resource Grouping and Secrets](grouping-resources) tutorial. diff --git a/docs/website/docs/walkthroughs/adjust-a-schema.md b/docs/website/docs/walkthroughs/adjust-a-schema.md index cfe2d056b0..b0a9a9ce05 100644 --- a/docs/website/docs/walkthroughs/adjust-a-schema.md +++ b/docs/website/docs/walkthroughs/adjust-a-schema.md @@ -121,6 +121,32 @@ Do not rename the tables or columns in the yaml file. `dlt` infers those from th You can [adjust the schema](../general-usage/resource.md#adjust-schema) in Python before resource is loaded. ::: +### Reorder columns +To reorder the columns in your dataset, follow these steps: + +1. Initial Run: Execute the pipeline to obtain the import and export schemas. +1. Modify Export Schema: Adjust the column order as desired in the export schema. +1. Sync Import Schema: Ensure that these changes are mirrored in the import schema to maintain consistency. +1. Delete Dataset: Remove the existing dataset to prepare for the reload. +1. Reload Data: Reload the data. The dataset should now reflect the new column order as specified in the import YAML. 
+ +These steps ensure that the column order in your dataset matches your specifications. + +**Another approach** to reorder columns is to use the `add_map` function. For instance, to rearrange ‘column1’, ‘column2’, and ‘column3’, you can proceed as follows: + +```py +# Define the data source and reorder columns using add_map +data_source = resource().add_map(lambda row: { + 'column3': row['column3'], + 'column1': row['column1'], + 'column2': row['column2'] +}) + +# Run the pipeline +load_info = pipeline.run(data_source) +``` + +In this example, the `add_map` function reorders columns by defining a new mapping. The lambda function specifies the desired order by rearranging the key-value pairs. When the pipeline runs, the data will load with the columns in the new order. ### Load data as json instead of generating child table or columns from flattened dicts diff --git a/docs/website/docs/walkthroughs/create-a-pipeline.md b/docs/website/docs/walkthroughs/create-a-pipeline.md index 1d5974efbe..cbbbd73fc3 100644 --- a/docs/website/docs/walkthroughs/create-a-pipeline.md +++ b/docs/website/docs/walkthroughs/create-a-pipeline.md @@ -1,31 +1,46 @@ --- title: Create a pipeline description: How to create a pipeline -keywords: [how to, create a pipeline] +keywords: [how to, create a pipeline, rest client] --- # Create a pipeline -Follow the steps below to create a [pipeline](../general-usage/glossary.md#pipeline) from the -WeatherAPI.com API to DuckDB from scratch. The same steps can be repeated for any source and -destination of your choice—use `dlt init ` and then build the pipeline for -that API instead. +This guide walks you through creating a pipeline that uses our [REST API Client](../general-usage/http/rest-client) +to connect to [DuckDB](../dlt-ecosystem/destinations/duckdb). +:::tip +We're using DuckDB as a destination here, but you can adapt the steps to any [source](https://dlthub.com/docs/dlt-ecosystem/verified-sources/) and [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) by +using the [command](../reference/command-line-interface#dlt-init) `dlt init ` and tweaking the pipeline accordingly. +::: -Please make sure you have [installed `dlt`](../reference/installation.md) before following the +Please make sure you have [installed `dlt`](../reference/installation) before following the steps below. +## Task overview + +Imagine you want to analyze issues from a GitHub project locally. +To achieve this, you need to write code that accomplishes the following: + +1. Constructs a correct request. +2. Authenticates your request. +3. Fetches and handles paginated issue data. +4. Stores the data for analysis. + +This may sound complicated, but dlt provides a [REST API Client](../general-usage/http/rest-client) that allows you to focus more on your data rather than on managing API interactions. + + ## 1. Initialize project Create a new empty directory for your `dlt` project by running: ```sh -mkdir weatherapi_duckdb && cd weatherapi_duckdb +mkdir github_api_duckdb && cd github_api_duckdb ``` Start a `dlt` project with a pipeline template that loads data to DuckDB by running: ```sh -dlt init weatherapi duckdb +dlt init github_api duckdb ``` Install the dependencies necessary for DuckDB: @@ -34,114 +49,127 @@ Install the dependencies necessary for DuckDB: pip install -r requirements.txt ``` -## 2. Add WeatherAPI.com API credentials +## 2. Obtain and add API credentials from GitHub -You will need to [sign up for the WeatherAPI.com API](https://www.weatherapi.com/signup.aspx). 
+You will need to [sign in](https://github.com/login) to your GitHub account and create your access token via [Personal access tokens page](https://github.com/settings/tokens). -Once you do this, you should see your `API Key` at the top of your -[user page](https://www.weatherapi.com/my/). - -Copy the value of the API key into `.dlt/secrets.toml`: +Copy your new access token over to `.dlt/secrets.toml`: ```toml [sources] api_secret_key = '' ``` -The **secret name** corresponds to the **argument name** in the source function. Below `api_secret_key` [will get its value](../general-usage/credentials/configuration.md#general-usage-and-an-example) from `secrets.toml` when `weatherapi_source()` is called. + +This token will be used by `github_api_source()` to authenticate requests. + +The **secret name** corresponds to the **argument name** in the source function. +Below `api_secret_key` [will get its value](../general-usage/credentials/configuration#allow-dlt-to-pass-the-config-and-secrets-automatically) +from `secrets.toml` when `github_api_source()` is called. + ```py @dlt.source -def weatherapi_source(api_secret_key=dlt.secrets.value): - ... +def github_api_source(api_secret_key: str = dlt.secrets.value): + return github_api_resource(api_secret_key=api_secret_key) ``` -Run the `weatherapi.py` pipeline script to test that authentication headers look fine: +Run the `github_api.py` pipeline script to test that authentication headers look fine: ```sh -python3 weatherapi.py +python github_api.py ``` Your API key should be printed out to stdout along with some test data. -## 3. Request data from the WeatherAPI.com API +## 3. Request project issues from then GitHub API -Replace the definition of the `weatherapi_resource` function definition in the `weatherapi.py` -pipeline script with a call to the WeatherAPI.com API: -```py -@dlt.resource(write_disposition="append") -def weatherapi_resource(api_secret_key=dlt.secrets.value): - url = "https://api.weatherapi.com/v1/current.json" - params = { - "q": "NYC", - "key": api_secret_key - } - response = requests.get(url, params=params) - response.raise_for_status() - yield response.json() -``` +:::tip +We will use `dlt` repository as an example GitHub project https://github.com/dlt-hub/dlt, feel free to replace it with your own repository. +::: -Run the `weatherapi.py` pipeline script to test that the API call works: +Modify `github_api_resource` in `github_api.py` to request issues data from your GitHub project's API: -```sh -python3 weatherapi.py +```py +from dlt.sources.helpers.rest_client import paginate +from dlt.sources.helpers.rest_client.auth import BearerTokenAuth +from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator + +@dlt.resource(write_disposition="replace") +def github_api_resource(api_secret_key: str = dlt.secrets.value): + url = "https://api.github.com/repos/dlt-hub/dlt/issues" + + for page in paginate( + url, + auth=BearerTokenAuth(api_secret_key), # type: ignore + paginator=HeaderLinkPaginator(), + params={"state": "open"} + ): + yield page ``` -This should print out the weather in New York City right now. - ## 4. 
Load the data -Remove the `exit()` call from the `main` function in `weatherapi.py`, so that running the -`python3 weatherapi.py` command will now also run the pipeline: +Uncomment the commented out code in `main` function in `github_api.py`, so that running the +`python github_api.py` command will now also run the pipeline: ```py if __name__=='__main__': - # configure the pipeline with your destination details pipeline = dlt.pipeline( - pipeline_name='weatherapi', + pipeline_name='github_api_pipeline', destination='duckdb', - dataset_name='weatherapi_data' + dataset_name='github_api_data' ) # print credentials by running the resource - data = list(weatherapi_resource()) + data = list(github_api_resource()) # print the data yielded from resource print(data) # run the pipeline with your parameters - load_info = pipeline.run(weatherapi_source()) + load_info = pipeline.run(github_api_source()) # pretty print the information on data that was loaded print(load_info) ``` -Run the `weatherapi.py` pipeline script to load data into DuckDB: + +Run the `github_api.py` pipeline script to test that the API call works: ```sh -python3 weatherapi.py +python github_api.py ``` -Then this command to see that the data loaded: +This should print out JSON data containing the issues in the GitHub project. + +It also prints `load_info` object. + +Let's explore the loaded data with the [command](../reference/command-line-interface#show-tables-and-data-in-the-destination) `dlt pipeline show`. + +:::info +Make sure you have `streamlit` installed `pip install streamlit` +::: ```sh -dlt pipeline weatherapi show +dlt pipeline github_api_pipeline show ``` This will open a Streamlit app that gives you an overview of the data loaded. ## 5. Next steps -Now that you have a working pipeline, you have options for what to learn next: +With a functioning pipeline, consider exploring: +- Our [REST Client](../general-usage/http/rest-client). - [Deploy this pipeline with GitHub Actions](deploy-a-pipeline/deploy-with-github-actions), so that the data is automatically loaded on a schedule. - Transform the [loaded data](../dlt-ecosystem/transformations) with dbt or in Pandas DataFrames. -- Learn how to [run](../running-in-production/running.md), - [monitor](../running-in-production/monitoring.md), and - [alert](../running-in-production/alerting.md) when you put your pipeline in production. +- Learn how to [run](../running-in-production/running), + [monitor](../running-in-production/monitoring), and + [alert](../running-in-production/alerting) when you put your pipeline in production. - Try loading data to a different destination like - [Google BigQuery](../dlt-ecosystem/destinations/bigquery.md), - [Amazon Redshift](../dlt-ecosystem/destinations/redshift.md), or - [Postgres](../dlt-ecosystem/destinations/postgres.md). + [Google BigQuery](../dlt-ecosystem/destinations/bigquery), + [Amazon Redshift](../dlt-ecosystem/destinations/redshift), or + [Postgres](../dlt-ecosystem/destinations/postgres). diff --git a/pyproject.toml b/pyproject.toml index 4bd62ce03b..1a946f5e10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "dlt" version = "0.4.11" description = "dlt is an open-source python-first scalable data loading library that does not require any backend to run." authors = ["dltHub Inc. 
"] -maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Ty Dunn "] +maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Anton Burnashev ", "David Scharf " ] readme = "README.md" license = "Apache-2.0" homepage = "https://github.com/dlt-hub" @@ -13,6 +13,7 @@ classifiers = [ "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Topic :: Software Development :: Libraries", + "Typing :: Typed", "Operating System :: MacOS :: MacOS X", "Operating System :: POSIX :: Linux", "Operating System :: Microsoft :: Windows",] diff --git a/tests/common/data_writers/utils.py b/tests/common/data_writers/utils.py index 2cb440bde1..e6e377b7d0 100644 --- a/tests/common/data_writers/utils.py +++ b/tests/common/data_writers/utils.py @@ -1,5 +1,5 @@ import os -from typing import Type +from typing import Type, Optional from dlt.common.data_writers.buffered import BufferedDataWriter from dlt.common.data_writers.writers import TWriter, ALL_WRITERS @@ -18,8 +18,8 @@ def get_writer( writer: Type[TWriter], buffer_max_items: int = 10, - file_max_items: int = 10, - file_max_bytes: int = None, + file_max_items: Optional[int] = 10, + file_max_bytes: Optional[int] = None, disable_compression: bool = False, caps: DestinationCapabilitiesContext = None, ) -> BufferedDataWriter[TWriter]: diff --git a/tests/extract/data_writers/test_buffered_writer.py b/tests/extract/data_writers/test_buffered_writer.py index 82b81a1cd7..b6da132de9 100644 --- a/tests/extract/data_writers/test_buffered_writer.py +++ b/tests/extract/data_writers/test_buffered_writer.py @@ -2,6 +2,7 @@ import pytest import time from typing import Iterator, Type +from uuid import uuid4 from dlt.common.data_writers.exceptions import BufferedDataWriterClosed from dlt.common.data_writers.writers import ( @@ -11,7 +12,7 @@ JsonlWriter, ALL_WRITERS, ) -from dlt.common.destination.capabilities import TLoaderFileFormat +from dlt.common.destination.capabilities import TLoaderFileFormat, DestinationCapabilitiesContext from dlt.common.schema.utils import new_column from dlt.common.storages.file_storage import FileStorage @@ -330,3 +331,38 @@ def test_special_write_rotates(disable_compression: bool, writer_type: Type[Data metrics = writer.import_file( "tests/extract/cases/imported.any", DataWriterMetrics("", 1, 231, 0, 0) ) + + +@pytest.mark.parametrize( + "disable_compression", [True, False], ids=["no_compression", "compression"] +) +@pytest.mark.parametrize("writer_type", ALL_OBJECT_WRITERS) +def test_rotation_on_destination_caps_recommended_file_size( + disable_compression: bool, writer_type: Type[DataWriter] +) -> None: + caps = DestinationCapabilitiesContext.generic_capabilities() + caps.recommended_file_size = int(250 * 1024) + columns = {"id": new_column("id", "text")} + with get_writer( + writer_type, + disable_compression=disable_compression, + buffer_max_items=100, + file_max_items=None, + file_max_bytes=None, + caps=caps, + ) as writer: + for i in range(8): + # Data chunk approximately 40kb serialized + items = [{"id": str(uuid4())} for _ in range(1000)] + writer.write_data_item(items, columns) + if i < 5: + assert not writer.closed_files + + if i > 5: + # We should have written atleast 250kb by now and have rotated the file + assert len(writer.closed_files) == 1 + + # Check the files that were written are all within the recommended size + 1 chunk + assert len(writer.closed_files) == 2 + for file in writer.closed_files: + assert file.file_size < caps.recommended_file_size + 1024 * 50 diff --git 
a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py
index 5e85552d73..c6a675a8d3 100644
--- a/tests/extract/test_decorators.py
+++ b/tests/extract/test_decorators.py
@@ -880,6 +880,17 @@ def rv_resource(name: str):
     assert list(r) == [1, 2, 3]
 
 
+def test_standalone_resource_returning_resource_exception() -> None:
+    @dlt.resource(standalone=True)
+    def rv_resource(uniq_name: str = dlt.config.value):
+        return dlt.resource([1, 2, 3], name=uniq_name, primary_key="value")
+
+    # pass through of the exception in `rv_resource` when it returns, not yields
+    with pytest.raises(ConfigFieldMissingException) as conf_ex:
+        rv_resource()
+    assert conf_ex.value.fields == ["uniq_name"]
+
+
 def test_resource_rename_credentials_separation():
     os.environ["SOURCES__TEST_DECORATORS__STANDALONE_SIGNATURE__SECRET_END"] = "5"
     assert list(standalone_signature(1)) == [1, 2, 3, 4]
diff --git a/tests/helpers/airflow_tests/test_airflow_wrapper.py b/tests/helpers/airflow_tests/test_airflow_wrapper.py
index 845800e47f..533d16c998 100644
--- a/tests/helpers/airflow_tests/test_airflow_wrapper.py
+++ b/tests/helpers/airflow_tests/test_airflow_wrapper.py
@@ -384,7 +384,17 @@ def dag_parallel():
     with mock.patch("dlt.helpers.airflow_helper.logger.warn") as warn_mock:
         dag_def = dag_parallel()
         dag_def.test()
-        warn_mock.assert_called_once()
+        warn_mock.assert_has_calls(
+            [
+                mock.call(
+                    "The resource resource2 in task"
+                    " mock_data_incremental_source_resource1-resource2 is using incremental loading"
+                    " and may modify the state. Resources that modify the state should not run in"
+                    " parallel within the single pipeline as the state will not be correctly"
+                    " merged. Please use 'serialize' or 'parallel-isolated' modes instead."
+                )
+            ]
+        )
 
 
 def test_parallel_isolated_run():
diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py
index ca962adb16..4519f1ea83 100644
--- a/tests/load/filesystem/test_filesystem_client.py
+++ b/tests/load/filesystem/test_filesystem_client.py
@@ -1,6 +1,7 @@
 import posixpath
 import os
 from unittest import mock
+from pathlib import Path
 
 import pytest
 
@@ -117,16 +118,18 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> None:
         client, _, root_path, load_id1 = load_info
         layout = client.config.layout
         # this path will be kept after replace
-        job_2_load_1_path = posixpath.join(
-            root_path,
-            create_path(
-                layout,
-                NORMALIZED_FILES[1],
-                client.schema.name,
-                load_id1,
-                load_package_timestamp=timestamp,
-                extra_placeholders=client.config.extra_placeholders,
-            ),
+        job_2_load_1_path = Path(
+            posixpath.join(
+                root_path,
+                create_path(
+                    layout,
+                    NORMALIZED_FILES[1],
+                    client.schema.name,
+                    load_id1,
+                    load_package_timestamp=timestamp,
+                    extra_placeholders=client.config.extra_placeholders,
+                ),
+            )
         )
 
         with perform_load(
@@ -135,16 +138,18 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> None:
             client, _, root_path, load_id2 = load_info
             # this one we expect to be replaced with
-            job_1_load_2_path = posixpath.join(
-                root_path,
-                create_path(
-                    layout,
-                    NORMALIZED_FILES[0],
-                    client.schema.name,
-                    load_id2,
-                    load_package_timestamp=timestamp,
-                    extra_placeholders=client.config.extra_placeholders,
-                ),
+            job_1_load_2_path = Path(
+                posixpath.join(
+                    root_path,
+                    create_path(
+                        layout,
+                        NORMALIZED_FILES[0],
+                        client.schema.name,
+                        load_id2,
+                        load_package_timestamp=timestamp,
+                        extra_placeholders=client.config.extra_placeholders,
+                    ),
+                )
             )
 
             # First file from load1 remains, second file is replaced by load2
@@ -159,7 +164,7 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> None:
                 for f in files:
                     if f == INIT_FILE_NAME:
                         continue
-                    paths.append(posixpath.join(basedir, f))
+                    paths.append(Path(posixpath.join(basedir, f)))
             ls = set(paths)
             assert ls == {job_2_load_1_path, job_1_load_2_path}
@@ -210,7 +215,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None:
         )
         for job in jobs2
     ]
-    expected_files = sorted([posixpath.join(root_path, fn) for fn in expected_files])
+    expected_files = sorted([Path(posixpath.join(root_path, fn)) for fn in expected_files])  # type: ignore[misc]
 
     paths = []
     for basedir, _dirs, files in client.fs_client.walk(
@@ -222,5 +227,5 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None:
         for f in files:
             if f == INIT_FILE_NAME:
                 continue
-            paths.append(posixpath.join(basedir, f))
+            paths.append(Path(posixpath.join(basedir, f)))
     assert list(sorted(paths)) == expected_files
diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py
index 7680bc6e90..5f24daf57f 100644
--- a/tests/load/pipeline/test_filesystem_pipeline.py
+++ b/tests/load/pipeline/test_filesystem_pipeline.py
@@ -301,7 +301,7 @@ def count(*args, **kwargs) -> Any:
 
         for file in files:
             if ".jsonl" in file:
-                expected_files.add(posixpath.join(basedir, file))
+                expected_files.add(Path(posixpath.join(basedir, file)))
 
     for load_package in load_info.load_packages:
         for load_info in load_package.jobs["completed_jobs"]:  # type: ignore[assignment]
@@ -321,7 +321,7 @@ def count(*args, **kwargs) -> Any:
             full_path = posixpath.join(client.dataset_path, path)  # type: ignore[attr-defined]
             assert client.fs_client.exists(full_path)  # type: ignore[attr-defined]
             if ".jsonl" in full_path:
-                known_files.add(full_path)
+                known_files.add(Path(full_path))
 
     assert expected_files == known_files
     assert known_files
diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py
index a498b570a0..d98f335d16 100644
--- a/tests/load/pipeline/test_pipelines.py
+++ b/tests/load/pipeline/test_pipelines.py
@@ -10,6 +10,7 @@
 from dlt.common.pipeline import SupportsPipeline
 from dlt.common.destination import Destination
 from dlt.common.destination.exceptions import DestinationHasFailedJobs
+from dlt.common.destination.reference import WithStagingDataset
 from dlt.common.schema.exceptions import CannotCoerceColumnException
 from dlt.common.schema.schema import Schema
 from dlt.common.schema.typing import VERSION_TABLE_NAME
@@ -896,6 +897,7 @@ def test_pipeline_upfront_tables_two_loads(
 
     # use staging tables for replace
     os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy
+    os.environ["TRUNCATE_STAGING_DATASET"] = "True"
 
     pipeline = destination_config.setup_pipeline(
         "test_pipeline_upfront_tables_two_loads",
@@ -1001,6 +1003,21 @@ def table_3(make_data=False):
         is True
     )
 
+    job_client, _ = pipeline._get_destination_clients(schema)
+
+    if destination_config.staging and isinstance(job_client, WithStagingDataset):
+        for i in range(1, 4):
+            with pipeline.sql_client() as client:
+                table_name = f"table_{i}"
+
+                if job_client.should_load_data_to_staging_dataset(
+                    job_client.schema.tables[table_name]
+                ):
+                    with client.with_staging_dataset(staging=True):
+                        tab_name = client.make_qualified_table_name(table_name)
+                        with client.execute_query(f"SELECT * FROM {tab_name}") as cur:
+                            assert len(cur.fetchall()) == 0
+
 
 # @pytest.mark.skip(reason="Finalize the test: compare some_data values to values from database")
 # @pytest.mark.parametrize(
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index a828de40fd..1c4383405b 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -5,9 +5,9 @@
 import logging
 import os
 import random
+import threading
 from time import sleep
 from typing import Any, Tuple, cast
-import threading
 
 from tenacity import retry_if_exception, Retrying, stop_after_attempt
 import pytest
@@ -2230,3 +2230,33 @@ def stateful_resource():
     assert len(fs_client.list_table_files("_dlt_loads")) == 2
     assert len(fs_client.list_table_files("_dlt_version")) == 1
     assert len(fs_client.list_table_files("_dlt_pipeline_state")) == 1
+
+
+@pytest.mark.parametrize("truncate", (True, False))
+def test_staging_dataset_truncate(truncate) -> None:
+    dlt.config["truncate_staging_dataset"] = truncate
+
+    @dlt.resource(write_disposition="merge", merge_key="id")
+    def test_data():
+        yield [{"field": 1, "id": 1}, {"field": 2, "id": 2}, {"field": 3, "id": 3}]
+
+    pipeline = dlt.pipeline(
+        pipeline_name="test_staging_cleared",
+        destination="duckdb",
+        full_refresh=True,
+    )
+
+    info = pipeline.run(test_data, table_name="staging_cleared")
+    assert_load_info(info)
+
+    with pipeline.sql_client() as client:
+        with client.execute_query(
+            f"SELECT * FROM {pipeline.dataset_name}_staging.staging_cleared"
+        ) as cur:
+            if truncate:
+                assert len(cur.fetchall()) == 0
+            else:
+                assert len(cur.fetchall()) == 3
+
+        with client.execute_query(f"SELECT * FROM {pipeline.dataset_name}.staging_cleared") as cur:
+            assert len(cur.fetchall()) == 3
diff --git a/tests/sources/helpers/rest_client/test_detector.py b/tests/sources/helpers/rest_client/test_detector.py
index f01f9409a1..6511b472fb 100644
--- a/tests/sources/helpers/rest_client/test_detector.py
+++ b/tests/sources/helpers/rest_client/test_detector.py
@@ -406,16 +406,20 @@ def test_find_paginator(test_case) -> None:
     [
         "/users/{user_id}",
         "/api/v1/products/{product_id}/",
-        # those are not valid paths
-        # "/api/v1/products/{product_id}//",
-        # "/api/v1/products/{product_id}?param1=value1",
-        # "/api/v1/products/{product_id}#section",
-        # "/api/v1/products/{product_id}/#section",
+        "/api/v1/products/{product_id}//",
+        "/api/v1/products/{product_id}?param1=value1",
+        "/api/v1/products/{product_id}#section",
+        "/api/v1/products/{product_id}.json",
+        "/api/v1/products/{product_id}.json/",
+        "/api/v1/products/{product_id}_data",
+        "/api/v1/products/{product_id}_data?param=true",
         "/users/{user_id}/posts/{post_id}",
         "/users/{user_id}/posts/{post_id}/comments/{comment_id}",
         "{entity}",
        "/{entity}",
         "/{user_123}",
+        "/users/{user-id}",
+        "/users/{123}",
     ],
 )
 def test_single_entity_path_valid(path):
@@ -430,8 +434,7 @@ def test_single_entity_path_valid(path):
         "/users/{user_id}/details",
         "/",
         "/{}",
-        "/users/{123}",
-        "/users/{user-id}",
+        "/api/v1/products/{product_id}/#section",
         "/users/{user id}",
         "/users/{user_id}/{",  # Invalid ending
     ],
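
Reviewer note: below is a minimal usage sketch of the staging-dataset truncation behaviour that the new `test_staging_dataset_truncate` test and the `TRUNCATE_STAGING_DATASET` env var exercise. It is not part of the patch; the pipeline, resource, and table names (`staging_truncate_demo`, `demo_rows`, `demo`) are hypothetical, a local `duckdb` destination is assumed, and only calls that already appear in the new tests (`dlt.config["truncate_staging_dataset"]`, `pipeline.sql_client()`, `client.execute_query()`) are reused.

```python
import dlt

# Illustrative sketch only; names are hypothetical.
# With truncate_staging_dataset enabled, the staging table should be left empty
# after a merge load, while the destination table keeps the merged rows.
dlt.config["truncate_staging_dataset"] = True


@dlt.resource(write_disposition="merge", merge_key="id")
def demo_rows():
    yield [{"field": 1, "id": 1}, {"field": 2, "id": 2}]


pipeline = dlt.pipeline(
    pipeline_name="staging_truncate_demo",  # hypothetical name
    destination="duckdb",
    full_refresh=True,
)
pipeline.run(demo_rows, table_name="demo")

with pipeline.sql_client() as client:
    with client.execute_query(f"SELECT * FROM {pipeline.dataset_name}_staging.demo") as cur:
        print(len(cur.fetchall()))  # 0 when truncation is enabled
    with client.execute_query(f"SELECT * FROM {pipeline.dataset_name}.demo") as cur:
        print(len(cur.fetchall()))  # merged rows remain in the destination table
```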