From 2c0340915054ac16722975c0b8c0217d138f2ee5 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Wed, 7 Feb 2024 00:24:42 +0100
Subject: [PATCH 1/3] adds current context for source and resource name

---
 dlt/common/source.py      |  5 ++++-
 dlt/extract/decorators.py | 31 +++++++++++++++++++++++++++----
 dlt/extract/exceptions.py | 10 +++++++++-
 dlt/extract/extract.py    |  6 ++++--
 dlt/pipeline/current.py   |  7 +++++--
 5 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/dlt/common/source.py b/dlt/common/source.py
index 249d54b4c5..ea2a25f1d7 100644
--- a/dlt/common/source.py
+++ b/dlt/common/source.py
@@ -34,7 +34,10 @@ def unset_current_pipe_name() -> None:


 def get_current_pipe_name() -> str:
-    """Gets pipe name associated with current thread"""
+    """When executed from within a dlt.resource decorated function, gets the pipe name associated with the current thread.
+
+    Pipe name is the same as resource name for all currently known cases. In some multithreading cases, the pipe name may not be available.
+    """
     name = _CURRENT_PIPE_NAME.get(threading.get_ident())
     if name is None:
         raise ResourceNameNotAvailable()
diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py
index cf7426e683..d86fd04ef4 100644
--- a/dlt/extract/decorators.py
+++ b/dlt/extract/decorators.py
@@ -37,6 +37,7 @@
     TSchemaContract,
     TTableFormat,
 )
