Skip to content

Commit

Permalink
Fix imports, cosmetics for core sources
Browse files Browse the repository at this point in the history
  • Loading branch information
VioletM committed Sep 17, 2024
1 parent 5bbad87 commit 290562d
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ In this example, the source will ignore responses with a status code of 404, res
#### Example B

```py
from requests.models import Response
from dlt.common import json

def set_encoding(response, *args, **kwargs):
# sets the encoding in case it's not correctly detected
response.encoding = 'windows-1252'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,61 @@ certain range.

#### Examples

**1. Incremental loading with the resource `sql_table`**
Consider a table "family" with a timestamp column `last_modified` that indicates when a row was last modified. To ensure that only rows modified after midnight (00:00:00) on January 1, 2024, are loaded, you would set `last_modified` timestamp as the cursor as follows:
```py
from sql_database import sql_table
from datetime import datetime

# Example: Incrementally loading a table based on a timestamp column
table = sql_table(
table='family',
incremental=dlt.sources.incremental(
'last_modified', # Cursor column name
initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0) # Initial cursor value
)
)

info = pipeline.extract(table, write_disposition="merge")
print(info)
```
Behind the scene, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024).
In subsequent runs, it is the latest value of `last_modified` that `dlt` stores in [state](https://dlthub.com/docs/general-usage/state).

**2. Incremental loading with the source `sql_database`**
To achieve the same using the `sql_database` source, you would specify your cursor as follows:
1. **Incremental loading with the resource `sql_table`**.

```py
source = sql_database().with_resources("family")
#using the "last_modified" field as an incremental field using initial value of midnight January 1, 2024
source.family.apply_hints(incremental=dlt.sources.incremental("updated", initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0)))
#running the pipeline
info = pipeline.run(source, write_disposition="merge")
print(info)
```

:::info
* When using "merge" write disposition, the source table needs a primary key, which `dlt` automatically sets up.
* `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources.
:::
Consider a table "family" with a timestamp column `last_modified` that indicates when a row was last modified. To ensure that only rows modified after midnight (00:00:00) on January 1, 2024, are loaded, you would set `last_modified` timestamp as the cursor as follows:

```py
import dlt
from dlt.sources.sql_database import sql_table
from dlt.common.pendulum import pendulum

# Example: Incrementally loading a table based on a timestamp column
table = sql_table(
table='family',
incremental=dlt.sources.incremental(
'last_modified', # Cursor column name
initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0) # Initial cursor value
)
)

pipeline = dlt.pipeline(destination="duckdb")
info = pipeline.extract(table, write_disposition="merge")
print(info)
```

