diff --git a/docs/tools/lint_setup/template.py b/docs/tools/lint_setup/template.py index dcfada63f6..6b207ceb0b 100644 --- a/docs/tools/lint_setup/template.py +++ b/docs/tools/lint_setup/template.py @@ -8,8 +8,8 @@ import os import pendulum -from pendulum import DateTime from datetime import datetime # noqa: I251 +from pendulum import DateTime import dlt from dlt.common import json @@ -26,6 +26,7 @@ BaseConfiguration, ) from dlt.common.storages.configuration import FileSystemCredentials +from dlt.pipeline.exceptions import PipelineStepFailed # some universal variables pipeline: dlt.Pipeline = None # type: ignore[assignment] @@ -33,3 +34,4 @@ ex: Exception = None # type: ignore[assignment] load_info: LoadInfo = None # type: ignore[assignment] url: str = None # type: ignore[assignment] +my_resource: DltResource = None # type: ignore[assignment] \ No newline at end of file diff --git a/docs/website/docs/dlt-ecosystem/destinations/destination.md b/docs/website/docs/dlt-ecosystem/destinations/destination.md index 60753d90b5..c9a0bff022 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/destination.md +++ b/docs/website/docs/dlt-ecosystem/destinations/destination.md @@ -54,7 +54,7 @@ The full signature of the destination decorator plus its function is the followi loader_file_format="jsonl", name="my_custom_destination", naming_convention="direct", - max_nesting_level=0, + max_table_nesting=0, skip_dlt_columns_and_tables=True ) def my_destination(items: TDataItems, table: TTableSchema) -> None: diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/airtable.md b/docs/website/docs/dlt-ecosystem/verified-sources/airtable.md index bd04dbfcf3..f6b16ef944 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/airtable.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/airtable.md @@ -215,7 +215,7 @@ verified source. base_id = base_id, table_names = table_names ) - load_info = pipeline.run(airtables, write_deposition = "replace") + load_info = pipeline.run(airtables, write_disposition = "replace") ``` > You have the option to use table names or table IDs in the code above, in place of "Table1" and diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/chess.md b/docs/website/docs/dlt-ecosystem/verified-sources/chess.md index 6ae457d1e6..62776b5c53 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/chess.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/chess.md @@ -127,7 +127,9 @@ def players_profiles(players: List[str]) -> Iterator[TDataItem]: @dlt.defer def _get_profile(username: str) -> TDataItem: return get_path_with_retry(f"player/{username}") - ... + + for username in players: + yield _get_profile(username) ``` `players`: Is a list of player usernames for which you want to fetch profile data. @@ -161,7 +163,7 @@ def players_games( ) -> Iterator[Callable[[], List[TDataItem]]]: # gets a list of already checked(loaded) archives. checked_archives = dlt.current.resource_state().setdefault("archives", []) - ... + yield {} # return your retrieved data here ``` `players`: Is a list of player usernames for which you want to fetch games. diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md index bf30da8882..9e0d46c563 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md @@ -453,7 +453,8 @@ verified source. ) # pretty print the information on data that was loaded print(load_info) - print(listing)(pipeline.last_trace.last_normalize_info) + print(listing) + print(pipeline.last_trace.last_normalize_info) ``` 1. Cleanup after loading: diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/jira.md b/docs/website/docs/dlt-ecosystem/verified-sources/jira.md index 38dacb0541..26c4462c34 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/jira.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/jira.md @@ -173,7 +173,8 @@ The resource function searches issues using JQL queries and then loads them to t ```py @dlt.resource(write_disposition="replace") def issues(jql_queries: List[str]) -> Iterable[TDataItem]: - api_path = "rest/api/3/search" + api_path = "rest/api/3/search" + return {} # return the retrieved values here ``` `jql_queries`: Accepts a list of JQL queries. diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md b/docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md index 3dc815d53b..1e570bfe7a 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md @@ -213,10 +213,11 @@ create and store a mapping of custom fields for different entities in the source ```py @dlt.resource(selected=False) def create_state(pipedrive_api_key: str) -> Iterator[Dict[str, Any]]: - def _get_pages_for_rename( - entity: str, fields_entity: str, pipedrive_api_key: str - ) -> Dict[str, Any]: + def _get_pages_for_rename( + entity: str, fields_entity: str, pipedrive_api_key: str + ) -> Dict[str, Any]: ... + yield _get_pages_for_rename("", "", "") ``` It processes each entity in ENTITY_MAPPINGS, updating the custom fields mapping if a related fields diff --git a/docs/website/docs/general-usage/credentials/config_specs.md b/docs/website/docs/general-usage/credentials/config_specs.md index e93e1c466a..e66939fc39 100644 --- a/docs/website/docs/general-usage/credentials/config_specs.md +++ b/docs/website/docs/general-usage/credentials/config_specs.md @@ -94,7 +94,7 @@ credentials = ConnectionStringCredentials() credentials.drivername = "postgresql" credentials.database = "my_database" credentials.username = "my_user" -credentials.password = "my_password" +credentials.password = "my_password" # type: ignore credentials.host = "localhost" credentials.port = 5432 @@ -118,12 +118,12 @@ It also allows for the addition of scopes and provides methods for client authen Usage: ```py -credentials = OAuth2Credentials( - client_id="CLIENT_ID", - client_secret="CLIENT_SECRET", - refresh_token="REFRESH_TOKEN", - scopes=["scope1", "scope2"] -) +credentials = OAuth2Credentials({ + "client_id": "CLIENT_ID", + "client_secret": "CLIENT_SECRET", + "refresh_token": "REFRESH_TOKEN", + "scopes": ["scope1", "scope2"] +}) # Authorize the client credentials.auth() diff --git a/docs/website/docs/general-usage/customising-pipelines/pseudonymizing_columns.md b/docs/website/docs/general-usage/customising-pipelines/pseudonymizing_columns.md index ba0b13636b..eff6f795ac 100644 --- a/docs/website/docs/general-usage/customising-pipelines/pseudonymizing_columns.md +++ b/docs/website/docs/general-usage/customising-pipelines/pseudonymizing_columns.md @@ -51,11 +51,11 @@ for row in dummy_source().dummy_data.add_map(pseudonymize_name): # 1. Create an instance of the source so you can edit it. data_source = dummy_source() # 2. Modify this source instance's resource -data_source = data_source.dummy_data.add_map(pseudonymize_name) +data_resource = data_source.dummy_data.add_map(pseudonymize_name) # 3. Inspect your result -for row in data_source: +for row in data_resource: print(row) pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data') -load_info = pipeline.run(data_source) +load_info = pipeline.run(data_resource) ``` diff --git a/docs/website/docs/general-usage/customising-pipelines/renaming_columns.md b/docs/website/docs/general-usage/customising-pipelines/renaming_columns.md index 04e4d33b13..4cbb4d7b32 100644 --- a/docs/website/docs/general-usage/customising-pipelines/renaming_columns.md +++ b/docs/website/docs/general-usage/customising-pipelines/renaming_columns.md @@ -44,10 +44,10 @@ def replace_umlauts_in_dict_keys(d): data_source = dummy_source() # 2. Modify this source instance's resource -data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) +data_resource = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) # 3. Inspect your result -for row in data_source: +for row in data_resource: print(row) # {'Objekt_0': {'Groesse': 0, 'Aequivalenzpruefung': True}} diff --git a/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md b/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md index 6b07845689..3aadb2f982 100644 --- a/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md +++ b/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md @@ -127,7 +127,7 @@ The first step is to register on [SerpAPI](https://serpapi.com/) and obtain the 1. Create `fetch_average_price()` function as follows: ```py - import datetime + from datetime import datetime, timedelta import requests # Uncomment transformer function if it is to be used as a transformer, @@ -160,7 +160,7 @@ The first step is to register on [SerpAPI](https://serpapi.com/) and obtain the device_info = dlt.current.resource_state().setdefault("devices", {}) # Current timestamp for checking the last update - current_timestamp = datetime.datetime.now() + current_timestamp = datetime.now() # Print the current device information # print(device_info) # if you need to check state @@ -172,10 +172,10 @@ The first step is to register on [SerpAPI](https://serpapi.com/) and obtain the # Calculate the time since the last update last_updated = ( current_timestamp - - device_data.get('timestamp', datetime.datetime.min) + device_data.get('timestamp', datetime.min) ) # Check if the device is not in state or data is older than 180 days - if device not in device_info or last_updated > datetime.timedelta(days=180): + if device not in device_info or last_updated > timedelta(days=180): try: # Make an API request to fetch device prices response = requests.get("https://serpapi.com/search", params={ diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index e2e95d937f..67609b8989 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -63,6 +63,7 @@ accepts following arguments: ... # the `table_schema` method gets table schema generated by a resource + # TODO: needs fixing print(get_users().table_schema()) ``` @@ -154,6 +155,7 @@ def repo_events() -> Iterator[TDataItems]: # the `table_schema` method gets table schema generated by a resource and takes optional # data item to evaluate dynamic hints +# TODO: needs fixing print(repo_events().table_schema({"type": "WatchEvent", id:...})) ``` @@ -283,7 +285,7 @@ def get_orders(): yield o # users and orders will be iterated in parallel in two separate threads -pipeline.run(get_users(), get_orders()) +pipeline.run[(get_users(), get_orders()]) ``` Async generators are automatically extracted concurrently with other resources: diff --git a/docs/website/docs/general-usage/schema-contracts.md b/docs/website/docs/general-usage/schema-contracts.md index 1b5e67357a..c79d240520 100644 --- a/docs/website/docs/general-usage/schema-contracts.md +++ b/docs/website/docs/general-usage/schema-contracts.md @@ -124,7 +124,7 @@ As with any other exception coming from pipeline run, it will be re-raised via ` ```py try: pipeline.run() -except Exception as pip_ex: +except PipelineStepFailed as pip_ex: if pip_ex.step == "normalize": if isinstance(pip_ex.__context__.__context__, DataValidationError): ... diff --git a/docs/website/docs/general-usage/schema-evolution.md b/docs/website/docs/general-usage/schema-evolution.md index 377df0e47f..dd3aa0bf8a 100644 --- a/docs/website/docs/general-usage/schema-evolution.md +++ b/docs/website/docs/general-usage/schema-evolution.md @@ -163,7 +163,7 @@ data = [{ pipeline = dlt.pipeline("organizations_pipeline", destination="duckdb") # Adding not null constraint -pipeline.run(data, table_name="org", columns={"room": {"data_type": "integer", "nullable": False}}) +pipeline.run(data, table_name="org", columns={"room": {"data_type": "bigint", "nullable": False}}) ``` During pipeline execution a data validation error indicates that a removed column is being passed as null. diff --git a/docs/website/docs/general-usage/schema.md b/docs/website/docs/general-usage/schema.md index 164814010d..9b0d8ec622 100644 --- a/docs/website/docs/general-usage/schema.md +++ b/docs/website/docs/general-usage/schema.md @@ -317,7 +317,7 @@ def textual(nesting_level: int): schema.remove_type_detection("iso_timestamp") # convert UNIX timestamp (float, withing a year from NOW) into timestamp schema.add_type_detection("timestamp") - schema.compile_settings() + schema._compile_settings() - return dlt.resource(...) + return dlt.resource([]) ``` diff --git a/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook.md b/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook.md index fc32aa2c30..29a0ae86f8 100644 --- a/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook.md +++ b/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook.md @@ -17,10 +17,10 @@ You can setup GCP cloud function webhook using `dlt` as follows: ```py import dlt - import json import time from google.cloud import bigquery - + from dlt.common import json + def your_webhook(request): # Extract relevant data from the request payload data = request.get_json() @@ -40,7 +40,7 @@ You can setup GCP cloud function webhook using `dlt` as follows: 7. Set the function name as "your_webhook" in the Entry point field. 8. In the requirements.txt file, specify the necessary packages: - ```py + ```text # Function dependencies, for example: # package>=version dlt