+from dlt.extract.hints import make_hints
 from dlt.extract.utils import (
     ensure_table_schema_columns_hint,
     simulate_func_call,
@@ -48,6 +49,7 @@
 from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems
 from dlt.common.utils import get_callable_name, get_module_name, is_inner_callable
 from dlt.extract.exceptions import (
+    CurrentSourceNotAvailable,
     DynamicNameNotStandaloneResource,
     InvalidTransformerDataTypeGeneratorFunctionRequired,
     ResourceFunctionExpected,
@@ -56,7 +58,7 @@
     SourceIsAClassTypeError,
     ExplicitSourceNameInvalid,
     SourceNotAFunction,
-    SourceSchemaNotAvailable,
+    CurrentSourceSchemaNotAvailable,
 )

 from dlt.extract.incremental import IncrementalResourceWrapper
@@ -67,7 +69,7 @@

 @configspec
 class SourceSchemaInjectableContext(ContainerInjectableContext):
-    """A context containing the source schema, present when decorated function is executed"""
+    """A context containing the source schema, present when dlt.source/resource decorated function is executed"""

     schema: Schema

@@ -78,6 +80,19 @@ class SourceSchemaInjectableContext(ContainerInjectableContext):
     def __init__(self, schema: Schema = None) -> None: ...


+@configspec
+class SourceInjectableContext(ContainerInjectableContext):
+    """A context containing the current source, present when a dlt.resource decorated function is executed"""
+
+    source: DltSource
+
+    can_create_default: ClassVar[bool] = False
+
+    if TYPE_CHECKING:
+
+        def __init__(self, source: DltSource = None) -> None: ...
+ + TSourceFunParams = ParamSpec("TSourceFunParams") TResourceFunParams = ParamSpec("TResourceFunParams") TDltSourceImpl = TypeVar("TDltSourceImpl", bound=DltSource, default=DltSource) @@ -395,7 +410,7 @@ def resource( def make_resource( _name: str, _section: str, _data: Any, incremental: IncrementalResourceWrapper = None ) -> DltResource: - table_template = DltResource.new_table_template( + table_template = make_hints( table_name, write_disposition=write_disposition, columns=columns, @@ -694,7 +709,15 @@ def get_source_schema() -> Schema: try: return Container()[SourceSchemaInjectableContext].schema except ContextDefaultCannotBeCreated: - raise SourceSchemaNotAvailable() + raise CurrentSourceSchemaNotAvailable() + + +def get_source() -> DltSource: + """When executed from the function decorated with @dlt.resource, returns currently extracted source""" + try: + return Container()[SourceInjectableContext].source + except ContextDefaultCannotBeCreated: + raise CurrentSourceNotAvailable() TBoundItems = TypeVar("TBoundItems", bound=TDataItems) diff --git a/dlt/extract/exceptions.py b/dlt/extract/exceptions.py index 8e7d0dddf8..4d489e5a9b 100644 --- a/dlt/extract/exceptions.py +++ b/dlt/extract/exceptions.py @@ -377,7 +377,7 @@ def __init__(self, source_name: str, _typ: Type[Any]) -> None: ) -class SourceSchemaNotAvailable(DltSourceException): +class CurrentSourceSchemaNotAvailable(DltSourceException): def __init__(self) -> None: super().__init__( "Current source schema is available only when called from a function decorated with" @@ -385,6 +385,14 @@ def __init__(self) -> None: ) +class CurrentSourceNotAvailable(DltSourceException): + def __init__(self) -> None: + super().__init__( + "Current source is available only when called from a function decorated with" + " dlt.resource or dlt.transformer during the extract step" + ) + + class ExplicitSourceNameInvalid(DltSourceException): def __init__(self, source_name: str, schema_name: str) -> None: self.source_name = source_name diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 9ff3cf872c..c1ff5da80b 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -31,7 +31,7 @@ from dlt.common.storages.load_package import ParsedLoadJobFileName from dlt.common.utils import get_callable_name, get_full_class_name -from dlt.extract.decorators import SourceSchemaInjectableContext +from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints from dlt.extract.pipe import PipeIterator from dlt.extract.source import DltSource @@ -322,7 +322,9 @@ def extract( ) -> str: # generate load package to be able to commit all the sources together later load_id = self.extract_storage.create_load_package(source.discover_schema()) - with Container().injectable_context(SourceSchemaInjectableContext(source.schema)): + with Container().injectable_context( + SourceSchemaInjectableContext(source.schema) + ), Container().injectable_context(SourceInjectableContext(source)): # inject the config section with the current source name with inject_section( ConfigSectionContext( diff --git a/dlt/pipeline/current.py b/dlt/pipeline/current.py index f915a30932..7fdc0f095c 100644 --- a/dlt/pipeline/current.py +++ b/dlt/pipeline/current.py @@ -1,11 +1,14 @@ """Easy access to active pipelines, state, sources and schemas""" -from dlt.common.pipeline import source_state as _state, resource_state +from dlt.common.pipeline import source_state as _state, resource_state, 
get_current_pipe_name from dlt.pipeline import pipeline as _pipeline -from dlt.extract.decorators import get_source_schema +from dlt.extract.decorators import get_source_schema, get_source pipeline = _pipeline """Alias for dlt.pipeline""" state = source_state = _state """Alias for dlt.state""" source_schema = get_source_schema +source = get_source +pipe_name = get_current_pipe_name +resource_name = get_current_pipe_name From 60ebffa12b34255e0e56bbee5a691cf44dc3f0ae Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 7 Feb 2024 00:25:28 +0100 Subject: [PATCH 2/3] adds a mark method to emit and update resource hints from decorated function --- dlt/extract/__init__.py | 5 +- dlt/extract/extractors.py | 16 ++- dlt/extract/hints.py | 129 +++++++++++------- dlt/extract/resource.py | 13 +- dlt/extract/source.py | 4 +- dlt/pipeline/mark.py | 4 +- docs/examples/chess_production/chess.py | 12 +- docs/examples/connector_x_arrow/load_arrow.py | 2 + docs/examples/google_sheets/google_sheets.py | 5 +- docs/examples/incremental_loading/zendesk.py | 8 +- docs/examples/nested_data/nested_data.py | 2 + .../pdf_to_weaviate/pdf_to_weaviate.py | 5 +- docs/examples/qdrant_zendesk/qdrant.py | 9 +- docs/examples/transformers/pokemon.py | 4 +- .../common/storages/samples/gzip/taxi.csv.gz | Bin 899 -> 897 bytes tests/extract/test_decorators.py | 4 +- tests/extract/test_extract.py | 81 +++++++++-- tests/pipeline/test_pipeline.py | 24 ++++ tests/pipeline/test_pipeline_extra.py | 26 ++++ 19 files changed, 269 insertions(+), 84 deletions(-) diff --git a/dlt/extract/__init__.py b/dlt/extract/__init__.py index 9dcffdacb9..78e246cd46 100644 --- a/dlt/extract/__init__.py +++ b/dlt/extract/__init__.py @@ -1,4 +1,5 @@ -from dlt.extract.resource import DltResource, with_table_name +from dlt.extract.resource import DltResource, with_table_name, with_hints +from dlt.extract.hints import make_hints from dlt.extract.source import DltSource from dlt.extract.decorators import source, resource, transformer, defer from dlt.extract.incremental import Incremental @@ -8,6 +9,8 @@ "DltResource", "DltSource", "with_table_name", + "with_hints", + "make_hints", "source", "resource", "transformer", diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index bc32893677..f6c3fde5d4 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -18,7 +18,7 @@ TTableSchemaColumns, TPartialTableSchema, ) - +from dlt.extract.hints import HintsMeta from dlt.extract.resource import DltResource from dlt.extract.typing import TableNameMeta from dlt.extract.storage import ExtractStorage, ExtractorItemStorage @@ -85,6 +85,12 @@ def item_format(items: TDataItems) -> Optional[TLoaderFileFormat]: def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> None: """Write `items` to `resource` optionally computing table schemas and revalidating/filtering data""" + if isinstance(meta, HintsMeta): + # update the resource with new hints, remove all caches so schema is recomputed + # and contracts re-applied + resource.merge_hints(meta.hints) + self._reset_contracts_cache() + if table_name := self._get_static_table_name(resource, meta): # write item belonging to table with static name self._write_to_static_table(resource, table_name, items) @@ -152,7 +158,7 @@ def _compute_and_update_table( self, resource: DltResource, table_name: str, items: TDataItems ) -> TDataItems: """ - Computes new table and does contract checks, if false is returned, the table may not be created and not items should be written + Computes new 
table and does contract checks, if false is returned, the table may not be created and no items should be written """ computed_table = self._compute_table(resource, items) # overwrite table name (if coming from meta) @@ -190,6 +196,12 @@ def _compute_and_update_table( filtered_columns[name] = mode return items + def _reset_contracts_cache(self) -> None: + """Removes all cached contracts, filtered columns and tables""" + self._table_contracts.clear() + self._filtered_tables.clear() + self._filtered_columns.clear() + class JsonLExtractor(Extractor): file_format = "puae-jsonl" diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index c1a39041d8..61e1e2af34 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -5,7 +5,6 @@ from dlt.common.schema.typing import ( TColumnNames, TColumnProp, - TColumnSchema, TPartialTableSchema, TTableSchema, TTableSchemaColumns, @@ -23,7 +22,6 @@ from dlt.extract.exceptions import ( DataItemRequiredForDynamicTableHints, InconsistentTableTemplate, - TableNameMissing, ) from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint from dlt.extract.validation import create_item_validator @@ -40,10 +38,66 @@ class TResourceHints(TypedDict, total=False): merge_key: TTableHintTemplate[TColumnNames] incremental: Incremental[Any] schema_contract: TTableHintTemplate[TSchemaContract] + table_format: TTableHintTemplate[TTableFormat] validator: ValidateItem original_columns: TTableHintTemplate[TAnySchemaColumns] +class HintsMeta: + __slots__ = "hints" + + hints: TResourceHints + + def __init__(self, hints: TResourceHints) -> None: + self.hints = hints + + +def make_hints( + table_name: TTableHintTemplate[str] = None, + parent_table_name: TTableHintTemplate[str] = None, + write_disposition: TTableHintTemplate[TWriteDisposition] = None, + columns: TTableHintTemplate[TAnySchemaColumns] = None, + primary_key: TTableHintTemplate[TColumnNames] = None, + merge_key: TTableHintTemplate[TColumnNames] = None, + schema_contract: TTableHintTemplate[TSchemaContract] = None, + table_format: TTableHintTemplate[TTableFormat] = None, +) -> TResourceHints: + """A convenience function to create resource hints. Accepts both static and dynamic hints based on data. + + This method accepts the same table hints arguments as `dlt.resource` decorator. 
+ """ + validator, schema_contract = create_item_validator(columns, schema_contract) + clean_columns = columns + if columns is not None: + clean_columns = ensure_table_schema_columns_hint(columns) + if not callable(clean_columns): + clean_columns = clean_columns.values() # type: ignore + # create a table schema template where hints can be functions taking TDataItem + new_template: TResourceHints = new_table( + table_name, # type: ignore + parent_table_name, # type: ignore + write_disposition=write_disposition, # type: ignore + columns=clean_columns, # type: ignore + schema_contract=schema_contract, # type: ignore + table_format=table_format, # type: ignore + ) + if not table_name: + new_template.pop("name") + # remember original columns + if columns is not None: + new_template["original_columns"] = columns + # always remove resource + new_template.pop("resource", None) # type: ignore + if primary_key is not None: + new_template["primary_key"] = primary_key + if merge_key is not None: + new_template["merge_key"] = merge_key + if validator: + new_template["validator"] = validator + DltResourceHints.validate_dynamic_hints(new_template) + return new_template + + class DltResourceHints: def __init__(self, table_schema_template: TResourceHints = None): self.__qualname__ = self.__name__ = self.name @@ -126,6 +180,7 @@ def apply_hints( incremental: Incremental[Any] = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, additional_table_hints: Optional[Dict[str, TTableHintTemplate[Any]]] = None, + table_format: TTableHintTemplate[TTableFormat] = None, ) -> None: """Creates or modifies existing table schema by setting provided hints. Accepts both static and dynamic hints based on data. @@ -142,7 +197,7 @@ def apply_hints( t = None if not self._hints: # if there's no template yet, create and set new one - t = self.new_table_template( + t = make_hints( table_name, parent_table_name, write_disposition, @@ -150,6 +205,7 @@ def apply_hints( primary_key, merge_key, schema_contract, + table_format, ) else: # set single hints @@ -176,7 +232,12 @@ def apply_hints( # normalize columns columns = ensure_table_schema_columns(columns) # this updates all columns with defaults + print("PREV") + print(t["columns"]) t["columns"] = update_dict_nested(t["columns"], columns) + print("POST") + print(t["columns"]) + print("------") else: # set to empty columns t["columns"] = ensure_table_schema_columns(columns) @@ -202,8 +263,13 @@ def apply_hints( ) if schema_contract is not None: t["schema_contract"] = schema_contract + if table_format is not None: + if table_format: + t["table_format"] = table_format + else: + t.pop("table_format", None) - # set properties that cannot be passed to new_table_template + # set properties that cannot be passed to make_hints if incremental is not None: if incremental is Incremental.EMPTY: t["incremental"] = None @@ -233,6 +299,19 @@ def set_hints(self, hints_template: TResourceHints) -> None: ) self._hints = hints_template + def merge_hints(self, hints_template: TResourceHints) -> None: + self.apply_hints( + table_name=hints_template.get("name"), + parent_table_name=hints_template.get("parent"), + write_disposition=hints_template.get("write_disposition"), + columns=hints_template.get("original_columns"), + primary_key=hints_template.get("primary_key"), + merge_key=hints_template.get("merge_key"), + incremental=hints_template.get("incremental"), + schema_contract=hints_template.get("schema_contract"), + table_format=hints_template.get("table_format"), + ) + @staticmethod def 
_clone_hints(hints_template: TResourceHints) -> TResourceHints: t_ = copy(hints_template) @@ -273,48 +352,6 @@ def _merge_keys(t_: TResourceHints) -> TPartialTableSchema: return partial - @staticmethod - def new_table_template( - table_name: TTableHintTemplate[str], - parent_table_name: TTableHintTemplate[str] = None, - write_disposition: TTableHintTemplate[TWriteDisposition] = None, - columns: TTableHintTemplate[TAnySchemaColumns] = None, - primary_key: TTableHintTemplate[TColumnNames] = None, - merge_key: TTableHintTemplate[TColumnNames] = None, - schema_contract: TTableHintTemplate[TSchemaContract] = None, - table_format: TTableHintTemplate[TTableFormat] = None, - ) -> TResourceHints: - validator, schema_contract = create_item_validator(columns, schema_contract) - clean_columns = columns - if columns is not None: - clean_columns = ensure_table_schema_columns_hint(columns) - if not callable(clean_columns): - clean_columns = clean_columns.values() # type: ignore - # create a table schema template where hints can be functions taking TDataItem - new_template: TResourceHints = new_table( - table_name, # type: ignore - parent_table_name, # type: ignore - write_disposition=write_disposition, # type: ignore - columns=clean_columns, # type: ignore - schema_contract=schema_contract, # type: ignore - table_format=table_format, # type: ignore - ) - if not table_name: - new_template.pop("name") - # remember original columns - if columns is not None: - new_template["original_columns"] = columns - # always remove resource - new_template.pop("resource", None) # type: ignore - if primary_key: - new_template["primary_key"] = primary_key - if merge_key: - new_template["merge_key"] = merge_key - if validator: - new_template["validator"] = validator - DltResourceHints.validate_dynamic_hints(new_template) - return new_template - @staticmethod def validate_dynamic_hints(template: TResourceHints) -> None: table_name = template.get("name") diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index ac7339ec7c..3d03486436 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -1,6 +1,5 @@ from copy import deepcopy import inspect -import asyncio from typing import ( AsyncIterable, AsyncIterator, @@ -38,7 +37,7 @@ ValidateItem, ) from dlt.extract.pipe import Pipe, ManagedPipeIterator, TPipeStep -from dlt.extract.hints import DltResourceHints, TResourceHints +from dlt.extract.hints import DltResourceHints, HintsMeta, TResourceHints from dlt.extract.incremental import Incremental, IncrementalResourceWrapper from dlt.extract.exceptions import ( InvalidTransformerDataTypeGeneratorFunctionRequired, @@ -47,7 +46,6 @@ InvalidResourceDataType, InvalidResourceDataTypeIsNone, InvalidTransformerGeneratorFunction, - InvalidResourceDataTypeAsync, InvalidResourceDataTypeBasic, InvalidResourceDataTypeMultiplePipes, ParametrizedResourceUnbound, @@ -62,6 +60,15 @@ def with_table_name(item: TDataItems, table_name: str) -> DataItemWithMeta: return DataItemWithMeta(TableNameMeta(table_name), item) +def with_hints(item: TDataItems, hints: TResourceHints) -> DataItemWithMeta: + """Marks `item` to update the resource with specified `hints`. + + Create `TResourceHints` with `make_hints`. + Setting `table_name` will dispatch the `item` to a specified table, like `with_table_name` + """ + return DataItemWithMeta(HintsMeta(hints), item) + + class DltResource(Iterable[TDataItem], DltResourceHints): """Implements dlt resource. 
Contains a data pipe that wraps a generating item and table schema that can be adjusted""" diff --git a/dlt/extract/source.py b/dlt/extract/source.py index b1f59f7bda..bc33394d4d 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -25,7 +25,7 @@ from dlt.extract.typing import TDecompositionStrategy from dlt.extract.pipe import Pipe, ManagedPipeIterator -from dlt.extract.hints import DltResourceHints +from dlt.extract.hints import DltResourceHints, make_hints from dlt.extract.resource import DltResource from dlt.extract.exceptions import ( DataItemRequiredForDynamicTableHints, @@ -64,7 +64,7 @@ def extracted(self) -> Dict[str, DltResource]: resource = self[pipe.name] except KeyError: # resource for pipe not found: return mock resource - mock_template = DltResourceHints.new_table_template( + mock_template = make_hints( pipe.name, write_disposition=resource.write_disposition ) resource = DltResource(pipe, mock_template, False, section=resource.section) diff --git a/dlt/pipeline/mark.py b/dlt/pipeline/mark.py index 3b9b3ccfc7..0aba0e19ae 100644 --- a/dlt/pipeline/mark.py +++ b/dlt/pipeline/mark.py @@ -1,2 +1,2 @@ -"""Module with market functions that make data to be specially processed""" -from dlt.extract import with_table_name +"""Module with mark functions that make data to be specially processed""" +from dlt.extract import with_table_name, with_hints, make_hints diff --git a/docs/examples/chess_production/chess.py b/docs/examples/chess_production/chess.py index 2e85805781..f7c5849e57 100644 --- a/docs/examples/chess_production/chess.py +++ b/docs/examples/chess_production/chess.py @@ -6,6 +6,7 @@ from dlt.common.typing import StrAny, TDataItems from dlt.sources.helpers.requests import client + @dlt.source def chess( chess_url: str = dlt.config.value, @@ -59,6 +60,7 @@ def players_games(username: Any) -> Iterator[TDataItems]: MAX_PLAYERS = 5 + def load_data_with_retry(pipeline, data): try: for attempt in Retrying( @@ -68,9 +70,7 @@ def load_data_with_retry(pipeline, data): reraise=True, ): with attempt: - logger.info( - f"Running the pipeline, attempt={attempt.retry_state.attempt_number}" - ) + logger.info(f"Running the pipeline, attempt={attempt.retry_state.attempt_number}") load_info = pipeline.run(data) logger.info(str(load_info)) @@ -92,9 +92,7 @@ def load_data_with_retry(pipeline, data): # print the information on the first load package and all jobs inside logger.info(f"First load package info: {load_info.load_packages[0]}") # print the information on the first completed job in first load package - logger.info( - f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" - ) + logger.info(f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}") # check for schema updates: schema_updates = [p.schema_update for p in load_info.load_packages] @@ -152,4 +150,4 @@ def load_data_with_retry(pipeline, data): ) # get data for a few famous players data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS) - load_data_with_retry(pipeline, data) \ No newline at end of file + load_data_with_retry(pipeline, data) diff --git a/docs/examples/connector_x_arrow/load_arrow.py b/docs/examples/connector_x_arrow/load_arrow.py index 24ba2acb0e..307e657514 100644 --- a/docs/examples/connector_x_arrow/load_arrow.py +++ b/docs/examples/connector_x_arrow/load_arrow.py @@ -3,6 +3,7 @@ import dlt from dlt.sources.credentials import ConnectionStringCredentials + def read_sql_x( conn_str: ConnectionStringCredentials = 
dlt.secrets.value, query: str = dlt.config.value, @@ -14,6 +15,7 @@ def read_sql_x( protocol="binary", ) + def genome_resource(): # create genome resource with merge on `upid` primary key genome = dlt.resource( diff --git a/docs/examples/google_sheets/google_sheets.py b/docs/examples/google_sheets/google_sheets.py index 8a93df9970..1ba330e4ca 100644 --- a/docs/examples/google_sheets/google_sheets.py +++ b/docs/examples/google_sheets/google_sheets.py @@ -9,6 +9,7 @@ ) from dlt.common.typing import DictStrAny, StrAny + def _initialize_sheets( credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials] ) -> Any: @@ -16,6 +17,7 @@ def _initialize_sheets( service = build("sheets", "v4", credentials=credentials.to_native_credentials()) return service + @dlt.source def google_spreadsheet( spreadsheet_id: str, @@ -55,6 +57,7 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: for name in sheet_names ] + if __name__ == "__main__": pipeline = dlt.pipeline(destination="duckdb") # see example.secrets.toml to where to put credentials @@ -67,4 +70,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: sheet_names=range_names, ) ) - print(info) \ No newline at end of file + print(info) diff --git a/docs/examples/incremental_loading/zendesk.py b/docs/examples/incremental_loading/zendesk.py index 4b8597886a..6113f98793 100644 --- a/docs/examples/incremental_loading/zendesk.py +++ b/docs/examples/incremental_loading/zendesk.py @@ -6,12 +6,11 @@ from dlt.common.typing import TAnyDateTime from dlt.sources.helpers.requests import client + @dlt.source(max_table_nesting=2) def zendesk_support( credentials: Dict[str, str] = dlt.secrets.value, - start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008 - year=2000, month=1, day=1 - ), + start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008 end_date: Optional[TAnyDateTime] = None, ): """ @@ -113,6 +112,7 @@ def get_pages( if not response_json["end_of_stream"]: get_url = response_json["next_page"] + if __name__ == "__main__": # create dlt pipeline pipeline = dlt.pipeline( @@ -120,4 +120,4 @@ def get_pages( ) load_info = pipeline.run(zendesk_support()) - print(load_info) \ No newline at end of file + print(load_info) diff --git a/docs/examples/nested_data/nested_data.py b/docs/examples/nested_data/nested_data.py index 3464448de6..7f85f0522e 100644 --- a/docs/examples/nested_data/nested_data.py +++ b/docs/examples/nested_data/nested_data.py @@ -13,6 +13,7 @@ CHUNK_SIZE = 10000 + # You can limit how deep dlt goes when generating child tables. # By default, the library will descend and generate child tables # for all nested lists, without a limit. 
@@ -81,6 +82,7 @@ def load_documents(self) -> Iterator[TDataItem]: while docs_slice := list(islice(cursor, CHUNK_SIZE)): yield map_nested_in_place(convert_mongo_objs, docs_slice) + def convert_mongo_objs(value: Any) -> Any: if isinstance(value, (ObjectId, Decimal128)): return str(value) diff --git a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py index 8f7833e7d7..e7f57853ed 100644 --- a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py +++ b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py @@ -4,6 +4,7 @@ from dlt.destinations.impl.weaviate import weaviate_adapter from PyPDF2 import PdfReader + @dlt.resource(selected=False) def list_files(folder_path: str): folder_path = os.path.abspath(folder_path) @@ -15,6 +16,7 @@ def list_files(folder_path: str): "mtime": os.path.getmtime(file_path), } + @dlt.transformer(primary_key="page_id", write_disposition="merge") def pdf_to_text(file_item, separate_pages: bool = False): if not separate_pages: @@ -28,6 +30,7 @@ def pdf_to_text(file_item, separate_pages: bool = False): page_item["page_id"] = file_item["file_name"] + "_" + str(page_no) yield page_item + pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate") # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf" @@ -51,4 +54,4 @@ def pdf_to_text(file_item, separate_pages: bool = False): client = weaviate.Client("http://localhost:8080") # get text of all the invoices in InvoiceText class we just created above -print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) \ No newline at end of file +print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) diff --git a/docs/examples/qdrant_zendesk/qdrant.py b/docs/examples/qdrant_zendesk/qdrant.py index 300d8dc6ad..bd0cbafc99 100644 --- a/docs/examples/qdrant_zendesk/qdrant.py +++ b/docs/examples/qdrant_zendesk/qdrant.py @@ -10,13 +10,12 @@ from dlt.common.configuration.inject import with_config + # function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk @dlt.source(max_table_nesting=2) def zendesk_support( credentials: Dict[str, str] = dlt.secrets.value, - start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008 - year=2000, month=1, day=1 - ), + start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008 end_date: Optional[TAnyDateTime] = None, ): """ @@ -80,6 +79,7 @@ def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]: return None return ensure_pendulum_datetime(value) + # modify dates to return datetime objects instead def _fix_date(ticket): ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"]) @@ -87,6 +87,7 @@ def _fix_date(ticket): ticket["due_at"] = _parse_date_or_none(ticket["due_at"]) return ticket + # function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk def get_pages( url: str, @@ -127,6 +128,7 @@ def get_pages( if not response_json["end_of_stream"]: get_url = response_json["next_page"] + if __name__ == "__main__": # create a pipeline with an appropriate name pipeline = dlt.pipeline( @@ -146,7 +148,6 @@ def get_pages( print(load_info) - # running the Qdrant client to connect to your Qdrant database @with_config(sections=("destination", "qdrant", "credentials")) diff --git a/docs/examples/transformers/pokemon.py b/docs/examples/transformers/pokemon.py index c17beff6a8..97b9a98b11 100644 --- 
a/docs/examples/transformers/pokemon.py +++ b/docs/examples/transformers/pokemon.py @@ -1,6 +1,7 @@ import dlt from dlt.sources.helpers import requests + @dlt.source(max_table_nesting=2) def source(pokemon_api_url: str): """""" @@ -46,6 +47,7 @@ def species(pokemon_details): return (pokemon_list | pokemon, pokemon_list | pokemon | species) + if __name__ == "__main__": # build duck db pipeline pipeline = dlt.pipeline( @@ -54,4 +56,4 @@ def species(pokemon_details): # the pokemon_list resource does not need to be loaded load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon")) - print(load_info) \ No newline at end of file + print(load_info) diff --git a/tests/common/storages/samples/gzip/taxi.csv.gz b/tests/common/storages/samples/gzip/taxi.csv.gz index 184c0740d47bfefe12c14ed8728d63fa59a4620a..c74f9c86ea61722fb345def3b1a5cab6d141840f 100644 GIT binary patch literal 897 zcmV-{1AhD;iwFotce!N%19V|{X)a@Pb^w)CU2EJ%6n)RHSo}N^bH8WyEm=}raG=yp zOCCimVv$91WjP`LzIQZQH`FR5*xt2fuk3TqJ?H2OFvg^u39iBwMvx|26X6i!(Z7LS zoB|)&XB(q7=g3MZnn*GSj;*l*s8WJ@v`$%uWD&spP1dQzY9cuSvrv#gt#AN;4bguf z!~oK!aJsz??}oc&`Z_k-`{8zaTJFR9hlhSX{MA1$huh(jgUh>RI>#tRK)Q@_^)nZfY~Xjy`;9HM_&OX2kU+wgjr zAEtS^ysuz^{Vs`nVDaTCz%g!Ma}vuO)hJxpk`PqIjFNhz_mks`}f`mHl%#pSw?eSY_TWR`5u^al<|EyhBNjtXCT@bmXFv z!Ln6ci6|z9sP2VrNPQik4!H1HUFDd48I*GongY7m7o9IEe~&U;Kixe}` z`~Ljccg-*7ZW-ojY~GK9e|Y=%Q#WtcN#wTOv5xD$W1W0Nx@@DM1SK&>tXPQ|g@j|I zyb+QDU&;Ma{bkNBQ`ATii@zYCv&btWEQ=_YWe-2jBiv5cQ}f3>yy;I}a~i%39}lN~ zT>81W8J6YTAAViB^S5d~QiQ!sRGaEhz5*AloRs`op+%B#F~E6nE(kCccSK$cUC1wz-?6m4&9vxu6iw9 zRPc^d@^0P;3wric*pfiqr}+SQ$@%SID^t(uzqM?eFKhph>t5_+|!(req++_n_SdF9e;Th*ak(EHmW)jo{=|JYys z8fC3zga5C(AInX6(=C4X?eJSSe?FXsqE%)Qu}wLa#I3O%}gg7xEGH zQY0?=mJqCsZ6!vErb$HV5O{kFDVb(}Zm!XCzxoJieYpC=v_+3t-q`w;lf}yPW*9$C X=VvFreq5$`=wAH;$=jC20|)>BS)jrU literal 899 zcmV-}1AP1+iwFo2P_tzK19V|{X)a@Pb^w)CU2EG&6n*cnFnJz{x!*H>YfYCFQdrj8 zUHT{*(TEx;m6ewK`@JK{Nolmu;tyHlk$uj&=Nw%D#+Z~d!Bx1z2vS9>BJ5(^`xEHJ zD)64m-pZ)XIkFOpCX&p7V{NPes+3?^ns-@-WD&spAoH%oY9cuSvrv#gwQvA`57A!` zVgRXAI2^CT`|frczW3F7+#QF9=`MV@zi-FgzwLS29lP81)m7mU!I`8@9{O?>_hKs# zbFhgWuc3+-g#J5fmQXU9(ZVA|WXzB=UU(>*`n4v=3{D_G%@TAb5&dB`g~MNO!|QIm zAI9l&oY4aNRT8(*;>%TleY~K}Ni1_zql5`&JsqU$X=D`H94yx{azd~Io>tMxIGJV4 zTHw3PwR24u&Fid1cBoBf-QPB@?LXb=)O>EkEcALZqet?Y8*ZWJ4NkIWozWLy=+H$a zgJoNQC8C%ZqPiVc^y&ivb;O0xY%B-uW@=1b>q-iANtO}Jbry>#??EC+_xLvaosn(llMqZ(C&iSrrG;DB zq|sa;F6&WC!fU_yDJZgz@Nnw-Y3R2@zYI#r4Uo5hZ>M`0|Hh<#)13eb6gSf5q&KD|d%(-I1RT7(4_x@A zdQriPPRSd3r7Y<2Q(;R2%Sg>PfH$3A4mMNus6Lmot=?Q(cy5Z)!quS}`}T9!?yfzB z;o~rV`Q{(igt;Bu4O<*=BX{}7#z^P|n@YK@gLCgvBqx?jxvhPNZbNTzi?ozt^gm>O z_j8oFoDKfJ?|v;j;Y~C7+T-reX8f`{bkk#XGkkXu-Z%Z7f7SFeSwgQg-HaE%92oK* z^{Gx=_BAHB^tP2ODVin`StbVF;X+EL+3%lozTC6Eg><>e`c1V)&skpS`kAxEh3ZY$ Ze;Q6tW_*2~hOuj2{RgnA?CcB(006}u&7}YU diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index d1ff98fc26..3c15bf37f5 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -35,7 +35,7 @@ SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, - SourceSchemaNotAvailable, + CurrentSourceSchemaNotAvailable, ) from dlt.extract.typing import TableNameMeta @@ -500,7 +500,7 @@ def test_source_schema_context() -> None: global_schema = Schema("global") # not called from the source - with pytest.raises(SourceSchemaNotAvailable): + with pytest.raises(CurrentSourceSchemaNotAvailable): dlt.current.source_schema() def _assert_source_schema(s: DltSource, expected_name: str) -> None: diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py index 
ccf47c24bd..28b08c3648 100644 --- a/tests/extract/test_extract.py +++ b/tests/extract/test_extract.py @@ -11,7 +11,9 @@ from dlt.common.storages.schema_storage import SchemaStorage from dlt.extract import DltResource, DltSource +from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints from dlt.extract.extract import ExtractStorage, Extract +from dlt.extract.hints import make_hints from tests.utils import clean_test_storage, TEST_STORAGE_ROOT from tests.extract.utils import expect_extracted_file @@ -71,8 +73,7 @@ def table_with_name_selectable(_range): yield dlt.mark.with_table_name(i, n_f(i)) schema = expect_tables(extract_step, table_with_name_selectable) - # TODO: this one should not be there but we cannot remove it really, except explicit flag - assert "table_with_name_selectable" in schema.tables + assert "table_with_name_selectable" not in schema.tables def test_extract_select_tables_lambda(extract_step: Extract) -> None: @@ -88,6 +89,73 @@ def table_name_with_lambda(_range): assert "table_name_with_lambda" not in schema.tables +def test_extract_hints_mark(extract_step: Extract) -> None: + @dlt.resource + def with_table_hints(): + yield dlt.mark.with_hints( + {"id": 1, "pk": "A"}, + make_hints(columns=[{"name": "id", "data_type": "bigint"}], primary_key="pk"), + ) + schema = dlt.current.source_schema() + # table and columns got updated in the schema + assert "with_table_hints" in schema.tables + table = schema.tables["with_table_hints"] + assert "pk" in table["columns"] + assert "id" in table["columns"] + assert table["columns"]["pk"]["primary_key"] is True + assert table["columns"]["id"]["data_type"] == "bigint" + # get the resource + resource = dlt.current.source().resources[dlt.current.resource_name()] + table = resource.compute_table_schema() + # also there we see the hints + assert table["columns"]["pk"]["primary_key"] is True + assert table["columns"]["id"]["data_type"] == "bigint" + + # add more columns and primary key + yield dlt.mark.with_hints( + {"id": 1, "pk2": "B"}, + make_hints( + write_disposition="merge", + columns=[{"name": "id", "precision": 16}, {"name": "text", "data_type": "decimal"}], + primary_key="pk2", + ), + ) + # previous columns kept + table = resource.compute_table_schema() + assert schema is dlt.current.source().schema + # previous primary key is gone from the resource + assert "pk" not in table["columns"] + assert table["columns"]["id"]["data_type"] == "bigint" + assert table["columns"]["id"]["precision"] == 16 + assert "text" in table["columns"] + assert table["write_disposition"] == "merge" + # still it is kept in the schema that is merged from resource each time it changes + table = schema.tables["with_table_hints"] + assert "pk" in table["columns"] + assert "text" in table["columns"] + assert table["write_disposition"] == "merge" + + # make table name dynamic + yield dlt.mark.with_hints( + {"namer": "dynamic"}, make_hints(table_name=lambda item: f"{item['namer']}_table") + ) + # dynamic table was created in the schema and it contains the newest resource table schema + table = schema.tables["dynamic_table"] + # so pk is not available + assert "pk" not in table["columns"] + assert "pk2" in table["columns"] + assert "id" in table["columns"] + assert "text" in table["columns"] + # get dynamic schema from resource + with pytest.raises(DataItemRequiredForDynamicTableHints): + table = resource.compute_table_schema() + + source = DltSource(dlt.Schema("hintable"), "module", [with_table_hints]) + extract_step.extract(source, 20, 1) + table = 
source.schema.tables["dynamic_table"]
+    assert "pk" not in table["columns"]
+
+
 # def test_extract_pipe_from_unknown_resource():
 #     pass
@@ -100,8 +168,7 @@ def input_gen():
     source = DltSource(
         dlt.Schema("selectables"), "module", [input_r, input_r.with_name("gen_clone")]
     )
-    load_id = extract_step.extract_storage.create_load_package(source.discover_schema())
-    extract_step._extract_single_source(load_id, source)
+    extract_step.extract(source, 20, 1)
     # both tables got generated
     assert "input_gen" in source.schema._schema_tables
     assert "gen_clone" in source.schema._schema_tables
@@ -120,8 +187,7 @@ def tx_step(item):
     source = DltSource(
         dlt.Schema("selectables"), "module", [input_r, (input_r | input_tx).with_name("tx_clone")]
     )
-    load_id = extract_step.extract_storage.create_load_package(source.discover_schema())
-    extract_step._extract_single_source(load_id, source)
+    extract_step.extract(source, 20, 1)
     assert "input_gen" in source.schema._schema_tables
     assert "tx_clone" in source.schema._schema_tables
     # mind that pipe name of the evaluated parent will have different name than the resource
@@ -130,8 +196,7 @@ def expect_tables(extract_step: Extract, resource: DltResource) -> dlt.Schema:
     source = DltSource(dlt.Schema("selectables"), "module", [resource(10)])
-    schema = source.discover_schema()
     load_id = extract_step.extract_storage.create_load_package(source.discover_schema())
     extract_step._extract_single_source(load_id, source)
     # odd and even tables must be in the source schema
@@ -149,6 +213,7 @@ def expect_tables(extract_step: Extract, resource: DltResource) -> dlt.Schema:
     expect_extracted_file(
         extract_step.extract_storage, "selectables", "even_table", json.dumps([0, 2, 4, 6, 8])
     )
+    schema = source.schema

     # same thing but select only odd
     source = DltSource(dlt.Schema("selectables"), "module", [resource])
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 7e99027e08..cdc4a02509 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -414,6 +414,30 @@ def i_fail():
     assert set(p._schema_storage.list_schemas()) == {"default", "default_2"}


+def test_mark_hints() -> None:
+    # this resource emits table schema with first item
+    @dlt.resource
+    def with_mark():
+        yield dlt.mark.with_hints(
+            {"id": 1},
+            dlt.mark.make_hints(
+                table_name="spec_table", write_disposition="merge", primary_key="id"
+            ),
+        )
+        yield {"id": 2}
+
+    p = dlt.pipeline(destination="dummy", pipeline_name="mark_pipeline")
+    p.extract(with_mark())
+    storage = ExtractStorage(p._normalize_storage_config())
+    expect_extracted_file(storage, "mark", "spec_table", json.dumps([{"id": 1}, {"id": 2}]))
+    p.normalize()
+    # no "with_mark" table in the schema: we update resource hints before any table schema is computed
+    assert "with_mark" not in p.default_schema.tables
+    assert "spec_table" in p.default_schema.tables
+    # resource name is kept
+    assert p.default_schema.tables["spec_table"]["resource"] == "with_mark"
+
+
 def test_restore_state_on_dummy() -> None:
     os.environ["COMPLETED_PROB"] = "1.0"  # make it complete immediately
diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py
index dd60002e6c..856e716134 100644
--- a/tests/pipeline/test_pipeline_extra.py
+++ b/tests/pipeline/test_pipeline_extra.py
@@ -192,6 +192,32 @@ def users() -> Iterator[User]:
     )


+def test_mark_hints_pydantic_columns() -> None:
+    pipeline = dlt.pipeline(destination="duckdb")
+
+    class User(BaseModel):
+        user_id: int
+        name: str
+
+    # this resource emits table schema with first item
+    @dlt.resource
+    def with_mark():
+        yield dlt.mark.with_hints(
+            {"user_id": 1, "name": "zenek"},
+            dlt.mark.make_hints(columns=User, primary_key="user_id"),
+        )
+
+    pipeline.run(with_mark)
+    # pydantic schema used to create columns
+    assert "with_mark" in pipeline.default_schema.tables
+    # resource name is kept
+    table = pipeline.default_schema.tables["with_mark"]
+    assert table["resource"] == "with_mark"
+    assert table["columns"]["user_id"]["data_type"] == "bigint"
+    assert table["columns"]["user_id"]["primary_key"] is True
+    assert table["columns"]["name"]["data_type"] == "text"
+
+
 @pytest.mark.parametrize("file_format", ("parquet", "insert_values", "jsonl"))
 def test_columns_hint_with_file_formats(file_format: TLoaderFileFormat) -> None:
     @dlt.resource(write_disposition="replace", columns=[{"name": "text", "data_type": "text"}])

From dd1af51cece14978bf816adf6f97c4dfee716435 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Wed, 7 Feb 2024 12:40:11 +0100
Subject: [PATCH 3/3] adds with_hints docs

---
 dlt/extract/hints.py                        |  5 ---
 docs/website/docs/general-usage/resource.md | 42 ++++++++++++++++++++-
 2 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py
index 61e1e2af34..c9f6327d3c 100644
--- a/dlt/extract/hints.py
+++ b/dlt/extract/hints.py
@@ -232,12 +232,7 @@ def apply_hints(
                 # normalize columns
                 columns = ensure_table_schema_columns(columns)
                 # this updates all columns with defaults
-                print("PREV")
-                print(t["columns"])
                 t["columns"] = update_dict_nested(t["columns"], columns)
-                print("POST")
-                print(t["columns"])
-                print("------")
             else:
                 # set to empty columns
                 t["columns"] = ensure_table_schema_columns(columns)
diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md
index b29aacea3b..3b08a0b8ab 100644
--- a/docs/website/docs/general-usage/resource.md
+++ b/docs/website/docs/general-usage/resource.md
@@ -136,7 +136,6 @@ behaviour of creating child tables for these fields.
 We do not support `RootModel` that validate simple types. You can add such validator yourself, see [data filtering section](#filter-transform-and-pivot-data).

-
 ### Dispatch data to many tables

 You can load data to many tables from a single resource. The most common case is a stream of events
@@ -227,7 +226,7 @@ pipeline.run(users(limit=100) | user_details)
 ### Declare a standalone resource
 A standalone resource is defined on a function that is top level in a module (not inner function) that accepts config and secrets values. Additionally if `standalone` flag is specified, the decorated function signature and docstring will be preserved. `dlt.resource` will just wrap the
-function decorated function and user must call the wrapper to get the actual resource. Below we declare a `filesystem` resource that must be called before use.
+decorated function and user must call the wrapper to get the actual resource. Below we declare a `filesystem` resource that must be called before use.
 ```python
 @dlt.resource(standalone=True)
 def filesystem(bucket_url=dlt.config.value):
@@ -339,6 +338,45 @@ tables = sql_database()
 tables.users.table_name = "other_users"
 ```

+### Adjust schema when you yield data
+
+You can set or update the table name, columns and other schema elements when your resource is executed and is already yielding data. Such changes will be merged
+with the existing schema in the same way the `apply_hints` method above works. There are many reasons to adjust the schema at runtime. For example, when using Airflow, you
+should avoid lengthy operations (ie. reflecting database tables) during creation of the DAG, so it is better to do it when the DAG executes. You may also emit partial
+hints (ie. precision and scale for decimal types) for a column to help `dlt` type inference.
+
+```python
+@dlt.resource
+def sql_table(credentials, schema, table):
+    # create sql alchemy engine
+    engine = engine_from_credentials(credentials)
+    engine.execution_options(stream_results=True)
+    metadata = MetaData(schema=schema)
+    # reflect the table schema
+    table_obj = Table(table, metadata, autoload_with=engine)
+
+    for idx, batch in enumerate(table_rows(engine, table_obj)):
+        if idx == 0:
+            # emit first row with hints, table_to_columns and get_primary_key are helpers that extract dlt schema from
+            # SqlAlchemy model
+            yield dlt.mark.with_hints(
+                batch,
+                dlt.mark.make_hints(columns=table_to_columns(table_obj), primary_key=get_primary_key(table_obj)),
+            )
+        else:
+            # just yield all the other rows
+            yield batch
+
+```
+
+In the example above we use `dlt.mark.with_hints` and `dlt.mark.make_hints` to emit columns and primary key with the first extracted item. The table schema will
+be adjusted after the `batch` is processed in the extract pipeline but before any schema contracts are applied and data is persisted in the load package.
+
+:::tip
+You can emit columns as a Pydantic model and use dynamic hints (ie. lambda for table name) as well. You should avoid redefining `Incremental` this way.
+:::
+
+
 ### Duplicate and rename resources
 There are cases when your resources are generic (ie. bucket filesystem) and you want to load several instances of them (ie. files from different folders) to separate tables. In the example below we use the `filesystem` source to load csvs from two different folders into separate tables:
 ```python