removes calls to raise_on_failed_jobs() in docs
willi-mueller committed Sep 9, 2024
1 parent 932c160 commit e8c8b22
Showing 18 changed files with 4 additions and 62 deletions.
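The pattern removed throughout is the explicit `load_info.raise_on_failed_jobs()` call after a pipeline run. As the `running.md` change at the end of this diff shows, dlt now raises on failed jobs by default (with `load.raise_on_failed_jobs=false` as the documented opt-out), so the explicit call is redundant. A minimal before/after sketch under that assumption; the pipeline name and destination are illustrative:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="docs_example",  # illustrative
    destination="duckdb",
    dataset_name="docs_example_data",
)

# before: examples checked for failed load jobs explicitly
load_info = pipeline.run([1, 2, 3], table_name="player")
load_info.raise_on_failed_jobs()  # redundant once failed jobs raise by default

# after: run() itself raises if any load job fails, so the check is dropped
load_info = pipeline.run([1, 2, 3], table_name="player")
print(load_info)
```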
5 changes: 1 addition & 4 deletions dlt/pipeline/helpers.py
@@ -4,15 +4,12 @@
Sequence,
Iterable,
Optional,
Any,
Dict,
Union,
TYPE_CHECKING,
)

from dlt.common.jsonpath import TAnyJsonPath
from dlt.common.exceptions import TerminalException
from dlt.common.schema.schema import Schema
from dlt.common.schema.typing import TSimpleRegex
from dlt.common.pipeline import pipeline_state as current_pipeline_state, TRefreshMode
from dlt.common.storages.load_package import TLoadPackageDropTablesState
@@ -139,7 +136,7 @@ def __call__(self) -> None:

self.pipeline.normalize()
try:
self.pipeline.load(raise_on_failed_jobs=True)
self.pipeline.load()
except Exception:
# Clear extracted state on failure so command can run again
self.pipeline.drop_pending_packages()
4 changes: 2 additions & 2 deletions docs/examples/CONTRIBUTING.md
@@ -10,7 +10,7 @@ Note: All paths in this guide are relative to the `dlt` repository directory.
- Update the doc string, which will become the generated markdown file; check the other examples to see how it is done
- If your example requires any secrets, add the vars to the example.secrets.toml but do not enter the values.
- Add your example code, make sure you have an `if __name__ == "__main__"` clause in which you run the example script; this will be used for testing
- You should add one or two assertions after running your example and maybe also `load_info.raise_on_failed_jobs()`, this will help greatly with testing
- You should add one or two assertions after running your example

## Testing
- You can test your example simply by running your example script from your example folder. On CI a test will be automatically generated.
@@ -31,4 +31,4 @@ If you use any secrets for the code snippets, e.g. Zendesk requires credentials.
If your example requires any additional dependency, then you can add it

- To `pyproject.toml` in the `[tool.poetry.group.docs.dependencies]` section.
- Do not forget to update your `poetry.lock` file with `poetry lock --no-update` command and commit.
- Do not forget to update your `poetry.lock` file with `poetry lock --no-update` command and commit.
3 changes: 0 additions & 3 deletions docs/examples/_template/_template.py
@@ -25,6 +25,3 @@
# Extract, normalize, and load the data
load_info = pipeline.run([1, 2, 3], table_name="player")
print(load_info)

# make sure nothing failed
load_info.raise_on_failed_jobs()
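The hunk above only shows the tail of the template. Filling in the part above the cut, a full example script in the shape the contributing guide asks for (an `if __name__ == "__main__"` guard plus an assertion) would look roughly like this; the pipeline configuration is an assumption, not taken from the diff:

```python
import dlt

if __name__ == "__main__":
    # assumed configuration; the hunk above only shows the run/print tail
    pipeline = dlt.pipeline(
        pipeline_name="template",
        destination="duckdb",
        dataset_name="template_data",
    )

    # Extract, normalize, and load the data (run() raises if a load job fails)
    load_info = pipeline.run([1, 2, 3], table_name="player")
    print(load_info)

    # one or two assertions, as the contributing guide suggests
    row_counts = pipeline.last_trace.last_normalize_info.row_counts
    assert row_counts["player"] == 3
```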
3 changes: 0 additions & 3 deletions docs/examples/chess/chess.py
@@ -56,6 +56,3 @@ def players_games(username: Any) -> Iterator[TDataItems]:
).run(chess(max_players=5, month=9))
# display where the data went
print(load_info)

# make sure nothing failed
load_info.raise_on_failed_jobs()
5 changes: 0 additions & 5 deletions docs/examples/chess_production/chess_production.py
@@ -88,8 +88,6 @@ def load_data_with_retry(pipeline, data):
load_info = pipeline.run(data)
logger.info(str(load_info))

# raise on failed jobs
load_info.raise_on_failed_jobs()
# send notification
send_slack_message(
pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
@@ -170,6 +168,3 @@ def load_data_with_retry(pipeline, data):
# get data for a few famous players
data = chess(max_players=MAX_PLAYERS)
load_info = load_data_with_retry(pipeline, data)

# make sure nothing failed
load_info.raise_on_failed_jobs()
3 changes: 0 additions & 3 deletions docs/examples/connector_x_arrow/connector_x_arrow.py
@@ -67,6 +67,3 @@ def genome_resource():
# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["genome"] == 1000

# make sure nothing failed
load_info.raise_on_failed_jobs()
@@ -21,7 +21,6 @@
__source_name__ = "spotify"

import datetime # noqa: I251
import os
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Any
@@ -142,7 +141,6 @@ def lancedb_destination(items: TDataItems, table: TTableSchema) -> None:
)

load_info = pipeline.run(spotify_shows())
load_info.raise_on_failed_jobs()
print(load_info)

row_counts = pipeline.last_trace.last_normalize_info
4 changes: 0 additions & 4 deletions docs/examples/custom_naming/custom_naming.py
@@ -46,8 +46,6 @@
# Extract, normalize, and load the data
load_info = pipeline.run([{"StückId": 1}], table_name="Ausrüstung")
print(load_info)
# make sure nothing failed
load_info.raise_on_failed_jobs()
with pipeline.sql_client() as client:
# NOTE: we quote case-sensitive identifiers
with client.execute_query('SELECT "StückId" FROM "Ausrüstung"') as cur:
@@ -66,12 +64,10 @@
# duckdb is case insensitive so tables and columns below would clash but sql_ci_no_collision prevents that
data_1 = {"ItemID": 1, "itemid": "collides"}
load_info = pipeline.run([data_1], table_name="BigData")
load_info.raise_on_failed_jobs()

data_2 = {"1Data": 1, "_1data": "collides"}
# use colliding table
load_info = pipeline.run([data_2], table_name="bigdata")
load_info.raise_on_failed_jobs()

with pipeline.sql_client() as client:
from duckdb import DuckDBPyConnection
3 changes: 0 additions & 3 deletions docs/examples/google_sheets/google_sheets.py
@@ -100,6 +100,3 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
print(row_counts.keys())
assert row_counts["hidden_columns_merged_cells"] == 7
assert row_counts["blank_columns"] == 21

# make sure nothing failed
load_info.raise_on_failed_jobs()
3 changes: 0 additions & 3 deletions docs/examples/incremental_loading/incremental_loading.py
@@ -147,6 +147,3 @@ def get_pages(
# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["ticket_events"] == 17

# make sure nothing failed
load_info.raise_on_failed_jobs()
9 changes: 0 additions & 9 deletions docs/examples/nested_data/nested_data.py
@@ -128,9 +128,6 @@ def convert_mongo_objs(value: Any) -> Any:
tables.pop("_dlt_pipeline_state")
assert len(tables) == 7, pipeline.last_trace.last_normalize_info

# make sure nothing failed
load_info.raise_on_failed_jobs()

# The second method involves setting the max_table_nesting attribute directly
# on the source data object.
# This allows for dynamic control over the maximum nesting
@@ -149,9 +146,6 @@ def convert_mongo_objs(value: Any) -> Any:
tables.pop("_dlt_pipeline_state")
assert len(tables) == 1, pipeline.last_trace.last_normalize_info

# make sure nothing failed
load_info.raise_on_failed_jobs()

# The third method involves applying data type hints to specific columns in the data.
# In this case, we tell dlt that column 'cast' (containing a list of actors)
# in 'movies' table should have type complex which means
@@ -168,6 +162,3 @@ def convert_mongo_objs(value: Any) -> Any:
tables = pipeline.last_trace.last_normalize_info.row_counts
tables.pop("_dlt_pipeline_state")
assert len(tables) == 6, pipeline.last_trace.last_normalize_info

# make sure nothing failed
load_info.raise_on_failed_jobs()
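The comments in this example name three ways of limiting nesting depth. A condensed sketch of all three; the toy source below is an assumption standing in for the example's MongoDB collections, and "complex" is the column type named in the comments (newer dlt releases call it "json"):

```python
import dlt

# method 1: cap nesting for every resource of the source via the decorator
@dlt.source(max_table_nesting=2)
def movies_source():
    @dlt.resource(name="movies")
    def movies():
        # toy document with a nested list, mirroring the 'cast' column described above
        yield {"title": "Heat", "cast": [{"name": "Al Pacino"}, {"name": "Robert De Niro"}]}

    return movies

source = movies_source()

# method 2: override the limit on the instantiated source object
source.max_table_nesting = 1

# method 3: hint a single column so its nested data stays in one column
# instead of being unpacked into a child table ("complex" may be "json" in newer dlt)
source.movies.apply_hints(columns={"cast": {"data_type": "complex"}})

pipeline = dlt.pipeline(pipeline_name="nesting_demo", destination="duckdb", dataset_name="nesting")
print(pipeline.run(source))
```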
3 changes: 0 additions & 3 deletions docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
@@ -80,6 +80,3 @@ def pdf_to_text(file_item, separate_pages: bool = False):
client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())

# make sure nothing failed
load_info.raise_on_failed_jobs()
3 changes: 0 additions & 3 deletions docs/examples/postgres_to_postgres/postgres_to_postgres.py
@@ -214,9 +214,6 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
assert row_counts["table_1"] == 9
assert row_counts["table_2"] == 9

# make sure nothing failed
load_info.raise_on_failed_jobs()

if load_type == "replace":
# 4. Load DuckDB local database into Postgres
print("##################################### START DUCKDB LOAD ########")
6 changes: 0 additions & 6 deletions docs/examples/qdrant_zendesk/qdrant_zendesk.py
@@ -175,9 +175,6 @@ def get_pages(

print(load_info)

# make sure nothing failed
load_info.raise_on_failed_jobs()

# getting the authenticated Qdrant client to connect to your Qdrant database
with pipeline.destination_client() as destination_client:
from qdrant_client import QdrantClient
@@ -194,6 +191,3 @@ def get_pages(
)

assert len(response) <= 3 and len(response) > 0

# make sure nothing failed
load_info.raise_on_failed_jobs()
3 changes: 0 additions & 3 deletions docs/examples/transformers/transformers.py
@@ -78,6 +78,3 @@ def species(pokemon_details):
assert row_counts["pokemon"] == 20
assert row_counts["species"] == 20
assert "pokemon_list" not in row_counts

# make sure nothing failed
load_info.raise_on_failed_jobs()
@@ -94,7 +94,6 @@ def destination_instantiation_snippet() -> None:
# here dependencies will be imported, secrets pulled and destination accessed
# we pass bucket_url explicitly and expect credentials passed by config provider
load_info = pipeline.load(destination=filesystem(bucket_url=bucket_url))
load_info.raise_on_failed_jobs()
# @@@DLT_SNIPPET_END late_destination_access


@@ -179,9 +179,7 @@ async def _run_async():
loop.run_in_executor(executor, _run_pipeline, pipeline_1, async_table),
loop.run_in_executor(executor, _run_pipeline, pipeline_2, defer_table),
)
# result contains two LoadInfo instances
results[0].raise_on_failed_jobs()
results[1].raise_on_failed_jobs()
# results contains two LoadInfo instances

# load data
asyncio.run(_run_async())
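With the two explicit calls removed, the snippet relies on each `run()` raising on its own. Anyone who still wants a hard check can assert on the gathered results; a sketch, assuming `LoadInfo` exposes `has_failed_jobs`:

```python
# inside _run_async(), right after the gather above, one could add an explicit check
for info in results:
    # each run() would already have raised on a failed job under the current defaults,
    # so this is only a belt-and-braces assertion
    assert not info.has_failed_jobs
```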
2 changes: 0 additions & 2 deletions docs/website/docs/running-in-production/running.md
@@ -267,8 +267,6 @@ All the jobs that were running in parallel are completed before raising. The dlt
Here is an example `config.toml` to disable this behavior:

```toml
# you should really load just one job at a time to get the deterministic behavior
load.workers=1
# I hope you know what you are doing by setting this to false
load.raise_on_failed_jobs=false
```
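When raising is disabled this way, failures have to be inspected on the returned `LoadInfo` by hand. A minimal sketch, assuming `has_failed_jobs`, `load_packages` and the per-package `jobs` lists are available in the installed dlt version:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="prod_pipeline",  # illustrative
    destination="duckdb",
    dataset_name="prod_data",
)
load_info = pipeline.run([{"id": 1}], table_name="items")

# with load.raise_on_failed_jobs=false nothing raises, so check explicitly
if load_info.has_failed_jobs:
    for package in load_info.load_packages:
        for job in package.jobs.get("failed_jobs", []):
            print(job.failed_message)
    # load_info.raise_on_failed_jobs() is the one-call equivalent of failing hard here
```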