Apply suggestions from code review
burnash authored Sep 19, 2024
1 parent 30bd323 commit 342967c
Showing 10 changed files with 36 additions and 36 deletions.
@@ -96,7 +96,7 @@ def tracked_data():

### 2. Create `fetch_average_price` function

-This particular function retrieves the average price of a device by utilizing SerpAPI and Google shopping listings. To filter the data, the function uses `dlt` state, and only fetches prices from SerpAPI for devices that have not been updated in the most recent run or for those that were loaded more than 180 days in the past.
+This particular function retrieves the average price of a device by utilizing SerpAPI and Google shopping listings. To filter the data, the function uses dlt state, and only fetches prices from SerpAPI for devices that have not been updated in the most recent run or for those that were loaded more than 180 days in the past.

The first step is to register on [SerpAPI](https://serpapi.com/) and obtain the API token key.

4 changes: 2 additions & 2 deletions docs/website/docs/general-usage/destination-tables.md
@@ -134,11 +134,11 @@ Running this pipeline will create two tables in the destination, `users` (**root
| 2 | Spot | dog | 9uxh36VU9lqKpw | wX3f5vn801W16A | 1 |
| 3 | Fido | dog | pe3FVtCWz8VuNA | rX8ybgTeEmAmmA | 0 |

-When inferring a database schema, `dlt` maps the structure of Python objects (i.e., from parsed JSON files) into nested tables and creates references between them.
+When inferring a database schema, dlt maps the structure of Python objects (i.e., from parsed JSON files) into nested tables and creates references between them.

This is how it works:

-1. Each row in all (root and nested) data tables created by `dlt` contains a unique column named `_dlt_id` (**row key**).
+1. Each row in all (root and nested) data tables created by dlt contains a unique column named `_dlt_id` (**row key**).
2. Each nested table contains a column named `_dlt_parent_id` referencing a particular row (`_dlt_id`) of a parent table (**parent key**).
3. Rows in nested tables come from the Python lists: `dlt` stores the position of each item in the list in `_dlt_list_idx`.
4. For nested tables that are loaded with the `merge` write disposition, we add a **root key** column `_dlt_root_id`, which references the child table to a row in the root table.
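For illustration, a minimal pipeline that produces these keys (pipeline and dataset names are arbitrary):

```py
import dlt

users = [
    {"id": 1, "name": "Alice", "pets": [{"name": "Spot", "type": "dog"}, {"name": "Fido", "type": "dog"}]},
]

pipeline = dlt.pipeline(pipeline_name="nesting_demo", destination="duckdb", dataset_name="demo")
pipeline.run(users, table_name="users")
# Creates `users` (each row gets a `_dlt_id` row key) and the nested table
# `users__pets`, whose rows carry `_dlt_parent_id` (the parent's `_dlt_id`)
# and `_dlt_list_idx` (the item's position in the `pets` list).
```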
2 changes: 1 addition & 1 deletion docs/website/docs/general-usage/full-loading.md
@@ -25,7 +25,7 @@ p.run(issues, write_disposition="replace", primary_key="id", table_name="issues"

## Choosing the correct replace strategy for your full load

-`dlt` implements three different strategies for doing a full load on your table: `truncate-and-insert`, `insert-from-staging`, and `staging-optimized`. The exact behavior of these strategies can also vary between the available destinations.
+dlt implements three different strategies for doing a full load on your table: `truncate-and-insert`, `insert-from-staging`, and `staging-optimized`. The exact behavior of these strategies can also vary between the available destinations.

You can select a strategy with a setting in your `config.toml` file. If you do not select a strategy, dlt will default to `truncate-and-insert`.
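As a hedged illustration, the strategy can also be supplied through dlt's environment-variable configuration instead of `config.toml` (the exact config section is assumed here):

```py
import os
import dlt

# Assumed to mirror the replace_strategy key under [destination] in config.toml.
os.environ["DESTINATION__REPLACE_STRATEGY"] = "insert-from-staging"

pipeline = dlt.pipeline(pipeline_name="full_load_demo", destination="duckdb", dataset_name="demo")
pipeline.run([{"id": 1}, {"id": 2}], table_name="issues", write_disposition="replace")
```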

20 changes: 10 additions & 10 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -46,7 +46,7 @@ The `merge` write disposition can be used with three different strategies:
2. `scd2`
3. `upsert`

-### `Delete-insert` strategy
+### `delete-insert` strategy

The default `delete-insert` strategy is used in two scenarios:

@@ -405,7 +405,7 @@ The `upsert` merge strategy is currently supported for these destinations:
- `mssql`
- `postgres`
- `snowflake`
-- 🧪 `filesystem` with `delta` table format (see limitations [here](../dlt-ecosystem/destinations/filesystem.md#known-limitations))
+- `filesystem` with `delta` table format (see limitations [here](../dlt-ecosystem/destinations/filesystem.md#known-limitations))
:::
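A minimal sketch of opting into the `upsert` strategy on a resource (the resource name and columns are made up):

```py
import dlt

@dlt.resource(
    write_disposition={"disposition": "merge", "strategy": "upsert"},
    primary_key="id",
)
def contacts():
    # Rows whose `id` already exists in the destination are updated in place;
    # unseen `id` values are inserted.
    yield [
        {"id": 1, "email": "alice@example.com"},
        {"id": 2, "email": "bob@example.com"},
    ]
```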

The `upsert` merge strategy does primary-key based *upserts*:
@@ -468,7 +468,7 @@ In essence, the `dlt.sources.incremental` instance above:

When paginating, you probably need the **start_value** which does not change during the execution of the resource, however, most paginators will return a **next page** link which you should use.

-Behind the scenes, `dlt` will deduplicate the results, i.e., in case the last issue is returned again (`updated_at` filter is inclusive) and skip already loaded ones.
+Behind the scenes, dlt will deduplicate the results, i.e., in case the last issue is returned again (`updated_at` filter is inclusive) and skip already loaded ones.
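A hedged sketch of that pattern, loosely modeled on the GitHub issues example used in these docs (URL and pagination details are illustrative):

```py
import dlt
from dlt.sources.helpers import requests

@dlt.resource(primary_key="id")
def repo_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    # start_value stays fixed for the whole run: use it to build the first request,
    # then follow the API's own "next page" links; dlt deduplicates overlapping rows.
    url = f"https://api.github.com/repos/dlt-hub/dlt/issues?since={updated_at.start_value}&state=all"
    while url:
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()
        url = response.links.get("next", {}).get("url")
```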

In the example below, we incrementally load the GitHub events, where the API does not let us filter for the newest events - it always returns all of them. Nevertheless, `dlt` will load only the new items, filtering out all the duplicates and past issues.
```py
@@ -487,7 +487,7 @@ We just yield all the events and `dlt` does the filtering (using the `id` column
GitHub returns events ordered from newest to oldest. So we declare the `rows_order` as **descending** to [stop requesting more pages once the incremental value is out of range](#declare-row-order-to-not-request-unnecessary-data). We stop requesting more data from the API after finding the first event with `created_at` earlier than `initial_value`.

:::note
-`dlt.sources.incremental` is implemented as a [filter function](resource.md#filter-transform-and-pivot-data) that is executed **after** all other transforms you add with `add_map` / `add_filter`. This means that you can manipulate the data item before the incremental filter sees it. For example:
+`dlt.sources.incremental` is implemented as a [filter function](resource.md#filter-transform-and-pivot-data) that is executed **after** all other transforms you add with `add_map` or `add_filter`. This means that you can manipulate the data item before the incremental filter sees it. For example:
* You can create a surrogate primary key from other columns
* You can modify the cursor value or create a new field composed of other fields
* Dump Pydantic models to Python dicts to allow incremental to find custom values
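For instance, a sketch of shaping items with `add_map` before the incremental filter runs (the `events` resource and its payload fields are invented):

```py
import dlt

@dlt.resource(primary_key="id")
def events():
    # Imagine these come from an API that nests the timestamp we want to track.
    yield [{"id": 1, "payload": {"pushed_at": "2024-09-01T00:00:00Z"}}]

def add_cursor_field(item):
    # Runs before the incremental filter, so the derived field can serve as the cursor.
    item["pushed_at"] = item["payload"]["pushed_at"]
    return item

resource = events()
resource.add_map(add_cursor_field)
resource.apply_hints(incremental=dlt.sources.incremental("pushed_at"))
```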
@@ -592,15 +592,15 @@ august_issues = repo_issues(
...
```

-Note that `dlt`'s incremental filtering considers the ranges half-closed. `initial_value` is inclusive, `end_value` is exclusive, so chaining ranges like above works without overlaps.
+Note that dlt's incremental filtering considers the ranges half-closed. `initial_value` is inclusive, `end_value` is exclusive, so chaining ranges like above works without overlaps.
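To make the half-closed behavior concrete, a sketch of chaining two backfill ranges with the `repo_issues` resource shape used in the surrounding examples (dates are arbitrary):

```py
import dlt

# end_value is exclusive, so the next chunk can start exactly where this one ends
# without loading any row twice.
july_issues = repo_issues(
    updated_at=dlt.sources.incremental(
        "updated_at", initial_value="2023-07-01T00:00:00Z", end_value="2023-08-01T00:00:00Z"
    )
)
august_issues = repo_issues(
    updated_at=dlt.sources.incremental(
        "updated_at", initial_value="2023-08-01T00:00:00Z", end_value="2023-09-01T00:00:00Z"
    )
)
```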

### Declare row order to not request unnecessary data

-With the `row_order` argument set, `dlt` will stop retrieving data from the data source (e.g., GitHub API) if it detects that the values of the cursor field are out of the range of **start** and **end** values.
+With the `row_order` argument set, dlt will stop retrieving data from the data source (e.g., GitHub API) if it detects that the values of the cursor field are out of the range of **start** and **end** values.

In particular:
-* `dlt` stops processing when the resource yields any item with a cursor value _equal to or greater than_ the `end_value` and `row_order` is set to **asc**. (`end_value` is not included)
-* `dlt` stops processing when the resource yields any item with a cursor value _lower_ than the `last_value` and `row_order` is set to **desc**. (`last_value` is included)
+* dlt stops processing when the resource yields any item with a cursor value _equal to or greater than_ the `end_value` and `row_order` is set to **asc**. (`end_value` is not included)
+* dlt stops processing when the resource yields any item with a cursor value _lower_ than the `last_value` and `row_order` is set to **desc**. (`last_value` is included)

:::note
"higher" and "lower" here refer to when the default `last_value_func` is used (`max()`),
@@ -747,8 +747,8 @@ def tickets(
```

We opt-in to the Airflow scheduler by setting `allow_external_schedulers` to `True`:
-1. When running on Airflow, the start and end values are controlled by Airflow and the `dlt` [state](state.md) is not used.
-2. In all other environments, the `incremental` behaves as usual, maintaining the `dlt` state.
+1. When running on Airflow, the start and end values are controlled by Airflow and the dlt [state](state.md) is not used.
+2. In all other environments, the `incremental` behaves as usual, maintaining the dlt state.
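A hedged sketch of such a resource (the `zendesk_client` object and its paging method are placeholders):

```py
import dlt

@dlt.resource(primary_key="id")
def tickets(
    zendesk_client,
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2023-01-01T00:00:00Z",
        # Opt in to external schedulers: on Airflow the data interval supplies
        # the start/end values instead of dlt state.
        allow_external_schedulers=True,
    ),
):
    for page in zendesk_client.get_pages("/api/v2/incremental/tickets.json", start_time=updated_at.start_value):
        yield page
```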

Let's generate a deployment with `dlt deploy zendesk_pipeline.py airflow-composer` and customize the DAG:

10 changes: 5 additions & 5 deletions docs/website/docs/general-usage/naming-convention.md
Expand Up @@ -5,7 +5,7 @@ keywords: [identifiers, snake case, case sensitive, case insensitive, naming]
---

# Naming convention
-`dlt` creates table and column identifiers from the data. The data source, i.e., a stream of JSON documents, may have identifiers (i.e., key names in a dictionary) with any Unicode characters, of any length, and naming style. On the other hand, destinations require that you follow strict rules when you name tables, columns, or collections.
+dlt creates table and column identifiers from the data. The data source, i.e., a stream of JSON documents, may have identifiers (i.e., key names in a dictionary) with any Unicode characters, of any length, and naming style. On the other hand, destinations require that you follow strict rules when you name tables, columns, or collections.
A good example is [Redshift](../dlt-ecosystem/destinations/redshift.md#naming-convention) that accepts case-insensitive alphanumeric identifiers with a maximum of 127 characters.

`dlt` groups tables from a single [source](source.md) in a [schema](schema.md). Each schema defines a **naming convention** that tells `dlt` how to translate identifiers to the
@@ -139,9 +139,9 @@ Depending on the destination, certain names may not be allowed. To ensure your d

## Avoid identifier collisions
`dlt` detects various types of identifier collisions and ignores the others.
-1. `dlt` detects collisions if a case-sensitive naming convention is used on a case-insensitive destination.
-2. `dlt` detects collisions if a change of naming convention changes the identifiers of tables already created in the destination.
-3. `dlt` detects collisions when the naming convention is applied to column names of arrow tables.
+1. dlt detects collisions if a case-sensitive naming convention is used on a case-insensitive destination.
+2. dlt detects collisions if a change of naming convention changes the identifiers of tables already created in the destination.
+3. dlt detects collisions when the naming convention is applied to column names of arrow tables.

`dlt` will not detect a collision when normalizing source data. If you have a dictionary, keys will be merged if they collide after being normalized.
You can create a custom naming convention that does not generate collisions on data, see examples below.
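A small, hypothetical illustration of such a key collision under the default snake_case convention:

```py
import dlt

# Both keys normalize to "user_name" under snake_case, so they are merged
# into a single column in the loaded table.
data = [{"userName": "alice", "user_name": "bob"}]

pipeline = dlt.pipeline(pipeline_name="collision_demo", destination="duckdb", dataset_name="demo")
pipeline.run(data, table_name="users")
```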
@@ -160,7 +160,7 @@ We include [two examples](../examples/custom_naming) of naming conventions that
2. A variant of `sql_cs` that allows for LATIN (i.e., umlaut) characters.

:::note
-Note that the fully qualified name of your custom naming convention will be stored in the `Schema`, and `dlt` will attempt to import it when the schema is loaded from storage.
+Note that the fully qualified name of your custom naming convention will be stored in the schema, and dlt will attempt to import it when the schema is loaded from storage.
You should distribute your custom naming conventions with your pipeline code or via a pip package from which it can be imported.
:::

8 changes: 4 additions & 4 deletions docs/website/docs/general-usage/pipeline.md
@@ -13,7 +13,7 @@ Once the pipeline runs, all resources are evaluated and the data is loaded at th

Example:

-This pipeline will load a list of objects into a `duckdb` table named "three":
+This pipeline will load a list of objects into a DuckDB table named "three":

```py
import dlt
@@ -41,7 +41,7 @@ To load the data, you call the `run` method and pass your data in the `data` arg

Arguments:

-- `data` (the first argument) may be a dlt source, resource, generator function, or any Iterator /
+- `data` (the first argument) may be a dlt source, resource, generator function, or any Iterator or
Iterable (i.e., a list or the result of the `map` function).
- `write_disposition` controls how to write data to a table. Defaults to "append".
- `append` will always add new data at the end of the table.
@@ -112,7 +112,7 @@ You can reset parts or all of your sources by using the `refresh` argument to `d
That means when you run the pipeline, the sources/resources being processed will have their state reset and their tables either dropped or truncated,
depending on which refresh mode is used.

-The `refresh` option works with all relational/SQL destinations and file buckets (`filesystem`). It does not work with vector databases (we are working on that) and
+The `refresh` option works with all relational or SQL destinations and cloud storages and files (`filesystem`). It does not work with vector databases (we are working on that) and
with custom destinations.
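As a hedged example of passing a refresh mode (here `drop_sources`; the source is a stand-in):

```py
import dlt

@dlt.source
def my_source():
    return dlt.resource([{"id": 1}], name="items")

pipeline = dlt.pipeline(pipeline_name="refresh_demo", destination="duckdb", dataset_name="demo")
# Drop the tables and state belonging to the sources in this run, then load them from scratch.
pipeline.run(my_source(), refresh="drop_sources")
```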

The `refresh` argument should have one of the following string values to decide the refresh mode:
@@ -184,7 +184,7 @@ the "_schedule" is truncated, and new (full) table data will be inserted/copied.
## Display the loading progress

You can add a progress monitor to the pipeline. Typically, its role is to visually assure the user that
-the pipeline run is progressing. `dlt` supports 4 progress monitors out of the box:
+the pipeline run is progressing. dlt supports 4 progress monitors out of the box:

- [enlighten](https://github.com/Rockhopper-Technologies/enlighten) - a status bar with progress
bars that also allows for logging.
4 changes: 2 additions & 2 deletions docs/website/docs/general-usage/resource.md
@@ -473,7 +473,7 @@ You can emit columns as a Pydantic model and use dynamic hints (i.e., lambda for
:::

### Import external files
-You can import external files, i.e., `csv`, `parquet`, and `jsonl`, by yielding items marked with `with_file_import`, optionally passing a table schema corresponding to the imported file. `dlt` will not read, parse, or normalize any names (i.e., `csv` or `arrow` headers) and will attempt to copy the file into the destination as is.
+You can import external files, i.e., CSV, Parquet, and JSONL, by yielding items marked with `with_file_import`, optionally passing a table schema corresponding to the imported file. dlt will not read, parse, or normalize any names (i.e., CSV or Arrow headers) and will attempt to copy the file into the destination as is.
```py
import os
import dlt
@@ -518,7 +518,7 @@ include_header=false
on_error_continue=true
```

-You can sniff the schema from the data, i.e., using `duckdb` to infer the table schema from a `csv` file. `dlt.mark.with_file_import` accepts additional arguments that you can use to pass hints at runtime.
+You can sniff the schema from the data, i.e., using DuckDB to infer the table schema from a CSV file. `dlt.mark.with_file_import` accepts additional arguments that you can use to pass hints at runtime.

:::note
* If you do not define any columns, the table will not be created in the destination. `dlt` will still attempt to load data into it, so if you create a fitting table upfront, the load process will succeed.
8 changes: 4 additions & 4 deletions docs/website/docs/general-usage/schema-contracts.md
@@ -107,7 +107,7 @@ As a consequence, `discard_row` will drop the whole data item - even if a nested

All contract settings apply to [arrow tables and panda frames](../dlt-ecosystem/verified-sources/arrow-pandas.md) as well.
1. **tables** mode is the same - no matter what the data item type is.
-2. **columns** will allow new columns, raise an exception, or modify tables/frames still in the extract step to avoid rewriting parquet files.
+2. **columns** will allow new columns, raise an exception, or modify tables/frames still in the extract step to avoid rewriting Parquet files.
3. **data_type** changes to data types in tables/frames are not allowed and will result in a data type schema clash. We could allow for more modes (evolving data types in Arrow tables sounds weird but ping us on Slack if you need it.)

Here's how `dlt` deals with column modes:
@@ -135,12 +135,12 @@ except PipelineStepFailed as pip_ex:
`DataValidationError` provides the following context:
1. `schema_name`, `table_name`, and `column_name` provide the logical "location" at which the contract was violated.
2. `schema_entity` and `contract_mode` indicate which contract was violated.
-3. `table_schema` contains the schema against which the contract was validated. It may be a Pydantic model or a `dlt` TTableSchema instance.
+3. `table_schema` contains the schema against which the contract was validated. It may be a Pydantic model or a dlt `TTableSchema` instance.
4. `schema_contract` is the full, expanded schema contract.
5. `data_item` is the causing data item (Python dict, arrow table, Pydantic model, or list thereof).

### Contracts on new tables
-If a table is a **new table** that has not been created on the destination yet, `dlt` will allow the creation of new columns. For a single pipeline run, the column mode is changed (internally) to **evolve** and then reverted back to the original mode. This allows for initial schema inference to happen, and then on subsequent runs, the inferred contract will be applied to the new data.
+If a table is a **new table** that has not been created on the destination yet, dlt will allow the creation of new columns. For a single pipeline run, the column mode is changed (internally) to **evolve** and then reverted back to the original mode. This allows for initial schema inference to happen, and then on subsequent runs, the inferred contract will be applied to the new data.

The following tables are considered new:
1. Child tables inferred from nested data.
@@ -164,7 +164,7 @@ Tables that are not considered new:
### Working with datasets that have manually added tables and columns on the first load
-In some cases, you might be working with datasets that have tables or columns created outside of `dlt`. If you are loading to a table not created by `dlt` for the first time, `dlt` will not know about this table while enforcing schema contracts. This means that if you do a load where the `tables` are set to `evolve`, all will work as planned. If you have `tables` set to `freeze`, `dlt` will raise an exception because it thinks you are creating a new table (which you are from `dlt`'s perspective). You can allow `evolve` for one load and then switch back to `freeze`.
+In some cases, you might be working with datasets that have tables or columns created outside of dlt. If you are loading to a table not created by dlt for the first time, dlt will not know about this table while enforcing schema contracts. This means that if you do a load where the `tables` are set to `evolve`, all will work as planned. If you have `tables` set to `freeze`, dlt will raise an exception because it thinks you are creating a new table (which you are from dlt's perspective). You can allow `evolve` for one load and then switch back to `freeze`.

The same thing will happen if `dlt` knows your table, but you have manually added a column to your destination and you have `columns` set to `freeze`.
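A hedged sketch of that evolve-then-freeze workflow (source and pipeline below are placeholders):

```py
import dlt

@dlt.source
def my_source():
    return dlt.resource([{"id": 1, "note": "imported"}], name="manual_table")

pipeline = dlt.pipeline(pipeline_name="contracts_demo", destination="duckdb", dataset_name="demo")

# First load into tables that were created outside dlt: let the schema evolve once...
pipeline.run(my_source(), schema_contract={"tables": "evolve", "columns": "evolve"})
# ...then lock the contract down for subsequent runs.
pipeline.run(my_source(), schema_contract="freeze")
```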
