diff --git a/docs/website/docs/build-a-pipeline-tutorial.md b/docs/website/docs/build-a-pipeline-tutorial.md index f85d2e19ea..860657b16c 100644 --- a/docs/website/docs/build-a-pipeline-tutorial.md +++ b/docs/website/docs/build-a-pipeline-tutorial.md @@ -262,20 +262,30 @@ In this example, the first pipeline loads the data using `pipedrive_source()`. T #### [Using the `dlt` SQL client](dlt-ecosystem/transformations/sql.md) -Another option is to leverage the `dlt` SQL client to query the loaded data and perform transformations using SQL statements. You can execute SQL statements that change the database schema or manipulate data within tables. Here's an example of inserting a row into the `customers` table using the `dlt` SQL client: +Another option is to leverage the `dlt` SQL client to query the loaded data and perform transformations using SQL statements. You can execute SQL statements that change the database schema or manipulate data within tables. Here's an example of creating a new table with aggregated sales data in duckdb: ```py -pipeline = dlt.pipeline(destination="bigquery", dataset_name="crm") +pipeline = dlt.pipeline(destination="duckdb", dataset_name="crm") with pipeline.sql_client() as client: client.execute_sql( - "INSERT INTO customers VALUES (%s, %s, %s)", 10, "Fred", "fred@fred.com" - ) + """ CREATE TABLE aggregated_sales AS + SELECT + category, + region, + SUM(amount) AS total_sales, + AVG(amount) AS average_sales + FROM + sales + GROUP BY + category, + region; + )""" ``` In this example, the `execute_sql` method of the SQL client allows you to execute SQL statements. The statement inserts a row with values into the `customers` table. -#### [Using Pandas](dlt-ecosystem/transformations/pandas.md) +#### [Using Pandas](dlt-ecosystem/transformations/python.md) You can fetch query results as Pandas data frames and perform transformations using Pandas functionalities. Here's an example of reading data from the `issues` table in DuckDB and counting reaction types using Pandas: @@ -287,11 +297,8 @@ pipeline = dlt.pipeline( dev_mode=True ) -with pipeline.sql_client() as client: - with client.execute_query( - 'SELECT "reactions__+1", "reactions__-1", reactions__laugh, reactions__hooray, reactions__rocket FROM issues' - ) as cursor: - reactions = cursor.df() +# get a dataframe of all reactions from the dataset +reactions = pipeline.dataset().issues.select("reactions__+1", "reactions__-1", "reactions__laugh", "reactions__hooray", "reactions__rocket").df() counts = reactions.sum(0).sort_values(0, ascending=False) ``` diff --git a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md index 2b284e991a..7dcdad2e24 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md +++ b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md @@ -118,7 +118,7 @@ to disable tz adjustments. ## Destination configuration -By default, a DuckDB database will be created in the current working directory with a name `.duckdb` (`chess.duckdb` in the example above). After loading, it is available in `read/write` mode via `with pipeline.sql_client() as con:`, which is a wrapper over `DuckDBPyConnection`. See [duckdb docs](https://duckdb.org/docs/api/python/overview#persistent-storage) for details. +By default, a DuckDB database will be created in the current working directory with a name `.duckdb` (`chess.duckdb` in the example above). After loading, it is available in `read/write` mode via `with pipeline.sql_client() as con:`, which is a wrapper over `DuckDBPyConnection`. See [duckdb docs](https://duckdb.org/docs/api/python/overview#persistent-storage) for details. If you want to read data, use [datasets](../general-usage/dataset-access/dataset) instead of the sql client. The `duckdb` credentials do not require any secret values. [You are free to pass the credentials and configuration explicitly](../../general-usage/destination.md#pass-explicit-credentials). For example: ```py diff --git a/docs/website/docs/dlt-ecosystem/transformations/dbt/dbt.md b/docs/website/docs/dlt-ecosystem/transformations/dbt/dbt.md index 449f8b8bde..59eb340ef2 100644 --- a/docs/website/docs/dlt-ecosystem/transformations/dbt/dbt.md +++ b/docs/website/docs/dlt-ecosystem/transformations/dbt/dbt.md @@ -1,10 +1,10 @@ --- -title: Transform the data with dbt +title: Transforming data with dbt description: Transforming the data loaded by a dlt pipeline with dbt keywords: [transform, dbt, runner] --- -# Transform the data with dbt +# Transforming data with dbt [dbt](https://github.com/dbt-labs/dbt-core) is a framework that allows for the simple structuring of your transformations into DAGs. The benefits of using dbt include: @@ -105,8 +105,8 @@ You can run the example with dbt debug log: `RUNTIME__LOG_LEVEL=DEBUG python dbt ## Other transforming tools -If you want to transform the data before loading, you can use Python. If you want to transform the data after loading, you can use dbt or one of the following: +If you want to transform your data before loading, you can use Python. If you want to transform your data after loading, you can use dbt or one of the following: 1. [`dlt` SQL client.](../sql.md) -2. [Pandas.](../pandas.md) +2. [Python with dataframes or arrow tables.](../python.md) diff --git a/docs/website/docs/dlt-ecosystem/transformations/index.md b/docs/website/docs/dlt-ecosystem/transformations/index.md new file mode 100644 index 0000000000..eb99753027 --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/transformations/index.md @@ -0,0 +1,27 @@ +--- +title: Transforming your data +description: How to transform your data +keywords: [datasets, data, access, transformations] +--- +import DocCardList from '@theme/DocCardList'; + +# Transforming data + +If you'd like to transform your data after a pipeline load, you have 3 options available to you: + +* [Using dbt](./dbt/dbt.md) - dlt provides a convenient dbt wrapper to make integration easier +* [Using the `dlt` SQL client](./sql.md) - dlt exposes an sql client to transform data on your destination directly using sql +* [Using python with dataframes or arrow tables](./python.md) - you can also transform your data using arrow tables and dataframes in python + +If you need to preprocess some of your data before it is loaded, you can learn about strategies to: + +* [Rename columns](../general-usage/customising-pipelines/renaming_columns) +* [Pseudonymize columns](../general-usage/customising-pipelines/pseudonymizing_columns) +* [Remove columns](../general-usage/customising-pipelines/removing_columns) + +This is particularly useful if you are trying to remove data related to PII or other sensitive data, you want to remove columns that are not needed for your use case or you are using a destination that does not support certain data types in your source data. + + +# Learn more + + diff --git a/docs/website/docs/dlt-ecosystem/transformations/pandas.md b/docs/website/docs/dlt-ecosystem/transformations/pandas.md deleted file mode 100644 index e431313d1c..0000000000 --- a/docs/website/docs/dlt-ecosystem/transformations/pandas.md +++ /dev/null @@ -1,42 +0,0 @@ ---- -title: Transform the data with Pandas -description: Transform the data loaded by a dlt pipeline with Pandas -keywords: [transform, pandas] ---- - -# Transform the data with Pandas - -You can fetch the results of any SQL query as a dataframe. If the destination supports that -natively (i.e., BigQuery and DuckDB), `dlt` uses the native method. Thanks to this, reading -dataframes can be really fast! The example below reads GitHub reactions data from the `issues` table and -counts the reaction types. - -```py -pipeline = dlt.pipeline( - pipeline_name="github_pipeline", - destination="duckdb", - dataset_name="github_reactions", - dev_mode=True -) -with pipeline.sql_client() as client: - with client.execute_query( - 'SELECT "reactions__+1", "reactions__-1", reactions__laugh, reactions__hooray, reactions__rocket FROM issues' - ) as cursor: - # calling `df` on a cursor, returns the data as a pandas data frame - reactions = cursor.df() -counts = reactions.sum(0).sort_values(0, ascending=False) -``` - -The `df` method above returns all the data in the cursor as a data frame. You can also fetch data in -chunks by passing the `chunk_size` argument to the `df` method. - -Once your data is in a Pandas dataframe, you can transform it as needed. - -## Other transforming tools - -If you want to transform the data before loading, you can use Python. If you want to transform the -data after loading, you can use Pandas or one of the following: - -1. [dbt.](dbt/dbt.md) (recommended) -2. [`dlt` SQL client.](sql.md) - diff --git a/docs/website/docs/dlt-ecosystem/transformations/python.md b/docs/website/docs/dlt-ecosystem/transformations/python.md new file mode 100644 index 0000000000..e88d9943cb --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/transformations/python.md @@ -0,0 +1,109 @@ +--- +title: Transforming data in Python with arrow tables or dataframes +description: Transforming data loaded by a dlt pipeline with pandas dataframes or arrow tables +keywords: [transform, pandas] +--- + +# Transforming data in python with dataframes or arrow tables + +You can transform your data in python using pandas dataframes or arrow tables. To get started, please read the [dataset docs](../general-usage/dataset-access/dataset). + + +## Interactively transforming your data in python + +Using the methods explained in the [dataset docs](../general-usage/dataset-access/dataset), you can fetch data from your destination into a dataframe or arrow table in your local python process and work with it interactively. This even works for filesystem destinations: + + +The example below reads GitHub reactions data from the `issues` table and +counts the reaction types. + +```py +pipeline = dlt.pipeline( + pipeline_name="github_pipeline", + destination="duckdb", + dataset_name="github_reactions", + dev_mode=True +) + +# get a dataframe of all reactions from the dataset +reactions = pipeline.dataset().issues.select("reactions__+1", "reactions__-1", "reactions__laugh", "reactions__hooray", "reactions__rocket").df() + +# calculate and print out the sum of all reactions +counts = reactions.sum(0).sort_values(0, ascending=False) +print(counts) + +# alternatively, you can fetch the data as an arrow table +reactions = pipeline.dataset().issues.select("reactions__+1", "reactions__-1", "reactions__laugh", "reactions__hooray", "reactions__rocket").arrow() +# ... do transformations on the arrow table +``` + +## Persisting your transformed data + +Since dlt supports dataframes and arrow tables from resources directly, you can use the same pipeline to load the transformed data back into the destination. + + +### A simple example + +A simple example that creates a new table from an existing user table but only with columns that do not contain private information. Note that we use the iter_arrow() method on the relation to iterate over the arrow table instead of fetching it all at once. + +```py +pipeline = dlt.pipeline( + pipeline_name="users_pipeline", + destination="duckdb", + dataset_name="users_raw", + dev_mode=True +) + +# get user relation with only a few columns selected, but omitting email and name +users = pipeline.dataset().users.select("age", "amount_spent", "country") + +# load the data into a new table called users_clean in the same dataset +pipeline.run(users.iter_arrow(chunk_size=1000), table_name="users_clean") +``` + +### A more complex example + +The example above could easily be done in SQL. Let's assume you'd like to actually do some in python arrow transformations. For this will create a resources from which we can yield the modified arrow tables. The same is possibly with dataframes. + +```py +import pyarrow.compute as pc + +pipeline = dlt.pipeline( + pipeline_name="users_pipeline", + destination="duckdb", + dataset_name="users_raw", + dev_mode=True +) + +# NOTE: this resource will work like a regular resource and support write_disposition, primary_key, etc. +# NOTE: For selecting only users above 18, we could also use the filter method on the relation with ibis expressions +@dlt.resource(table_name="users_clean") +def users_clean(): + users = pipeline.dataset().users + for arrow_table in users.iter_arrow(chunk_size=1000): + + # we want to filter out users under 18 + age_filter = pc.greater_equal(arrow_table["age"], 18) + arrow_table = arrow_table.filter(age_filter) + + # we want to hash the email column + arrow_table = arrow_table.append_column("email_hash", pc.sha256(arrow_table["email"])) + + # we want to remove the email column and name column + arrow_table = arrow_table.drop(["email", "name"]) + + # yield the transformed arrow table + yield arrow_table + + +pipeline.run(users_clean()) +``` + +## Other transforming tools + +If you want to transform your data before loading, you can use Python. If you want to transform the +data after loading, you can use Pandas or one of the following: + +1. [dbt.](dbt/dbt.md) (recommended) +2. [`dlt` SQL client.](sql.md) + diff --git a/docs/website/docs/dlt-ecosystem/transformations/sql.md b/docs/website/docs/dlt-ecosystem/transformations/sql.md index ffd348d1a0..e6011a71ae 100644 --- a/docs/website/docs/dlt-ecosystem/transformations/sql.md +++ b/docs/website/docs/dlt-ecosystem/transformations/sql.md @@ -1,33 +1,52 @@ --- -title: Transform the data with SQL +title: Transforming data with SQL description: Transforming the data loaded by a dlt pipeline with the dlt SQL client keywords: [transform, sql] --- -# Transform the data using the `dlt` SQL client +# Transforming data using the `dlt` SQL client A simple alternative to dbt is to query the data using the `dlt` SQL client and then perform the -transformations using Python. The `execute_sql` method allows you to execute any SQL statement, +transformations using sql statements in python. The `execute_sql` method allows you to execute any SQL statement, including statements that change the database schema or data in the tables. In the example below, we insert a row into the `customers` table. Note that the syntax is the same as for any standard `dbapi` connection. +:::info +* This method will work for all sql destinations supported by `dlt`, but not for the filesystem destination. +* Read the [sql client docs](../general-usage/dataset-access/dataset) for more information on how to access data with the sql client. +* If you are simply trying to read data, you should use the powerful [dataset interface](../general-usage/dataset-access/dataset) instead. +::: + + +Typically you will use this type of transformation if you can create or update tables directly from existing tables +without any need to insert data from your python environment. + +The example below creates a new table `aggregated_sales` that contains the total and average sales for each category and region + + ```py -pipeline = dlt.pipeline(destination="bigquery", dataset_name="crm") -try: - with pipeline.sql_client() as client: - client.execute_sql( - "INSERT INTO customers VALUES (%s, %s, %s)", - 10, - "Fred", - "fred@fred.com" - ) -except Exception: - ... +pipeline = dlt.pipeline(destination="duckdb", dataset_name="crm") + +# NOTE: this is the duckdb sql dialect, other destinations may use different expressions +with pipeline.sql_client() as client: + client.execute_sql( + """ CREATE OR REPLACE TABLE aggregated_sales AS + SELECT + category, + region, + SUM(amount) AS total_sales, + AVG(amount) AS average_sales + FROM + sales + GROUP BY + category, + region; + )""" ``` -In the case of SELECT queries, the data is returned as a list of rows, with the elements of a row -corresponding to selected columns. +You can also use the `execute_sql` method to run select queries. The data is returned as a list of rows, with the elements of a row +corresponding to selected columns. A more convenient way to extract data is to use dlt datasets. ```py try: @@ -44,9 +63,9 @@ except Exception: ## Other transforming tools -If you want to transform the data before loading, you can use Python. If you want to transform the +If you want to transform your data before loading, you can use Python. If you want to transform the data after loading, you can use SQL or one of the following: 1. [dbt](dbt/dbt.md) (recommended). -2. [Pandas](pandas.md). +2. [Python with dataframes or arrow tables](python.md). diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md index 14d9ecb04b..ea3c9c768b 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md @@ -306,7 +306,7 @@ A resource configuration is used to define a [dlt resource](../../../general-usa - `write_disposition`: The write disposition for the resource. - `primary_key`: The primary key for the resource. - `include_from_parent`: A list of fields from the parent resource to be included in the resource output. See the [resource relationships](#include-fields-from-the-parent-resource) section for more details. -- `processing_steps`: A list of [processing steps](#processing-steps-filter-and-transform-data) to filter and transform the data. +- `processing_steps`: A list of [processing steps](#processing-steps-filter-and-transform-data) to filter and transform your data. - `selected`: A flag to indicate if the resource is selected for loading. This could be useful when you want to load data only from child resources and not from the parent resource. - `auth`: An optional `AuthConfig` instance. If passed, is used over the one defined in the [client](#client) definition. Example: ```py diff --git a/docs/website/docs/general-usage/dataset-access/dataset.md b/docs/website/docs/general-usage/dataset-access/dataset.md index 07fe37ecb2..16ee7f870a 100644 --- a/docs/website/docs/general-usage/dataset-access/dataset.md +++ b/docs/website/docs/general-usage/dataset-access/dataset.md @@ -226,6 +226,8 @@ other_pipeline = dlt.pipeline(pipeline_name="other_pipeline", destination="duckd other_pipeline.run(limited_items_relation.iter_arrow(chunk_size=10_000), table_name="limited_items") ``` +Learn more about [transforming data in python with dataframes or arrow tables](../dlt-ecosystem/transformations/python). + ### Using `ibis` to query the data Visit the [Native Ibis integration](./ibis-backend.md) guide to learn more. diff --git a/docs/website/docs/general-usage/destination.md b/docs/website/docs/general-usage/destination.md index fa133b6257..ba42869957 100644 --- a/docs/website/docs/general-usage/destination.md +++ b/docs/website/docs/general-usage/destination.md @@ -128,7 +128,7 @@ When loading data, `dlt` will access the destination in two cases: 1. At the beginning of the `run` method to sync the pipeline state with the destination (or if you call `pipeline.sync_destination` explicitly). 2. In the `pipeline.load` method - to migrate the schema and load the load package. -Obviously, `dlt` will access the destination when you instantiate [sql_client](../dlt-ecosystem/transformations/sql.md). +`dlt` will also access the destination when you instantiate [sql_client](../dlt-ecosystem/transformations/sql.md). :::note `dlt` will not import the destination dependencies or access destination configuration if access is not needed. You can build multi-stage pipelines where steps are executed in separate processes or containers - the `extract` and `normalize` step do not need destination dependencies, configuration, and actual connection. diff --git a/docs/website/docs/general-usage/state.md b/docs/website/docs/general-usage/state.md index 46aa1d63ce..d849a093dc 100644 --- a/docs/website/docs/general-usage/state.md +++ b/docs/website/docs/general-usage/state.md @@ -123,14 +123,13 @@ def comments(user_id: str): # on the first pipeline run, the user_comments table does not yet exist so do not check at all # alternatively, catch DatabaseUndefinedRelation which is raised when an unknown table is selected if not current_pipeline.first_run: - with current_pipeline.sql_client() as client: - # we may get the last user comment or None which we replace with 0 - max_id = ( - client.execute_sql( - "SELECT MAX(_id) FROM user_comments WHERE user_id=?", user_id - )[0][0] - or 0 - ) + # get user comments table from pipeline dataset + user_comments = current_pipeline.dataset().user_comments + # get last user comment id with ibis expression, ibis-extras need to be installed + max_id = user_comments.filter(user_comments.user_id == user_id).select(user_comments["_id"].max()).df() + # if there are no comments for the user, max_id will be None, so we replace it with 0 + max_id = max_id[0] or 0 + # use max_id to filter our results (we simulate an API query) yield from [ {"_id": i, "value": letter, "user_id": user_id} diff --git a/docs/website/docs/tutorial/load-data-from-an-api.md b/docs/website/docs/tutorial/load-data-from-an-api.md index ddfef2cbe8..6a682457f4 100644 --- a/docs/website/docs/tutorial/load-data-from-an-api.md +++ b/docs/website/docs/tutorial/load-data-from-an-api.md @@ -72,7 +72,25 @@ Load package 1692364844.460054 is LOADED and contains no failed jobs `dlt` just created a database schema called **mydata** (the `dataset_name`) with a table **users** in it. -### Explore the data +### Explore the data in python + +You can use dlt [datasets](../general-usage/dataset-access/dataset) to easily query the data in pure python. + +```py +# get the dataset +dataset = dlt.dataset("mydata") + +# get the user relation +table = dataset.users + +# query the full table as dataframe +print(table.df()) + +# query the first 10 rows as arrow table +print(table.limit(10).arrow()) +``` + +### Explore the data in streamlit To allow a sneak peek and basic discovery, you can take advantage of [built-in integration with Streamlit](../reference/command-line-interface#show-tables-and-data-in-the-destination): diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index 274f3e82b3..53d1c2507d 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -210,13 +210,10 @@ const sidebars = { }, { type: 'category', - label: 'Transform the data', + label: 'Transforming data', link: { - type: 'generated-index', - title: 'Transform the data', - description: 'If you want to transform the data after loading, you can use one of the following methods: dbt, SQL, Pandas.', - slug: 'dlt-ecosystem/transformations', - keywords: ['transformations'], + type: 'doc', + id: 'dlt-ecosystem/transformations/index', }, items: [ { @@ -227,8 +224,8 @@ const sidebars = { 'dlt-ecosystem/transformations/dbt/dbt_cloud', ] }, + 'dlt-ecosystem/transformations/python', 'dlt-ecosystem/transformations/sql', - 'dlt-ecosystem/transformations/pandas', 'general-usage/customising-pipelines/renaming_columns', 'general-usage/customising-pipelines/pseudonymizing_columns', 'general-usage/customising-pipelines/removing_columns'