Replace some full_refresh usage in code and docs
steinitzu committed Mar 7, 2024
1 parent dcaaca6 commit 109c8c9
Showing 26 changed files with 57 additions and 54 deletions.
13 changes: 8 additions & 5 deletions dlt/cli/deploy_command_helpers.py
@@ -263,22 +263,25 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
if n.PIPELINE in visitor.known_calls:
for call_args in visitor.known_calls[n.PIPELINE]:
pipeline_name, pipelines_dir = None, None
f_r_node = call_args.arguments.get("full_refresh")
# Check both full_refresh/dev_mode until full_refresh option is removed from dlt
f_r_node = call_args.arguments.get("full_refresh") or call_args.arguments.get(
"dev_mode"
)
if f_r_node:
f_r_value = evaluate_node_literal(f_r_node)
if f_r_value is None:
fmt.warning(
"The value of `full_refresh` in call to `dlt.pipeline` cannot be"
"The value of `dev_mode` in call to `dlt.pipeline` cannot be"
f" determined from {unparse(f_r_node).strip()}. We assume that you know"
" what you are doing :)"
)
if f_r_value is True:
if fmt.confirm(
"The value of 'full_refresh' is set to True. Do you want to abort to set it"
" to False?",
"The value of 'dev_mode' or 'full_refresh' is set to True. Do you want to"
" abort to set it to False?",
default=True,
):
raise CliCommandException("deploy", "Please set the full_refresh to False")
raise CliCommandException("deploy", "Please set the dev_mode to False")

p_d_node = call_args.arguments.get("pipelines_dir")
if p_d_node:
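For context on the deploy-command change above: the lookup now checks the deprecated `full_refresh` keyword first and falls back to `dev_mode`. The snippet below is a minimal, self-contained sketch of that pattern using only the standard `ast` module; it stands in for dlt's `PipelineScriptVisitor` and `evaluate_node_literal`, so treat it as an illustration rather than the actual implementation.

```python
# Illustrative sketch only -- not dlt's PipelineScriptVisitor. It shows the
# "check the old keyword, fall back to the new one" pattern from the diff,
# using the standard ast module in place of dlt's helpers.
import ast

source = 'dlt.pipeline(pipeline_name="chess", destination="duckdb", dev_mode=True)'
call = ast.parse(source, mode="eval").body  # ast.Call node for dlt.pipeline(...)

arguments = {kw.arg: kw.value for kw in call.keywords}
# Mirrors `arguments.get("full_refresh") or arguments.get("dev_mode")` above.
f_r_node = arguments.get("full_refresh") or arguments.get("dev_mode")
if f_r_node is not None:
    try:
        # Stand-in for evaluate_node_literal: resolve the literal value, if any.
        f_r_value = ast.literal_eval(f_r_node)
    except ValueError:
        f_r_value = None  # value cannot be determined statically
    print(f_r_value)  # -> True
```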
12 changes: 6 additions & 6 deletions dlt/helpers/airflow_helper.py
@@ -372,8 +372,8 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator
elif decompose == "serialize":
if not isinstance(data, DltSource):
raise ValueError("Can only decompose dlt sources")
if pipeline.full_refresh:
raise ValueError("Cannot decompose pipelines with full_refresh set")
if pipeline.dev_mode:
raise ValueError("Cannot decompose pipelines with dev_mode set")
# serialize tasks
tasks = []
pt = None
@@ -388,8 +388,8 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator
if not isinstance(data, DltSource):
raise ValueError("Can only decompose dlt sources")

if pipeline.full_refresh:
raise ValueError("Cannot decompose pipelines with full_refresh set")
if pipeline.dev_mode:
raise ValueError("Cannot decompose pipelines with dev_mode set")

tasks = []
sources = data.decompose("scc")
@@ -424,8 +424,8 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator
if not isinstance(data, DltSource):
raise ValueError("Can only decompose dlt sources")

if pipeline.full_refresh:
raise ValueError("Cannot decompose pipelines with full_refresh set")
if pipeline.dev_mode:
raise ValueError("Cannot decompose pipelines with dev_mode set")

# parallel tasks
tasks = []
2 changes: 1 addition & 1 deletion dlt/pipeline/__init__.py
@@ -64,7 +64,7 @@ def pipeline(
export_schema_path (str, optional): A path where the schema `yaml` file will be exported after every schema change. Defaults to None which disables exporting.
full_refresh (bool, optional): When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset.
dev_mode (bool, optional): When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset.
The datasets are identified by `dataset_name_` + datetime suffix. Use this setting whenever you experiment with your data to be sure you start fresh on each run. Defaults to False.
credentials (Any, optional): Credentials for the `destination` ie. database connection string or a dictionary with google cloud credentials.
2 changes: 1 addition & 1 deletion docs/examples/archive/google_sheets.py
@@ -2,7 +2,7 @@

from sources.google_sheets import google_spreadsheet

dlt.pipeline(destination="bigquery", full_refresh=False)
dlt.pipeline(destination="bigquery", dev_mode=False)
# see example.secrets.toml to where to put credentials

# "2022-05", "model_metadata"
2 changes: 1 addition & 1 deletion docs/examples/archive/quickstart.py
@@ -48,7 +48,7 @@
dataset_name=dataset_name,
credentials=credentials,
export_schema_path=export_schema_path,
full_refresh=True,
dev_mode=True,
)


2 changes: 1 addition & 1 deletion docs/examples/archive/rasa_example.py
@@ -20,7 +20,7 @@
event_files = jsonl_files([file for file in os.scandir("docs/examples/data/rasa_trackers")])

info = dlt.pipeline(
full_refresh=True,
dev_mode=True,
destination=postgres,
# export_schema_path=... # uncomment to see the final schema in the folder you want
).run(
2 changes: 1 addition & 1 deletion docs/examples/archive/singer_tap_jsonl_example.py
@@ -9,7 +9,7 @@
# load hubspot schema stub - it converts all field names with `timestamp` into timestamp type
schema = SchemaStorage.load_schema_file("docs/examples/schemas/", "hubspot", ("yaml",))

p = dlt.pipeline(destination="postgres", full_refresh=True)
p = dlt.pipeline(destination="postgres", dev_mode=True)
# now load a pipeline created from jsonl resource that feeds messages into singer tap transformer
pipe = jsonl_file("docs/examples/data/singer_taps/tap_hubspot.jsonl") | singer_raw_stream()
# provide hubspot schema
4 changes: 2 additions & 2 deletions docs/examples/chess/chess.py
@@ -52,9 +52,9 @@ def players_games(username: Any) -> Iterator[TDataItems]:
assert os.getcwd().endswith("chess")
# chess_url in config.toml, credentials for postgres in secrets.toml, credentials always under credentials key
# look for parallel run configuration in `config.toml`!
# mind the full_refresh: it makes the pipeline to load to a distinct dataset each time it is run and always is resetting the schema and state
# mind the dev_mode: it makes the pipeline to load to a distinct dataset each time it is run and always is resetting the schema and state
info = dlt.pipeline(
pipeline_name="chess_games", destination="postgres", dataset_name="chess", full_refresh=True
pipeline_name="chess_games", destination="postgres", dataset_name="chess", dev_mode=True
).run(chess(max_players=5, month=9))
# display where the data went
print(info)
2 changes: 1 addition & 1 deletion docs/website/docs/build-a-pipeline-tutorial.md
@@ -329,7 +329,7 @@ pipeline = dlt.pipeline(
pipeline_name="github_pipeline",
destination="duckdb",
dataset_name="github_reactions",
full_refresh=True
dev_mode=True
)

with pipeline.sql_client() as client:
8 changes: 4 additions & 4 deletions docs/website/docs/dlt-ecosystem/destinations/duckdb.md
@@ -75,17 +75,17 @@ By default, a DuckDB database will be created in the current working directory w
The `duckdb` credentials do not require any secret values. You are free to pass the configuration explicitly via the `credentials` parameter to `dlt.pipeline` or `pipeline.run` methods. For example:
```python
# will load data to files/data.db database file
p = dlt.pipeline(pipeline_name='chess', destination='duckdb', dataset_name='chess_data', full_refresh=False, credentials="files/data.db")
p = dlt.pipeline(pipeline_name='chess', destination='duckdb', dataset_name='chess_data', dev_mode=False, credentials="files/data.db")

# will load data to /var/local/database.duckdb
p = dlt.pipeline(pipeline_name='chess', destination='duckdb', dataset_name='chess_data', full_refresh=False, credentials="/var/local/database.duckdb")
p = dlt.pipeline(pipeline_name='chess', destination='duckdb', dataset_name='chess_data', dev_mode=False, credentials="/var/local/database.duckdb")
```

The destination accepts a `duckdb` connection instance via `credentials`, so you can also open a database connection yourself and pass it to `dlt` to use. `:memory:` databases are supported.
```python
import duckdb
db = duckdb.connect()
p = dlt.pipeline(pipeline_name='chess', destination='duckdb', dataset_name='chess_data', full_refresh=False, credentials=db)
p = dlt.pipeline(pipeline_name='chess', destination='duckdb', dataset_name='chess_data', dev_mode=False, credentials=db)
```

This destination accepts database connection strings in format used by [duckdb-engine](https://github.com/Mause/duckdb_engine#configuration).
@@ -124,4 +124,4 @@ This destination fully supports [dlt state sync](../../general-usage/state#synci
- [Load data from Chess.com to DuckDB in python with dlt](https://dlthub.com/docs/pipelines/chess/load-data-with-python-from-chess-to-duckdb)
- [Load data from HubSpot to DuckDB in python with dlt](https://dlthub.com/docs/pipelines/hubspot/load-data-with-python-from-hubspot-to-duckdb)
- [Load data from GitHub to DuckDB in python with dlt](https://dlthub.com/docs/pipelines/github/load-data-with-python-from-github-to-duckdb)
<!--@@@DLT_SNIPPET_END tuba::duckdb-->
2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/transformations/pandas.md
@@ -16,7 +16,7 @@ pipeline = dlt.pipeline(
pipeline_name="github_pipeline",
destination="duckdb",
dataset_name="github_reactions",
full_refresh=True
dev_mode=True
)
with pipeline.sql_client() as client:
with client.execute_query(
@@ -358,7 +358,7 @@ To read more about tables, columns, and datatypes, please refer to [our document
`dlt` will **not modify** tables after they are created.
So if you changed data types with hints,
then you need to **delete the dataset**
or set `full_refresh=True`.
or set `dev_mode=True`.
:::

## Sources and resources
@@ -621,4 +621,4 @@ Enjoy the DLT Google Sheets pipeline experience!
- [Load data from Google Sheets to AWS Athena in python with dlt](https://dlthub.com/docs/pipelines/google_sheets/load-data-with-python-from-google_sheets-to-athena)
- [Load data from Google Sheets to Redshift in python with dlt](https://dlthub.com/docs/pipelines/google_sheets/load-data-with-python-from-google_sheets-to-redshift)
- [Load data from Google Sheets to Snowflake in python with dlt](https://dlthub.com/docs/pipelines/google_sheets/load-data-with-python-from-google_sheets-to-snowflake)
<!--@@@DLT_SNIPPET_END tuba::google_sheets-->
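The note in the hunk above (dlt does not modify tables after creation, so changed type hints need a deleted dataset or `dev_mode=True`) can be tried with a tiny pipeline. This is a hedged sketch, not part of the commit; the resource, hint, and names below are made up for illustration.

```python
# Hypothetical example of the note above: after changing a column hint, run
# with dev_mode=True so the data lands in a fresh, suffixed dataset instead of
# the already-created table. Resource and names are illustrative only.
import dlt

@dlt.resource(columns={"value": {"data_type": "double"}})  # changed type hint
def numbers():
    yield [{"value": 1}, {"value": 2}]

pipeline = dlt.pipeline(
    pipeline_name="hints_experiment",
    destination="duckdb",
    dataset_name="sheets_data",
    dev_mode=True,  # fresh dataset each run, so the new hint takes effect
)
print(pipeline.run(numbers()))
```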
@@ -255,7 +255,7 @@ To create your data pipeline using single loading and
> For incremental loading of endpoints, maintain the pipeline name and destination dataset name.
> The pipeline name is important for accessing the [state](../../general-usage/state) from the
> last run, including the end date for incremental data loads. Altering these names could trigger
> a [full_refresh](../../general-usage/pipeline#do-experiments-with-full-refresh), disrupting
> a [dev-mode](../../general-usage/pipeline#do-experiments-with-dev-mode), disrupting
> the metadata tracking for [incremental data loading](../../general-usage/incremental-loading).
1. To load data from the “contact” in replace mode and “task” incrementally merge mode endpoints:
@@ -438,7 +438,7 @@ To create your own pipeline, use source and resource methods from this verified
print(info)
```

1. Remember to keep the pipeline name and destination dataset name consistent. The pipeline name is crucial for retrieving the [state](https://dlthub.com/docs/general-usage/state) from the last run, which is essential for incremental loading. Altering these names could initiate a "[full_refresh](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh)", interfering with the metadata tracking necessary for [incremental loads](https://dlthub.com/docs/general-usage/incremental-loading).
1. Remember to keep the pipeline name and destination dataset name consistent. The pipeline name is crucial for retrieving the [state](https://dlthub.com/docs/general-usage/state) from the last run, which is essential for incremental loading. Altering these names could initiate a "[dev_mode](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-dev-mode)", interfering with the metadata tracking necessary for [incremental loads](https://dlthub.com/docs/general-usage/incremental-loading).

<!--@@@DLT_SNIPPET_START tuba::sql_database-->
<!--@@@DLT_SNIPPET_END tuba::sql_database-->
4 changes: 2 additions & 2 deletions docs/website/docs/dlt-ecosystem/verified-sources/stripe.md
@@ -250,7 +250,7 @@ verified source.
load_info = pipeline.run(data=[source_single, source_incremental])
print(load_info)
```
> To load data, maintain the pipeline name and destination dataset name. The pipeline name is vital for accessing the last run's [state](https://dlthub.com/docs/general-usage/state), which determines the incremental data load's end date. Altering these names can trigger a [“full_refresh”](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh), disrupting the metadata (state) tracking for [incremental data loading](https://dlthub.com/docs/general-usage/incremental-loading).
> To load data, maintain the pipeline name and destination dataset name. The pipeline name is vital for accessing the last run's [state](https://dlthub.com/docs/general-usage/state), which determines the incremental data load's end date. Altering these names can trigger a [“dev_mode”](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-dev-mode), disrupting the metadata (state) tracking for [incremental data loading](https://dlthub.com/docs/general-usage/incremental-loading).

1. To load important metrics and store them in database:

@@ -279,4 +279,4 @@ verified source.
- [Load data from Stripe to Redshift in python with dlt](https://dlthub.com/docs/pipelines/stripe_analytics/load-data-with-python-from-stripe_analytics-to-redshift)
- [Load data from Stripe to AWS Athena in python with dlt](https://dlthub.com/docs/pipelines/stripe_analytics/load-data-with-python-from-stripe_analytics-to-athena)
- [Load data from Stripe to BigQuery in python with dlt](https://dlthub.com/docs/pipelines/stripe_analytics/load-data-with-python-from-stripe_analytics-to-bigquery)
<!--@@@DLT_SNIPPET_END tuba::stripe_analytics-->
4 changes: 2 additions & 2 deletions docs/website/docs/dlt-ecosystem/verified-sources/workable.md
@@ -267,9 +267,9 @@ To create your data pipeline using single loading and
destination dataset names. The pipeline name helps retrieve the
[state](https://dlthub.com/docs/general-usage/state) of the last run, essential for incremental
data loading. Changing these names might trigger a
[“full_refresh”](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh),
[“dev_mode”](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-dev-mode),
disrupting metadata tracking for
[incremental data loading](https://dlthub.com/docs/general-usage/incremental-loading).

<!--@@@DLT_SNIPPET_START tuba::workable-->
<!--@@@DLT_SNIPPET_END tuba::workable-->
4 changes: 2 additions & 2 deletions docs/website/docs/dlt-ecosystem/verified-sources/zendesk.md
@@ -333,7 +333,7 @@ verified source.
pipeline = dlt.pipeline(
pipeline_name="dlt_zendesk_pipeline", # Use a custom name if desired
destination="duckdb", # Choose the appropriate destination (e.g., duckdb, redshift, post)
full_refresh = Fasle
dev_mode = False
dataset_name="sample_zendesk_data" # Use a custom name if desired
)
data = zendesk_support(load_all=True, start_date=start_date)
@@ -375,4 +375,4 @@ verified source.
> data. This approach can be used with all incremental Zendesk sources.
<!--@@@DLT_SNIPPET_START tuba::zendesk-->
<!--@@@DLT_SNIPPET_END tuba::zendesk-->
@@ -59,7 +59,7 @@ pipeline = dlt.pipeline(
pipeline_name="github_pipeline",
destination="duckdb",
dataset_name="github_reactions",
full_refresh=True
dev_mode=True
)
with pipeline.sql_client() as client:
with client.execute_query(
6 changes: 3 additions & 3 deletions docs/website/docs/general-usage/destination-tables.md
@@ -295,12 +295,12 @@ the current one.

## Versioned datasets

When you set the `full_refresh` argument to `True` in `dlt.pipeline` call, dlt creates a versioned dataset.
When you set the `dev_mode` argument to `True` in `dlt.pipeline` call, dlt creates a versioned dataset.
This means that each time you run the pipeline, the data is loaded into a new dataset (a new database schema).
The dataset name is the same as the `dataset_name` you provided in the pipeline definition with a
datetime-based suffix.

We modify our pipeline to use the `full_refresh` option to see how this works:
We modify our pipeline to use the `dev_mode` option to see how this works:

```py
import dlt
@@ -314,7 +314,7 @@ pipeline = dlt.pipeline(
pipeline_name='quick_start',
destination='duckdb',
dataset_name='mydata',
full_refresh=True # <-- add this line
dev_mode=True # <-- add this line
)
load_info = pipeline.run(data, table_name="users")
```
2 changes: 1 addition & 1 deletion docs/website/docs/general-usage/incremental-loading.md
@@ -209,7 +209,7 @@ pipeline = dlt.pipeline(
pipeline_name='facebook_insights',
destination='duckdb',
dataset_name='facebook_insights_data',
full_refresh=True
dev_mode=True
)
fb_ads = facebook_ads_source()
# enable root key propagation on a source that is not a merge one by default.
6 changes: 3 additions & 3 deletions docs/website/docs/general-usage/pipeline.md
@@ -1,7 +1,7 @@
---
title: Pipeline
description: Explanation of what a dlt pipeline is
keywords: [pipeline, source, full refresh]
keywords: [pipeline, source, full refresh, dev mode]
---

# Pipeline
@@ -85,11 +85,11 @@ You can inspect stored artifacts using the command
> 💡 You can attach `Pipeline` instance to an existing working folder, without creating a new
> pipeline with `dlt.attach`.
## Do experiments with full refresh
## Do experiments with dev mode

If you [create a new pipeline script](../walkthroughs/create-a-pipeline.md) you will be
experimenting a lot. If you want that each time the pipeline resets its state and loads data to a
new dataset, set the `full_refresh` argument of the `dlt.pipeline` method to True. Each time the
new dataset, set the `dev_mode` argument of the `dlt.pipeline` method to True. Each time the
pipeline is created, `dlt` adds datetime-based suffix to the dataset name.

## Display the loading progress
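As a quick illustration of the renamed pipeline.md section above, a sketch under the assumption that `dev_mode` behaves exactly as the old `full_refresh` flag did: the datetime suffix can be observed on the pipeline's `dataset_name`. The printed value is an example only, not a guaranteed format.

```python
# Sketch of the dev-mode behaviour described in pipeline.md: each new pipeline
# instance gets a datetime-suffixed dataset and a reset state.
import dlt

pipeline = dlt.pipeline(
    pipeline_name="play",
    destination="duckdb",
    dataset_name="experiments",
    dev_mode=True,
)
print(pipeline.dataset_name)  # e.g. "experiments_20240307094510" (example only)
```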
2 changes: 1 addition & 1 deletion docs/website/docs/general-usage/state.md
@@ -125,7 +125,7 @@ will display source and resource state slots for all known sources.
**To fully reset the state:**

- Drop the destination dataset to fully reset the pipeline.
- [Set the `full_refresh` flag when creating pipeline](pipeline.md#do-experiments-with-full-refresh).
- [Set the `dev_mode` flag when creating pipeline](pipeline.md#do-experiments-with-dev-mode).
- Use the `dlt pipeline drop --drop-all` command to
[drop state and tables for a given schema name](../reference/command-line-interface.md#selectively-drop-tables-and-reset-state).

6 changes: 3 additions & 3 deletions docs/website/docs/reference/performance.md
@@ -368,7 +368,7 @@ def read_table(limit):

# this prevents process pool to run the initialization code again
if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ:
pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True)
pipeline = dlt.pipeline("parallel_load", destination="duckdb", dev_mode=True)
pipeline.extract(read_table(1000000))

load_id = pipeline.list_extracted_load_packages()[0]
@@ -449,8 +449,8 @@ def _run_pipeline(pipeline, gen_):
return pipeline.run(gen_())

# declare pipelines in main thread then run them "async"
pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True)
pipeline_2 = dlt.pipeline("pipeline_2", destination="duckdb", full_refresh=True)
pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True)
pipeline_2 = dlt.pipeline("pipeline_2", destination="duckdb", dev_mode=True)

async def _run_async():
loop = asyncio.get_running_loop()
0 comments on commit 109c8c9