Behind the scene, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024).
In subsequent runs, it is the latest value of `last_modified` that `dlt` stores in [state](https://dlthub.com/docs/general-usage/state).

2. **Incremental loading with the source `sql_database`**.

To achieve the same using the `sql_database` source, you would specify your cursor as follows:

```py
import dlt
from dlt.sources.sql_database import sql_database

source = sql_database().with_resources("family")
#using the "last_modified" field as an incremental field using initial value of midnight January 1, 2024
source.family.apply_hints(incremental=dlt.sources.incremental("updated", initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0)))

#running the pipeline
pipeline = dlt.pipeline(destination="duckdb")
info = pipeline.run(source, write_disposition="merge")
print(info)
```

:::info
* When using "merge" write disposition, the source table needs a primary key, which `dlt` automatically sets up.
* `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources.
:::

## Parallelized extraction

You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. To enable this, declare your sources/resources as follows:
```py
from dlt.sources.sql_database import sql_database, sql_table

database = sql_database().parallelize()
table = sql_table().parallelize()
```
Expand All @@ -83,7 +96,7 @@ The `reflection_level` argument controls how much information is reflected:
- `reflection_level = "full"`: Column names, nullability, and data types are detected. For decimal types we always add precision and scale. **This is the default.**
- `reflection_level = "full_with_precision"`: Column names, nullability, data types, and precision/scale are detected, also for types like text and binary. Integer sizes are set to bigint and to int for all other types.

If the SQL type is unknown or not supported by `dlt`, then, in the pyarrow backend, the column will be skipped, whereas in the other backends the type will be inferred directly from the data irrespective of the `reflection_level` specified. In the latter case, this often means that some types are coerced to strings and `dataclass` based values from sqlalchemy are inferred as `json` (JSON in most destinations).
If the SQL type is unknown or not supported by `dlt`, then, in the pyarrow backend, the column will be skipped, whereas in the other backends the type will be inferred directly from the data irrespective of the `reflection_level` specified. In the latter case, this often means that some types are coerced to strings and `dataclass` based values from sqlalchemy are inferred as `json` (JSON in most destinations).
:::tip
If you use reflection level **full** / **full_with_precision** you may encounter a situation where the data returned by sqlalchemy or pyarrow backend does not match the reflected data types. Most common symptoms are:
1. The destination complains that it cannot cast one type to another for a certain column. For example `connector-x` returns TIME in nanoseconds
Expand All @@ -104,8 +117,9 @@ In the following example, when loading timestamps from Snowflake, you ensure tha

```py
import dlt
from snowflake.sqlalchemy import TIMESTAMP_NTZ
import sqlalchemy as sa
from dlt.sources.sql_database import sql_database, sql_table
from snowflake.sqlalchemy import TIMESTAMP_NTZ

def type_adapter_callback(sql_type):
if isinstance(sql_type, TIMESTAMP_NTZ): # Snowflake does not inherit from sa.DateTime
Expand Down Expand Up @@ -142,9 +156,9 @@ The examples below show how you can set arguments in any of the `.toml` files (`
[sources.sql_database.chat_message.incremental]
cursor_path="updated_at"
```
This is especially useful with `sql_table()` in a situation where you may want to run this resource for multiple tables. Setting parameters like this would then give you a clean way of maintaing separate configurations for each table.
This is especially useful with `sql_table()` in a situation where you may want to run this resource for multiple tables. Setting parameters like this would then give you a clean way of maintaing separate configurations for each table.

3. Handling separate configurations for database and individual tables
3. Handling separate configurations for database and individual tables
When using the `sql_database()` source, you can separately configure the parameters for the database and for the individual tables.
```toml
[sources.sql_database]
Expand All @@ -155,16 +169,16 @@ The examples below show how you can set arguments in any of the `.toml` files (`

[sources.sql_database.chat_message.incremental]
cursor_path="updated_at"
```
```

The resulting source created below will extract data using **pandas** backend with **chunk_size** 1000. The table **chat_message** will load data incrementally using **updated_at** column. All the other tables will not use incremental loading, and will instead load the full data.

```py
database = sql_database()
```

You'll be able to configure all the arguments this way (except adapter callback function). [Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources).
You'll be able to configure all the arguments this way (except adapter callback function). [Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources).

It is also possible to set these arguments as environment variables [using the proper naming convention](https://dlthub.com/docs/general-usage/credentials/config_providers#toml-vs-environment-variables):
```sh
SOURCES__SQL_DATABASE__CREDENTIALS="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,26 @@ import Header from '../_source-info-header.md';

## Configuring the SQL Database source

`dlt` sources are python scripts made up of source and resource functions that can be easily customized. The SQL Database verified source has the following built-in source and resource:
`dlt` sources are python scripts made up of source and resource functions that can be easily customized. The SQL Database verified source has the following built-in source and resource:
1. `sql_database`: a `dlt` source which can be used to load multiple tables and views from a SQL database
2. `sql_table`: a `dlt` resource that loads a single table from the SQL database

Read more about sources and resources here: [General usage: source](../../../general-usage/source.md) and [General usage: resource](../../../general-usage/resource.md).

### Example usage:

1. **Load all the tables from a database**
Calling `sql_database()` loads all tables from the database.
1. **Load all the tables from a database**
Calling `sql_database()` loads all tables from the database.

```py
def load_entire_database() -> None:
import dlt
from dlt.sources.sql_database import sql_database

def load_entire_database() -> None:
# Define the pipeline
pipeline = dlt.pipeline(
pipeline_name="rfam",
destination='synapse',
pipeline_name="rfam",
destination='synapse',
dataset_name="rfam_data"
)

Expand All @@ -41,22 +43,24 @@ Read more about sources and resources here: [General usage: source](../../../gen

# Print load info
print(info)
```
```

2. **Load select tables from a database**
Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database.
2. **Load select tables from a database**
Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database.

```py
def load_select_tables_from_database() -> None:
import dlt
from dlt.sources.sql_database import sql_database

def load_select_tables_from_database() -> None:
# Define the pipeline
pipeline = dlt.pipeline(
pipeline_name="rfam",
destination="postgres",
pipeline_name="rfam",
destination="postgres",
dataset_name="rfam_data"
)

# Fetch tables "family" and "clan"
# Fetch tables "family" and "clan"
source = sql_database().with_resources("family", "clan")

# Run the pipeline
Expand All @@ -65,22 +69,24 @@ Read more about sources and resources here: [General usage: source](../../../gen
# Print load info
print(info)

```
```

3. **Load a standalone table**
3. **Load a standalone table**
Calling `sql_table(table="family")` fetches only the table `"family"`

```py
def load_select_tables_from_database() -> None:
import dlt
from dlt.sources.sql_database import sql_table

def load_select_tables_from_database() -> None:
# Define the pipeline
pipeline = dlt.pipeline(
pipeline_name="rfam",
destination="duckdb",
pipeline_name="rfam",
destination="duckdb",
dataset_name="rfam_data"
)

# Fetch the table "family"
# Fetch the table "family"
table = sql_table(table="family")

# Run the pipeline
Expand All @@ -92,8 +98,8 @@ Read more about sources and resources here: [General usage: source](../../../gen
```

:::tip
We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize it to your needs.
:::
We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize it to your needs.
:::


## Configuring the connection
Expand All @@ -106,12 +112,12 @@ We intend our sources to be fully hackable. Feel free to change the source code
"dialect+database_type://username:password@server:port/database_name"
```

For example, to connect to a MySQL database using the `pymysql` dialect you can use the following connection string:
For example, to connect to a MySQL database using the `pymysql` dialect you can use the following connection string:
```py
"mysql+pymysql://rfamro:[email protected]:4497/Rfam"
```

Database-specific drivers can be passed into the connection string using query parameters. For example, to connect to Microsoft SQL Server using the ODBC Driver, you would need to pass the driver as a query parameter as follows:
Database-specific drivers can be passed into the connection string using query parameters. For example, to connect to Microsoft SQL Server using the ODBC Driver, you would need to pass the driver as a query parameter as follows:

```py
"mssql+pyodbc://username:password@server/database?driver=ODBC+Driver+17+for+SQL+Server"
Expand All @@ -124,30 +130,32 @@ There are several options for adding your connection credentials into your `dlt`

#### 1. Setting them in `secrets.toml` or as environment variables (Recommended)

You can set up credentials using [any method](https://dlthub.com/docs/devel/general-usage/credentials/setup#available-config-providers) supported by `dlt`. We recommend using `.dlt/secrets.toml` or the environment variables. See Step 2 of the [setup](./setup) for how to set credentials inside `secrets.toml`. For more information on passing credentials read [here](https://dlthub.com/docs/devel/general-usage/credentials/setup).
You can set up credentials using [any method](https://dlthub.com/docs/devel/general-usage/credentials/setup#available-config-providers) supported by `dlt`. We recommend using `.dlt/secrets.toml` or the environment variables. See Step 2 of the [setup](./setup) for how to set credentials inside `secrets.toml`. For more information on passing credentials read [here](https://dlthub.com/docs/devel/general-usage/credentials/setup).


#### 2. Passing them directly in the script
#### 2. Passing them directly in the script
It is also possible to explicitly pass credentials inside the source. Example:

```py
from dlt.sources.credentials import ConnectionStringCredentials
from sql_database import sql_table
from dlt.sources.sql_database import sql_database

credentials = ConnectionStringCredentials(
"mysql+pymysql://[email protected]:4497/Rfam"
)

source = sql_table(credentials).with_resource("family")
source = sql_database(credentials).with_resource("family")
```

:::note
It is recommended to configure credentials in `.dlt/secrets.toml` and to not include any sensitive information in the pipeline code.
:::note
It is recommended to configure credentials in `.dlt/secrets.toml` and to not include any sensitive information in the pipeline code.
:::

### Other connection options
#### Using SqlAlchemy Engine as credentials
#### Using SqlAlchemy Engine as credentials
You are able to pass an instance of SqlAlchemy Engine instead of credentials:
```py
from dlt.sources.sql_database import sql_table
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://[email protected]:4497/Rfam")
Expand Down Expand Up @@ -175,7 +183,10 @@ reflects the database table and preserves original types (i.e. **decimal** / **n
Note that if `pandas` is installed, we'll use it to convert `SQLAlchemy` tuples into `ndarray` as it seems to be 20-30% faster than using `numpy` directly.

```py
import dlt
import sqlalchemy as sa
from dlt.sources.sql_database import sql_database

pipeline = dlt.pipeline(
pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_arrow"
)
Expand Down Expand Up @@ -210,10 +221,13 @@ With the default settings, several data types will be coerced to dtypes in the y
not to use the** `pandas` **backend if your source tables contain date, time, or decimal columns**
:::

Internally dlt uses `pandas.io.sql._wrap_result` to generate `pandas` frames. To adjust [pandas-specific settings,](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html) pass it in the `backend_kwargs` parameter. For example, below we set `coerce_float` to `False`:
Internally dlt uses `pandas.io.sql._wrap_result` to generate `pandas` frames. To adjust [pandas-specific settings,](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html) pass it in the `backend_kwargs` parameter. For example, below we set `coerce_float` to `False`:

```py
import dlt
import sqlalchemy as sa
from dlt.sources.sql_database import sql_database

pipeline = dlt.pipeline(
pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_pandas_2"
)
Expand Down Expand Up @@ -249,7 +263,7 @@ There are certain limitations when using this backend:
* JSON fields (at least those coming from postgres) are double wrapped in strings. To unwrap this, you can pass the in-built transformation function `unwrap_json_connector_x` (for example, with `add_map`):

```py
from sources.sql_database.helpers import unwrap_json_connector_x
from dlt.sources.sql_database.helpers import unwrap_json_connector_x
```

:::note
Expand All @@ -259,7 +273,9 @@ There are certain limitations when using this backend:
```py
"""This example is taken from the benchmarking tests for ConnectorX performed on the UNSW_Flow dataset (~2mln rows, 25+ columns). Full code here: https://github.com/dlt-hub/sql_database_benchmarking"""
import os
import dlt
from dlt.destinations import filesystem
from dlt.sources.sql_database import sql_table

unsw_table = sql_table(
"postgresql://loader:loader@localhost:5432/dlt_data",
Expand Down
Loading

0 comments on commit 290562d

Please sign in to comment.