diff --git a/dlt/cli/deploy_command_helpers.py b/dlt/cli/deploy_command_helpers.py index 5065ba1cfc..5fe46415dd 100644 --- a/dlt/cli/deploy_command_helpers.py +++ b/dlt/cli/deploy_command_helpers.py @@ -263,22 +263,25 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio if n.PIPELINE in visitor.known_calls: for call_args in visitor.known_calls[n.PIPELINE]: pipeline_name, pipelines_dir = None, None - f_r_node = call_args.arguments.get("full_refresh") + # Check both full_refresh/dev_mode until full_refresh option is removed from dlt + f_r_node = call_args.arguments.get("full_refresh") or call_args.arguments.get( + "dev_mode" + ) if f_r_node: f_r_value = evaluate_node_literal(f_r_node) if f_r_value is None: fmt.warning( - "The value of `full_refresh` in call to `dlt.pipeline` cannot be" + "The value of `dev_mode` in call to `dlt.pipeline` cannot be" f" determined from {unparse(f_r_node).strip()}. We assume that you know" " what you are doing :)" ) if f_r_value is True: if fmt.confirm( - "The value of 'full_refresh' is set to True. Do you want to abort to set it" - " to False?", + "The value of 'dev_mode' or 'full_refresh' is set to True. Do you want to" + " abort to set it to False?", default=True, ): - raise CliCommandException("deploy", "Please set the full_refresh to False") + raise CliCommandException("deploy", "Please set the dev_mode to False") p_d_node = call_args.arguments.get("pipelines_dir") if p_d_node: diff --git a/dlt/common/configuration/resolve.py b/dlt/common/configuration/resolve.py index 9101cfdd9c..c9644713b5 100644 --- a/dlt/common/configuration/resolve.py +++ b/dlt/common/configuration/resolve.py @@ -286,7 +286,7 @@ def _resolve_config_field( embedded_sections: Tuple[str, ...], accept_partial: bool, ) -> Tuple[Any, List[LookupTrace]]: - inner_hint = extract_inner_hint(hint) + inner_hint = extract_inner_hint(hint, preserve_literal=True) if explicit_value is not None: value = explicit_value diff --git a/dlt/common/configuration/specs/base_configuration.py b/dlt/common/configuration/specs/base_configuration.py index 006cde8dce..0456a5374a 100644 --- a/dlt/common/configuration/specs/base_configuration.py +++ b/dlt/common/configuration/specs/base_configuration.py @@ -19,6 +19,7 @@ overload, ClassVar, TypeVar, + Literal, ) from typing_extensions import get_args, get_origin, dataclass_transform, Annotated, TypeAlias from functools import wraps @@ -120,13 +121,18 @@ def is_valid_hint(hint: Type[Any]) -> bool: return False -def extract_inner_hint(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]: +def extract_inner_hint( + hint: Type[Any], preserve_new_types: bool = False, preserve_literal: bool = False +) -> Type[Any]: # extract hint from Optional / Literal / NewType hints - inner_hint = extract_inner_type(hint, preserve_new_types) + inner_hint = extract_inner_type(hint, preserve_new_types, preserve_literal) # get base configuration from union type inner_hint = get_config_if_union_hint(inner_hint) or inner_hint # extract origin from generic types (ie List[str] -> List) - return get_origin(inner_hint) or inner_hint + origin = get_origin(inner_hint) or inner_hint + if preserve_literal and origin is Literal: + return inner_hint + return origin or inner_hint def is_secret_hint(hint: Type[Any]) -> bool: diff --git a/dlt/common/configuration/utils.py b/dlt/common/configuration/utils.py index 51e6b5615a..8f3c1789ce 100644 --- a/dlt/common/configuration/utils.py +++ b/dlt/common/configuration/utils.py @@ -2,7 +2,20 @@ import ast import 
contextlib import tomlkit -from typing import Any, Dict, Mapping, NamedTuple, Optional, Tuple, Type, Sequence +from typing import ( + Any, + Dict, + Mapping, + NamedTuple, + Optional, + Tuple, + Type, + Sequence, + get_args, + Literal, + get_origin, + List, +) from collections.abc import Mapping as C_Mapping from dlt.common.json import json @@ -51,25 +64,35 @@ def deserialize_value(key: str, value: Any, hint: Type[TAny]) -> TAny: raise return c # type: ignore + literal_values: Tuple[Any, ...] = () + if get_origin(hint) is Literal: + # Literal fields are validated against the literal values + literal_values = get_args(hint) + hint_origin = type(literal_values[0]) + else: + hint_origin = hint + # coerce value - hint_dt = py_type_to_sc_type(hint) + hint_dt = py_type_to_sc_type(hint_origin) value_dt = py_type_to_sc_type(type(value)) # eval only if value is string and hint is "complex" if value_dt == "text" and hint_dt == "complex": - if hint is tuple: + if hint_origin is tuple: # use literal eval for tuples value = ast.literal_eval(value) else: # use json for sequences and mappings value = json.loads(value) # exact types must match - if not isinstance(value, hint): + if not isinstance(value, hint_origin): raise ValueError(value) else: # for types that are not complex, reuse schema coercion rules if value_dt != hint_dt: value = coerce_value(hint_dt, value_dt, value) + if literal_values and value not in literal_values: + raise ConfigValueCannotBeCoercedException(key, value, hint) return value # type: ignore except ConfigValueCannotBeCoercedException: raise diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 8baf872752..6cefdd9e6c 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -21,6 +21,7 @@ TypeVar, TypedDict, Mapping, + Literal, ) from typing_extensions import NotRequired @@ -52,6 +53,10 @@ from dlt.common.versioned_state import TVersionedState +# TRefreshMode = Literal["full", "replace"] +TRefreshMode = Literal["drop_sources", "drop_resources", "drop_data"] + + class _StepInfo(NamedTuple): pipeline: "SupportsPipeline" loads_ids: List[str] @@ -762,6 +767,14 @@ def reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] state_["resources"].pop(resource_name) +def _get_matching_sources( + pattern: REPattern, pipeline_state: Optional[TPipelineState] = None, / +) -> List[str]: + """Get all source names in state matching the regex pattern""" + state_ = _sources_state(pipeline_state) + return [key for key in state_ if pattern.match(key)] + + def _get_matching_resources( pattern: REPattern, source_state_: Optional[DictStrAny] = None, / ) -> List[str]: diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index fb7ad226f1..6d5dc48907 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -438,6 +438,17 @@ def update_schema(self, schema: "Schema") -> None: self._settings = deepcopy(schema.settings) self._compile_settings() + def drop_tables( + self, table_names: Sequence[str], seen_data_only: bool = False + ) -> List[TTableSchema]: + """Drops tables from the schema and returns the dropped tables""" + result = [] + for table_name in table_names: + table = self.tables.get(table_name) + if table and (not seen_data_only or utils.has_table_seen_data(table)): + result.append(self._schema_tables.pop(table_name)) + return result + def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny: rv_row: DictStrAny = {} column_prop: TColumnProp = utils.hint_to_column_prop(hint_type) diff 
--git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py index 1752039775..e7c7f7a164 100644 --- a/dlt/common/storages/load_package.py +++ b/dlt/common/storages/load_package.py @@ -35,7 +35,7 @@ from dlt.common.destination import TLoaderFileFormat from dlt.common.exceptions import TerminalValueError from dlt.common.schema import Schema, TSchemaTables -from dlt.common.schema.typing import TStoredSchema, TTableSchemaColumns +from dlt.common.schema.typing import TStoredSchema, TTableSchemaColumns, TTableSchema from dlt.common.storages import FileStorage from dlt.common.storages.exceptions import LoadPackageNotFound, CurrentLoadPackageStateNotAvailable from dlt.common.typing import DictStrAny, SupportsHumanize @@ -76,6 +76,11 @@ class TLoadPackageState(TVersionedState, total=False): destination_state: NotRequired[Dict[str, Any]] """private space for destinations to store state relevant only to the load package""" + dropped_tables: NotRequired[List[TTableSchema]] + """List of tables that are to be dropped from the schema and destination (i.e. when `refresh` mode is used)""" + truncated_tables: NotRequired[List[TTableSchema]] + """List of tables that are to be truncated in the destination (i.e. when `refresh='drop_data'` mode is used)""" + class TLoadPackage(TypedDict, total=False): load_id: str diff --git a/dlt/common/typing.py b/dlt/common/typing.py index 7490dc6e53..2d46f367d8 100644 --- a/dlt/common/typing.py +++ b/dlt/common/typing.py @@ -245,7 +245,9 @@ def is_dict_generic_type(t: Type[Any]) -> bool: return False -def extract_inner_type(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]: +def extract_inner_type( + hint: Type[Any], preserve_new_types: bool = False, preserve_literal: bool = False +) -> Type[Any]: """Gets the inner type from Literal, Optional, Final and NewType Args: @@ -256,15 +258,15 @@ def extract_inner_type(hint: Type[Any], preserve_new_types: bool = False) -> Typ Type[Any]: Inner type if hint was Literal, Optional or NewType, otherwise hint """ if maybe_modified := extract_type_if_modifier(hint): - return extract_inner_type(maybe_modified, preserve_new_types) + return extract_inner_type(maybe_modified, preserve_new_types, preserve_literal) if is_optional_type(hint): - return extract_inner_type(get_args(hint)[0], preserve_new_types) - if is_literal_type(hint): + return extract_inner_type(get_args(hint)[0], preserve_new_types, preserve_literal) + if is_literal_type(hint) and not preserve_literal: # assume that all literals are of the same type return type(get_args(hint)[0]) if is_newtype_type(hint) and not preserve_new_types: # descend into supertypes of NewType - return extract_inner_type(hint.__supertype__, preserve_new_types) + return extract_inner_type(hint.__supertype__, preserve_new_types, preserve_literal) return hint diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 5070ff061c..d75226be13 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -171,6 +171,15 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: self.fs_client.makedirs(self.dataset_path, exist_ok=True) self.fs_client.touch(self.pathlib.join(self.dataset_path, INIT_FILE_NAME)) + def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: + self.truncate_tables(list(tables)) + if not delete_schema: + return + # Delete all stored schemas + for filename, fileparts in self._iter_stored_schema_files(): + if 
fileparts[0] == self.schema.name: + self._delete_file(filename) + def truncate_tables(self, table_names: List[str]) -> None: """Truncate table with given name""" table_dirs = set(self.get_table_dirs(table_names)) @@ -180,19 +189,23 @@ def truncate_tables(self, table_names: List[str]) -> None: # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors # print(f"DEL {table_file}") try: - # NOTE: must use rm_file to get errors on delete - self.fs_client.rm_file(table_file) - except NotImplementedError: - # not all filesystem implement the above - self.fs_client.rm(table_file) - if self.fs_client.exists(table_file): - raise FileExistsError(table_file) + self._delete_file(table_file) except FileNotFoundError: logger.info( f"Directory or path to truncate tables {table_names} does not exist but" " it should have been created previously!" ) + def _delete_file(self, file_path: str) -> None: + try: + # NOTE: must use rm_file to get errors on delete + self.fs_client.rm_file(file_path) + except NotImplementedError: + # not all filesystems implement the above + self.fs_client.rm(file_path) + if self.fs_client.exists(file_path): + raise FileExistsError(file_path) + def update_stored_schema( self, only_tables: Iterable[str] = None, @@ -401,6 +414,11 @@ def _get_schema_file_name(self, version_hash: str, load_id: str) -> str: f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl", ) + def _iter_stored_schema_files(self) -> Iterator[Tuple[str, List[str]]]: + """Iterator over all stored schema files""" + for filepath, fileparts in self._list_dlt_table_files(self.schema.version_table_name): + yield filepath, fileparts + def _get_stored_schema_by_hash_or_newest( self, version_hash: str = None ) -> Optional[StorageSchemaInfo]: @@ -409,7 +427,7 @@ def _get_stored_schema_by_hash_or_newest( # find newest schema for pipeline or by version hash selected_path = None newest_load_id = "0" - for filepath, fileparts in self._list_dlt_table_files(self.schema.version_table_name): + for filepath, fileparts in self._iter_stored_schema_files(): if ( not version_hash and fileparts[0] == self.schema.name diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 5838ab2ab7..853972fcba 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -202,11 +202,18 @@ def update_stored_schema( ) return applied_update - def drop_tables(self, *tables: str, replace_schema: bool = True) -> None: + def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: + """Drop tables in destination database and optionally delete the stored schema as well. + Clients that support ddl transactions will have both operations performed in a single transaction. + + Args: + tables: Names of tables to drop. 
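An illustrative sketch of the `Literal` handling added to `extract_inner_hint` / `extract_inner_type` earlier in this diff (the import path is taken from the patched module; the `TRefreshMode` alias mirrors the one defined in `dlt.common.pipeline`):

from typing import Literal, Optional

from dlt.common.configuration.specs.base_configuration import extract_inner_hint

TRefreshMode = Literal["drop_sources", "drop_resources", "drop_data"]

# default behaviour is unchanged: the literal collapses to the type of its values
assert extract_inner_hint(Optional[TRefreshMode]) is str

# with preserve_literal=True the Literal itself survives, so config resolution
# (and deserialize_value above) can reject values outside the allowed choices
assert extract_inner_hint(Optional[TRefreshMode], preserve_literal=True) == TRefreshMode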
+ delete_schema: If True, also delete all versions of the current schema from storage + """ with self.maybe_ddl_transaction(): self.sql_client.drop_tables(*tables) - if replace_schema: - self._replace_schema_in_storage(self.schema) + if delete_schema: + self._delete_schema_in_storage(self.schema) @contextlib.contextmanager def maybe_ddl_transaction(self) -> Iterator[None]: @@ -520,13 +527,12 @@ def _row_to_schema_info(self, query: str, *args: Any) -> StorageSchemaInfo: return StorageSchemaInfo(row[0], row[1], row[2], row[3], inserted_at, schema_str) - def _replace_schema_in_storage(self, schema: Schema) -> None: + def _delete_schema_in_storage(self, schema: Schema) -> None: """ - Save the given schema in storage and remove all previous versions with the same name + Delete all stored versions with the same name as given schema """ name = self.sql_client.make_qualified_table_name(self.schema.version_table_name) self.sql_client.execute_sql(f"DELETE FROM {name} WHERE schema_name = %s;", schema.name) - self._update_schema_in_storage(schema) def _update_schema_in_storage(self, schema: Schema) -> None: # get schema string or zip diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index d4298f2f6b..009cd8cc53 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -2,7 +2,7 @@ from collections.abc import Sequence as C_Sequence from copy import copy import itertools -from typing import Iterator, List, Dict, Any +from typing import Iterator, List, Dict, Any, Optional import yaml from dlt.common.configuration.container import Container @@ -32,9 +32,8 @@ ParsedLoadJobFileName, LoadPackageStateInjectableContext, TPipelineStateDoc, + commit_load_package_state, ) - - from dlt.common.utils import get_callable_name, get_full_class_name from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext @@ -46,6 +45,7 @@ from dlt.extract.storage import ExtractStorage from dlt.extract.extractors import ObjectExtractor, ArrowExtractor, Extractor from dlt.extract.utils import get_data_item_format +from dlt.pipeline.drop import drop_resources def data_to_sources( @@ -369,6 +369,7 @@ def extract( source: DltSource, max_parallel_items: int, workers: int, + load_package_state_update: Optional[Dict[str, Any]] = None, ) -> str: # generate load package to be able to commit all the sources together later load_id = self.extract_storage.create_load_package(source.discover_schema()) @@ -378,9 +379,9 @@ def extract( SourceInjectableContext(source) ), Container().injectable_context( LoadPackageStateInjectableContext( - storage=self.extract_storage.new_packages, load_id=load_id + load_id=load_id, storage=self.extract_storage.new_packages ) - ): + ) as load_package: # inject the config section with the current source name with inject_section( ConfigSectionContext( @@ -388,6 +389,9 @@ def extract( source_state_key=source.name, ) ): + if load_package_state_update: + load_package.state.update(load_package_state_update) # type: ignore[typeddict-item] + # reset resource states, the `extracted` list contains all the explicit resources and all their parents for resource in source.resources.extracted.values(): with contextlib.suppress(DataItemRequiredForDynamicTableHints): @@ -400,6 +404,7 @@ def extract( max_parallel_items=max_parallel_items, workers=workers, ) + commit_load_package_state() return load_id def commit_packages(self, pipline_state_doc: TPipelineStateDoc = None) -> None: diff --git a/dlt/helpers/airflow_helper.py b/dlt/helpers/airflow_helper.py index e68c330765..7d7302aab6 
100644 --- a/dlt/helpers/airflow_helper.py +++ b/dlt/helpers/airflow_helper.py @@ -432,8 +432,8 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator elif decompose == "serialize": if not isinstance(data, DltSource): raise ValueError("Can only decompose dlt sources") - if pipeline.full_refresh: - raise ValueError("Cannot decompose pipelines with full_refresh set") + if pipeline.dev_mode: + raise ValueError("Cannot decompose pipelines with dev_mode set") # serialize tasks tasks = [] pt = None @@ -448,8 +448,8 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator if not isinstance(data, DltSource): raise ValueError("Can only decompose dlt sources") - if pipeline.full_refresh: - raise ValueError("Cannot decompose pipelines with full_refresh set") + if pipeline.dev_mode: + raise ValueError("Cannot decompose pipelines with dev_mode set") tasks = [] sources = data.decompose("scc") @@ -484,8 +484,8 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator if not isinstance(data, DltSource): raise ValueError("Can only decompose dlt sources") - if pipeline.full_refresh: - raise ValueError("Cannot decompose pipelines with full_refresh set") + if pipeline.dev_mode: + raise ValueError("Cannot decompose pipelines with dev_mode set") # parallel tasks tasks = [] diff --git a/dlt/load/load.py b/dlt/load/load.py index 9d898bc54d..d96a6b7116 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -1,24 +1,24 @@ import contextlib from functools import reduce import datetime # noqa: 251 -from typing import Dict, List, Optional, Tuple, Set, Iterator, Iterable +from typing import Dict, List, Optional, Tuple, Set, Iterator, Iterable, Sequence from concurrent.futures import Executor import os +from copy import deepcopy from dlt.common import logger from dlt.common.runtime.signals import sleep from dlt.common.configuration import with_config, known_sections from dlt.common.configuration.resolve import inject_section from dlt.common.configuration.accessors import config -from dlt.common.pipeline import ( - LoadInfo, - LoadMetrics, - SupportsPipeline, - WithStepInfo, -) +from dlt.common.pipeline import LoadInfo, LoadMetrics, SupportsPipeline, WithStepInfo from dlt.common.schema.utils import get_top_level_table +from dlt.common.schema.typing import TTableSchema from dlt.common.storages.load_storage import LoadPackageInfo, ParsedLoadJobFileName, TJobState -from dlt.common.storages.load_package import LoadPackageStateInjectableContext +from dlt.common.storages.load_package import ( + LoadPackageStateInjectableContext, + load_package as current_load_package, +) from dlt.common.runners import TRunMetrics, Runnable, workermethod, NullExecutor from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.logger import pretty_format_exception @@ -362,6 +362,9 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False) def load_single_package(self, load_id: str, schema: Schema) -> None: new_jobs = self.get_new_jobs_info(load_id) + + dropped_tables = current_load_package()["state"].get("dropped_tables", []) + truncated_tables = current_load_package()["state"].get("truncated_tables", []) # initialize analytical storage ie. 
create dataset required by passed schema with self.get_destination_client(schema) as job_client: if (expected_update := self.load_storage.begin_schema_update(load_id)) is not None: @@ -377,6 +380,8 @@ def load_single_package(self, load_id: str, schema: Schema) -> None: if isinstance(job_client, WithStagingDataset) else None ), + drop_tables=dropped_tables, + truncate_tables=truncated_tables, ) # init staging client @@ -385,6 +390,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None: f"Job client for destination {self.destination.destination_type} does not" " implement SupportsStagingDestination" ) + with self.get_staging_destination_client(schema) as staging_client: init_client( staging_client, @@ -392,7 +398,10 @@ def load_single_package(self, load_id: str, schema: Schema) -> None: new_jobs, expected_update, job_client.should_truncate_table_before_load_on_staging_destination, + # should_truncate_staging, job_client.should_load_data_to_staging_dataset_on_staging_destination, + drop_tables=dropped_tables, + truncate_tables=truncated_tables, ) self.load_storage.commit_schema_update(load_id, applied_update) diff --git a/dlt/load/utils.py b/dlt/load/utils.py index 067ae33613..5126cbd11e 100644 --- a/dlt/load/utils.py +++ b/dlt/load/utils.py @@ -1,4 +1,4 @@ -from typing import List, Set, Iterable, Callable +from typing import List, Set, Iterable, Callable, Optional from dlt.common import logger from dlt.common.storages.load_package import LoadJobInfo, PackageStorage @@ -30,7 +30,10 @@ def get_completed_table_chain( # returns ordered list of tables from parent to child leaf tables table_chain: List[TTableSchema] = [] # allow for jobless tables for those write disposition - skip_jobless_table = top_merged_table["write_disposition"] not in ("replace", "merge") + skip_jobless_table = top_merged_table["write_disposition"] not in ( + "replace", + "merge", + ) # make sure all the jobs for the table chain is completed for table in map( @@ -66,6 +69,8 @@ def init_client( expected_update: TSchemaTables, truncate_filter: Callable[[TTableSchema], bool], load_staging_filter: Callable[[TTableSchema], bool], + drop_tables: Optional[List[TTableSchema]] = None, + truncate_tables: Optional[List[TTableSchema]] = None, ) -> TSchemaTables: """Initializes destination storage including staging dataset if supported @@ -78,12 +83,15 @@ def init_client( expected_update (TSchemaTables): Schema update as in load package. 
Always present even if empty truncate_filter (Callable[[TTableSchema], bool]): A filter that tells which table in destination dataset should be truncated load_staging_filter (Callable[[TTableSchema], bool]): A filter which tell which table in the staging dataset may be loaded into + drop_tables (Optional[List[TTableSchema]]): List of tables to drop before initializing storage + truncate_tables (Optional[List[TTableSchema]]): List of tables to truncate before initializing storage Returns: TSchemaTables: Actual migrations done at destination """ # get dlt/internal tables dlt_tables = set(schema.dlt_table_names()) + # tables without data (TODO: normalizer removes such jobs, write tests and remove the line below) tables_no_data = set( table["name"] for table in schema.data_tables() if not has_table_seen_data(table) @@ -92,12 +100,22 @@ def init_client( tables_with_jobs = set(job.table_name for job in new_jobs) - tables_no_data # get tables to truncate by extending tables with jobs with all their child tables - truncate_tables = set( - _extend_tables_with_table_chain(schema, tables_with_jobs, tables_with_jobs, truncate_filter) + initial_truncate_names = set(t["name"] for t in truncate_tables) if truncate_tables else set() + truncate_table_names = set( + _extend_tables_with_table_chain( + schema, + tables_with_jobs, + tables_with_jobs, + lambda t: truncate_filter(t) or t["name"] in initial_truncate_names, + ) ) applied_update = _init_dataset_and_update_schema( - job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables + job_client, + expected_update, + tables_with_jobs | dlt_tables, + truncate_table_names, + drop_tables=drop_tables, ) # update the staging dataset if client supports this @@ -128,6 +146,7 @@ def _init_dataset_and_update_schema( update_tables: Iterable[str], truncate_tables: Iterable[str] = None, staging_info: bool = False, + drop_tables: Optional[List[TTableSchema]] = None, ) -> TSchemaTables: staging_text = "for staging dataset" if staging_info else "" logger.info( @@ -135,16 +154,26 @@ def _init_dataset_and_update_schema( f" {staging_text}" ) job_client.initialize_storage() + if drop_tables: + drop_table_names = [table["name"] for table in drop_tables] + if hasattr(job_client, "drop_tables"): + logger.info( + f"Client for {job_client.config.destination_type} will drop tables {staging_text}" + ) + job_client.drop_tables(*drop_table_names, delete_schema=True) + logger.info( f"Client for {job_client.config.destination_type} will update schema to package schema" f" {staging_text}" ) + applied_update = job_client.update_stored_schema( only_tables=update_tables, expected_update=expected_update ) logger.info( f"Client for {job_client.config.destination_type} will truncate tables {staging_text}" ) + job_client.initialize_storage(truncate_tables=truncate_tables) return applied_update @@ -167,7 +196,10 @@ def _extend_tables_with_table_chain( # for replace and merge write dispositions we should include tables # without jobs in the table chain, because child tables may need # processing due to changes in the root table - skip_jobless_table = top_job_table["write_disposition"] not in ("replace", "merge") + skip_jobless_table = top_job_table["write_disposition"] not in ( + "replace", + "merge", + ) for table in map( lambda t: fill_hints_from_parent_and_clone_table(schema.tables, t), get_child_tables(schema.tables, top_job_table["name"]), diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py index c9e7b5097c..f8900ae562 100644 --- a/dlt/pipeline/__init__.py +++ 
b/dlt/pipeline/__init__.py @@ -1,4 +1,4 @@ -from typing import Sequence, Type, cast, overload +from typing import Sequence, Type, cast, overload, Optional from typing_extensions import TypeVar from dlt.common.schema import Schema @@ -9,12 +9,12 @@ from dlt.common.configuration.container import Container from dlt.common.configuration.inject import get_orig_args, last_config from dlt.common.destination import TLoaderFileFormat, Destination, TDestinationReferenceArg -from dlt.common.pipeline import LoadInfo, PipelineContext, get_dlt_pipelines_dir +from dlt.common.pipeline import LoadInfo, PipelineContext, get_dlt_pipelines_dir, TRefreshMode from dlt.pipeline.configuration import PipelineConfiguration, ensure_correct_pipeline_kwargs from dlt.pipeline.pipeline import Pipeline from dlt.pipeline.progress import _from_name as collector_from_name, TCollectorArg, _NULL_COLLECTOR -from dlt.pipeline.warnings import credentials_argument_deprecated +from dlt.pipeline.warnings import credentials_argument_deprecated, full_refresh_argument_deprecated TPipeline = TypeVar("TPipeline", bound=Pipeline, default=Pipeline) @@ -29,7 +29,9 @@ def pipeline( dataset_name: str = None, import_schema_path: str = None, export_schema_path: str = None, - full_refresh: bool = False, + full_refresh: Optional[bool] = None, + dev_mode: bool = False, + refresh: Optional[TRefreshMode] = None, credentials: Any = None, progress: TCollectorArg = _NULL_COLLECTOR, _impl_cls: Type[TPipeline] = Pipeline, # type: ignore[assignment] @@ -67,9 +69,15 @@ def pipeline( export_schema_path (str, optional): A path where the schema `yaml` file will be exported after every schema change. Defaults to None which disables exporting. - full_refresh (bool, optional): When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset. + dev_mode (bool, optional): When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset. The datasets are identified by `dataset_name_` + datetime suffix. Use this setting whenever you experiment with your data to be sure you start fresh on each run. Defaults to False. + refresh (str | TRefreshMode): Fully or partially reset sources during pipeline run. When set here the refresh is applied on each run of the pipeline. + To apply refresh only once you can pass it to `pipeline.run` or `extract` instead. The following refresh modes are supported: + * `drop_sources`: Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased) + * `drop_resources`: Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased) + * `drop_data`: Wipe all data and resource state for all resources being processed. Schema is not modified. + credentials (Any, optional): Credentials for the `destination` ie. database connection string or a dictionary with google cloud credentials. In most cases should be set to None, which lets `dlt` to use `secrets.toml` or environment variables to infer right credentials values. 
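An illustrative sketch of the two arguments documented above (the destination and names are placeholders, not part of this patch):

import dlt

# dev_mode replaces the deprecated full_refresh flag: every run loads into a
# datetime-suffixed dataset and starts from a fresh state and schema
pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_reactions",
    dev_mode=True,
)

# refresh set here applies on every run; "drop_sources" drops the tables and
# the source/resource state of all sources processed by run()/extract()
pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_reactions",
    refresh="drop_sources",
)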
@@ -98,7 +106,9 @@ def pipeline( dataset_name: str = None, import_schema_path: str = None, export_schema_path: str = None, - full_refresh: bool = False, + full_refresh: Optional[bool] = None, + dev_mode: bool = False, + refresh: Optional[TRefreshMode] = None, credentials: Any = None, progress: TCollectorArg = _NULL_COLLECTOR, _impl_cls: Type[TPipeline] = Pipeline, # type: ignore[assignment] @@ -111,6 +121,7 @@ def pipeline( has_arguments = bool(orig_args[0]) or any(orig_args[1].values()) credentials_argument_deprecated("pipeline", credentials, destination) + full_refresh_argument_deprecated("pipeline", full_refresh) if not has_arguments: context = Container()[PipelineContext] @@ -144,11 +155,12 @@ def pipeline( credentials, import_schema_path, export_schema_path, - full_refresh, + full_refresh if full_refresh is not None else dev_mode, progress, False, last_config(**kwargs), kwargs["runtime"], + refresh=refresh, ) # set it as current pipeline p.activate() @@ -160,13 +172,15 @@ def attach( pipeline_name: str = None, pipelines_dir: str = None, pipeline_salt: TSecretValue = None, - full_refresh: bool = False, + full_refresh: Optional[bool] = None, + dev_mode: bool = False, credentials: Any = None, progress: TCollectorArg = _NULL_COLLECTOR, **kwargs: Any, ) -> Pipeline: """Attaches to the working folder of `pipeline_name` in `pipelines_dir` or in default directory. Requires that valid pipeline state exists in working folder.""" ensure_correct_pipeline_kwargs(attach, **kwargs) + full_refresh_argument_deprecated("attach", full_refresh) # if working_dir not provided use temp folder if not pipelines_dir: pipelines_dir = get_dlt_pipelines_dir() @@ -182,7 +196,7 @@ def attach( credentials, None, None, - full_refresh, + full_refresh if full_refresh is not None else dev_mode, progress, True, last_config(**kwargs), diff --git a/dlt/pipeline/configuration.py b/dlt/pipeline/configuration.py index 8c46ed049f..235ba3485a 100644 --- a/dlt/pipeline/configuration.py +++ b/dlt/pipeline/configuration.py @@ -5,6 +5,7 @@ from dlt.common.typing import AnyFun, TSecretValue from dlt.common.utils import digest256 from dlt.common.destination import TLoaderFileFormat +from dlt.common.pipeline import TRefreshMode @configspec @@ -24,10 +25,14 @@ class PipelineConfiguration(BaseConfiguration): """Enables the tracing. Tracing saves the execution trace locally and is required by `dlt deploy`.""" use_single_dataset: bool = True """Stores all schemas in single dataset. When False, each schema will get a separate dataset with `{dataset_name}_{schema_name}""" - full_refresh: bool = False + full_refresh: Optional[bool] = None + """Deprecated. Use `dev_mode` instead. When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset.""" + dev_mode: bool = False """When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset.""" progress: Optional[str] = None runtime: RunConfiguration = None + refresh: Optional[TRefreshMode] = None + """Refresh mode for the pipeline to fully or partially reset a source during run. 
See docstring of `dlt.pipeline` for more details.""" def on_resolved(self) -> None: if not self.pipeline_name: diff --git a/dlt/pipeline/drop.py b/dlt/pipeline/drop.py new file mode 100644 index 0000000000..486bead2f4 --- /dev/null +++ b/dlt/pipeline/drop.py @@ -0,0 +1,171 @@ +from typing import Union, Iterable, Optional, List, Dict, Any, Tuple, TypedDict +from copy import deepcopy +from itertools import chain +from dataclasses import dataclass + +from dlt.common.schema import Schema +from dlt.common.pipeline import ( + TPipelineState, + _sources_state, + _get_matching_resources, + _get_matching_sources, + reset_resource_state, + _delete_source_state_keys, +) +from dlt.common.schema.typing import TSimpleRegex, TTableSchema +from dlt.common.schema.utils import ( + group_tables_by_resource, + compile_simple_regexes, + compile_simple_regex, +) +from dlt.common import jsonpath +from dlt.common.typing import REPattern + + +class _DropInfo(TypedDict): + tables: List[str] + resource_states: List[str] + resource_names: List[str] + state_paths: List[str] + schema_name: str + dataset_name: Optional[str] + drop_all: bool + resource_pattern: Optional[REPattern] + warnings: List[str] + + +@dataclass +class _DropResult: + schema: Schema + state: TPipelineState + info: _DropInfo + dropped_tables: List[TTableSchema] + + +def _create_modified_state( + state: TPipelineState, + resource_pattern: Optional[REPattern], + source_pattern: REPattern, + state_paths: jsonpath.TAnyJsonPath, + info: _DropInfo, +) -> Tuple[TPipelineState, _DropInfo]: + # if not self.drop_state: + # return state # type: ignore[return-value] + all_source_states = _sources_state(state) + for source_name in _get_matching_sources(source_pattern, state): + source_state = all_source_states[source_name] + # drop table states + if resource_pattern: + for key in _get_matching_resources(resource_pattern, source_state): + info["resource_states"].append(key) + reset_resource_state(key, source_state) + # drop additional state paths + # Don't drop 'resources' key if jsonpath is wildcard + resolved_paths = [ + p for p in jsonpath.resolve_paths(state_paths, source_state) if p != "resources" + ] + if state_paths and not resolved_paths: + info["warnings"].append( + f"State paths {state_paths} did not select any paths in source {source_name}" + ) + _delete_source_state_keys(resolved_paths, source_state) + info["state_paths"].extend(f"{source_name}.{p}" for p in resolved_paths) + return state, info + + +def drop_resources( + schema: Schema, + state: TPipelineState, + resources: Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]] = (), + state_paths: jsonpath.TAnyJsonPath = (), + drop_all: bool = False, + state_only: bool = False, + sources: Optional[Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]]] = None, +) -> _DropResult: + """Generate a new schema and pipeline state with the requested resources removed. + + Args: + schema: The schema to modify. + state: The pipeline state to modify. + resources: Resource name(s) or regex pattern(s) matching resource names to drop. + If empty, no resources will be dropped unless `drop_all` is True. + state_paths: JSON path(s) relative to the source state to drop. + drop_all: If True, all resources will be dropped (supeseeds `resources`). 
+ state_only: If True, only modify the pipeline state, not schema + sources: Only wipe state for sources matching the name(s) or regex pattern(s) in this list + If not set all source states will be modified according to `state_paths` and `resources` + + Returns: + A 3 part tuple containing the new schema, the new pipeline state, and a dictionary + containing information about the drop operation. + """ + + if isinstance(resources, str): + resources = [resources] + resources = list(resources) + if isinstance(sources, str): + sources = [sources] + if sources is not None: + sources = list(sources) + if isinstance(state_paths, str): + state_paths = [state_paths] + + state_paths = jsonpath.compile_paths(state_paths) + + schema = schema.clone() + state = deepcopy(state) + + resources = set(resources) + if drop_all: + resource_pattern = compile_simple_regex(TSimpleRegex("re:.*")) # Match everything + elif resources: + resource_pattern = compile_simple_regexes(TSimpleRegex(r) for r in resources) + else: + resource_pattern = None + if sources is not None: + source_pattern = compile_simple_regexes(TSimpleRegex(s) for s in sources) + else: + source_pattern = compile_simple_regex(TSimpleRegex("re:.*")) # Match everything + + if resource_pattern: + data_tables = { + t["name"]: t for t in schema.data_tables(seen_data_only=True) + } # Don't remove _dlt tables + resource_tables = group_tables_by_resource(data_tables, pattern=resource_pattern) + resource_names = list(resource_tables.keys()) + # TODO: If drop_tables + if not state_only: + tables_to_drop = list(chain.from_iterable(resource_tables.values())) + tables_to_drop.reverse() + else: + tables_to_drop = [] + else: + tables_to_drop = [] + resource_names = [] + + info: _DropInfo = dict( + tables=[t["name"] for t in tables_to_drop], + resource_states=[], + state_paths=[], + resource_names=resource_names, + schema_name=schema.name, + dataset_name=None, + drop_all=drop_all, + resource_pattern=resource_pattern, + warnings=[], + ) + + new_state, info = _create_modified_state( + state, resource_pattern, source_pattern, state_paths, info + ) + info["resource_names"] = resource_names + + if resource_pattern and not resource_tables: + info["warnings"].append( + f"Specified resource(s) {str(resources)} did not select any table(s) in schema" + f" {schema.name}. 
Possible resources are:" + f" {list(group_tables_by_resource(data_tables).keys())}" + ) + + dropped_tables = schema.drop_tables([t["name"] for t in tables_to_drop], seen_data_only=True) + return _DropResult(schema, new_state, info, dropped_tables) diff --git a/dlt/pipeline/helpers.py b/dlt/pipeline/helpers.py index c1c3326171..0defbc14eb 100644 --- a/dlt/pipeline/helpers.py +++ b/dlt/pipeline/helpers.py @@ -1,25 +1,19 @@ -import contextlib -from typing import Callable, Sequence, Iterable, Optional, Any, List, Dict, Union, TypedDict -from itertools import chain +from copy import deepcopy +from typing import ( + Callable, + Sequence, + Iterable, + Optional, + Any, + Dict, + Union, + TYPE_CHECKING, +) -from dlt.common.jsonpath import resolve_paths, TAnyJsonPath, compile_paths +from dlt.common.jsonpath import TAnyJsonPath from dlt.common.exceptions import TerminalException -from dlt.common.schema.utils import ( - group_tables_by_resource, - compile_simple_regexes, - compile_simple_regex, -) from dlt.common.schema.typing import TSimpleRegex -from dlt.common.typing import REPattern -from dlt.common.pipeline import ( - reset_resource_state, - _sources_state, - _delete_source_state_keys, - _get_matching_resources, -) -from dlt.common.destination.reference import WithStagingDataset - -from dlt.destinations.exceptions import DatabaseUndefinedRelation +from dlt.common.pipeline import pipeline_state as current_pipeline_state, TRefreshMode from dlt.pipeline.exceptions import ( PipelineNeverRan, PipelineStepFailed, @@ -27,7 +21,11 @@ ) from dlt.pipeline.state_sync import force_state_extract from dlt.pipeline.typing import TPipelineStep -from dlt.pipeline import Pipeline +from dlt.pipeline.drop import drop_resources +from dlt.extract import DltSource + +if TYPE_CHECKING: + from dlt.pipeline import Pipeline def retry_load( @@ -62,87 +60,48 @@ def _retry_load(ex: BaseException) -> bool: return _retry_load -class _DropInfo(TypedDict): - tables: List[str] - resource_states: List[str] - resource_names: List[str] - state_paths: List[str] - schema_name: str - dataset_name: str - drop_all: bool - resource_pattern: Optional[REPattern] - warnings: List[str] - - class DropCommand: def __init__( self, - pipeline: Pipeline, + pipeline: "Pipeline", resources: Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]] = (), schema_name: Optional[str] = None, state_paths: TAnyJsonPath = (), drop_all: bool = False, state_only: bool = False, ) -> None: + """ + Args: + pipeline: Pipeline to drop tables and state from + resources: List of resources to drop. If empty, no resources are dropped unless `drop_all` is True + schema_name: Name of the schema to drop tables from. 
If not specified, the default schema is used + state_paths: JSON path(s) relative to the source state to drop + drop_all: Drop all resources and tables in the schema (supersedes `resources` list) + state_only: Drop only state, not tables + """ self.pipeline = pipeline - if isinstance(resources, str): - resources = [resources] - if isinstance(state_paths, str): - state_paths = [state_paths] if not pipeline.default_schema_name: raise PipelineNeverRan(pipeline.pipeline_name, pipeline.pipelines_dir) - self.schema = pipeline.schemas[schema_name or pipeline.default_schema_name].clone() - self.schema_tables = self.schema.tables - self.drop_tables = not state_only - self.drop_state = True - self.state_paths_to_drop = compile_paths(state_paths) - - resources = set(resources) - resource_names = [] - if drop_all: - self.resource_pattern = compile_simple_regex(TSimpleRegex("re:.*")) # Match everything - elif resources: - self.resource_pattern = compile_simple_regexes(TSimpleRegex(r) for r in resources) - else: - self.resource_pattern = None - - if self.resource_pattern: - data_tables = { - t["name"]: t for t in self.schema.data_tables() - } # Don't remove _dlt tables - resource_tables = group_tables_by_resource(data_tables, pattern=self.resource_pattern) - if self.drop_tables: - self.tables_to_drop = list(chain.from_iterable(resource_tables.values())) - self.tables_to_drop.reverse() - else: - self.tables_to_drop = [] - resource_names = list(resource_tables.keys()) - else: - self.tables_to_drop = [] - self.drop_tables = False # No tables to drop - self.drop_state = not not self.state_paths_to_drop # obtain truth value - - self.drop_all = drop_all - self.info: _DropInfo = dict( - tables=[t["name"] for t in self.tables_to_drop], - resource_states=[], - state_paths=[], - resource_names=resource_names, - schema_name=self.schema.name, - dataset_name=self.pipeline.dataset_name, - drop_all=drop_all, - resource_pattern=self.resource_pattern, - warnings=[], + + drop_result = drop_resources( + # self._drop_schema, self._new_state, self.info = drop_resources( + self.schema, + pipeline.state, + resources, + state_paths, + drop_all, + state_only, ) - if self.resource_pattern and not resource_tables: - self.info["warnings"].append( - f"Specified resource(s) {str(resources)} did not select any table(s) in schema" - f" {self.schema.name}. 
Possible resources are:" - f" {list(group_tables_by_resource(data_tables).keys())}" - ) - self._new_state = self._create_modified_state() + + self._new_state = drop_result.state + self.info = drop_result.info + self._new_schema = drop_result.schema + self._dropped_tables = drop_result.dropped_tables + self.drop_tables = not state_only and bool(self._dropped_tables) + + self.drop_state = bool(drop_all or resources or state_paths) @property def is_empty(self) -> bool: @@ -152,58 +111,6 @@ def is_empty(self) -> bool: and len(self.info["resource_states"]) == 0 ) - def _drop_destination_tables(self) -> None: - table_names = [tbl["name"] for tbl in self.tables_to_drop] - for table_name in table_names: - assert table_name not in self.schema._schema_tables, ( - f"You are dropping table {table_name} in {self.schema.name} but it is still present" - " in the schema" - ) - with self.pipeline._sql_job_client(self.schema) as client: - client.drop_tables(*table_names, replace_schema=True) - # also delete staging but ignore if staging does not exist - if isinstance(client, WithStagingDataset): - with contextlib.suppress(DatabaseUndefinedRelation): - with client.with_staging_dataset(): - client.drop_tables(*table_names, replace_schema=True) - - def _delete_schema_tables(self) -> None: - for tbl in self.tables_to_drop: - del self.schema_tables[tbl["name"]] - # bump schema, we'll save later - self.schema._bump_version() - - def _list_state_paths(self, source_state: Dict[str, Any]) -> List[str]: - return resolve_paths(self.state_paths_to_drop, source_state) - - def _create_modified_state(self) -> Dict[str, Any]: - state = self.pipeline.state - if not self.drop_state: - return state # type: ignore[return-value] - source_states = _sources_state(state).items() - for source_name, source_state in source_states: - # drop table states - if self.drop_state and self.resource_pattern: - for key in _get_matching_resources(self.resource_pattern, source_state): - self.info["resource_states"].append(key) - reset_resource_state(key, source_state) - # drop additional state paths - resolved_paths = resolve_paths(self.state_paths_to_drop, source_state) - if self.state_paths_to_drop and not resolved_paths: - self.info["warnings"].append( - f"State paths {self.state_paths_to_drop} did not select any paths in source" - f" {source_name}" - ) - _delete_source_state_keys(resolved_paths, source_state) - self.info["state_paths"].extend(f"{source_name}.{p}" for p in resolved_paths) - return state # type: ignore[return-value] - - def _extract_state(self) -> None: - state: Dict[str, Any] - with self.pipeline.managed_state(extract_state=True) as state: # type: ignore[assignment] - state.clear() - state.update(self._new_state) - def __call__(self) -> None: if ( self.pipeline.has_pending_data @@ -216,14 +123,16 @@ def __call__(self) -> None: if not self.drop_state and not self.drop_tables: return # Nothing to drop - if self.drop_tables: - self._delete_schema_tables() - self._drop_destination_tables() - if self.drop_tables: - self.pipeline.schemas.save_schema(self.schema) - if self.drop_state: - self._extract_state() - # Send updated state to destination + self._new_schema._bump_version() + new_state = deepcopy(self._new_state) + force_state_extract(new_state) + + self.pipeline._save_and_extract_state_and_schema( + new_state, + schema=self._new_schema, + load_package_state_update={"dropped_tables": self._dropped_tables}, + ) + self.pipeline.normalize() try: self.pipeline.load(raise_on_failed_jobs=True) @@ -232,11 +141,13 @@ def 
__call__(self) -> None: self.pipeline.drop_pending_packages() with self.pipeline.managed_state() as state: force_state_extract(state) + # Restore original schema file so all tables are known on next run + self.pipeline.schemas.save_schema(self.schema) raise def drop( - pipeline: Pipeline, + pipeline: "Pipeline", resources: Union[Iterable[str], str] = (), schema_name: str = None, state_paths: TAnyJsonPath = (), @@ -244,3 +155,34 @@ def drop( state_only: bool = False, ) -> None: return DropCommand(pipeline, resources, schema_name, state_paths, drop_all, state_only)() + + +def refresh_source( + pipeline: "Pipeline", source: DltSource, refresh: TRefreshMode +) -> Dict[str, Any]: + """Run the pipeline's refresh mode on the given source, updating the source's schema and state. + + Returns: + The new load package state containing tables that need to be dropped/truncated. + """ + if pipeline.first_run: + return {} + pipeline_state, _ = current_pipeline_state(pipeline._container) + _resources_to_drop = list(source.resources.extracted) if refresh != "drop_sources" else [] + drop_result = drop_resources( + source.schema, + pipeline_state, + resources=_resources_to_drop, + drop_all=refresh == "drop_sources", + state_paths="*" if refresh == "drop_sources" else [], + sources=source.name, + ) + load_package_state = {} + if drop_result.dropped_tables: + key = "dropped_tables" if refresh != "drop_data" else "truncated_tables" + load_package_state[key] = drop_result.dropped_tables + if refresh != "drop_data": # drop_data is only data wipe, keep original schema + source.schema = drop_result.schema + if "sources" in drop_result.state: + pipeline_state["sources"] = drop_result.state["sources"] + return load_package_state diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 53770f332d..81b50a8326 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -16,6 +16,7 @@ cast, get_type_hints, ContextManager, + Dict, ) from dlt import version @@ -45,6 +46,7 @@ TWriteDispositionConfig, TAnySchemaColumns, TSchemaContract, + TTableSchema, ) from dlt.common.schema.utils import normalize_schema_name from dlt.common.storages.exceptions import LoadPackageNotFound @@ -95,6 +97,7 @@ StateInjectableContext, TStepMetrics, WithStepInfo, + TRefreshMode, ) from dlt.common.schema import Schema from dlt.common.utils import is_interactive @@ -122,6 +125,7 @@ PipelineStepFailed, SqlClientNotAvailable, FSClientNotAvailable, + PipelineNeverRan, ) from dlt.pipeline.trace import ( PipelineTrace, @@ -133,6 +137,7 @@ end_trace_step, end_trace, ) +from dlt.common.pipeline import pipeline_state as current_pipeline_state from dlt.pipeline.typing import TPipelineStep from dlt.pipeline.state_sync import ( PIPELINE_STATE_ENGINE_VERSION, @@ -145,6 +150,7 @@ ) from dlt.pipeline.warnings import credentials_argument_deprecated from dlt.common.storages.load_package import TLoadPackageState +from dlt.pipeline.helpers import refresh_source def with_state_sync(may_extract_state: bool = False) -> Callable[[TFun], TFun]: @@ -293,7 +299,7 @@ class Pipeline(SupportsPipeline): schema_names: List[str] = [] first_run: bool = False """Indicates a first run of the pipeline, where run ends with successful loading of the data""" - full_refresh: bool + dev_mode: bool must_attach_to_local_pipeline: bool pipelines_dir: str """A directory where the pipelines' working directories are created""" @@ -310,6 +316,7 @@ class Pipeline(SupportsPipeline): collector: _Collector config: PipelineConfiguration runtime_config: RunConfiguration + 
refresh: Optional[TRefreshMode] = None def __init__( self, @@ -322,20 +329,22 @@ def __init__( credentials: Any, import_schema_path: str, export_schema_path: str, - full_refresh: bool, + dev_mode: bool, progress: _Collector, must_attach_to_local_pipeline: bool, config: PipelineConfiguration, runtime: RunConfiguration, + refresh: Optional[TRefreshMode] = None, ) -> None: """Initializes the Pipeline class which implements `dlt` pipeline. Please use `pipeline` function in `dlt` module to create a new Pipeline instance.""" self.pipeline_salt = pipeline_salt self.config = config self.runtime_config = runtime - self.full_refresh = full_refresh + self.dev_mode = dev_mode self.collector = progress or _NULL_COLLECTOR self.destination = None self.staging = None + self.refresh = refresh self._container = Container() self._pipeline_instance_id = self._create_pipeline_instance_id() @@ -379,7 +388,7 @@ def drop(self, pipeline_name: str = None) -> "Pipeline": self.credentials, self._schema_storage.config.import_schema_path, self._schema_storage.config.export_schema_path, - self.full_refresh, + self.dev_mode, self.collector, False, self.config, @@ -403,8 +412,10 @@ def extract( max_parallel_items: int = None, workers: int = None, schema_contract: TSchemaContract = None, + refresh: Optional[TRefreshMode] = None, ) -> ExtractInfo: """Extracts the `data` and prepare it for the normalization. Does not require destination or credentials to be configured. See `run` method for the arguments' description.""" + # create extract storage to which all the sources will be extracted extract_step = Extract( self._schema_storage, @@ -428,7 +439,14 @@ def extract( ): if source.exhausted: raise SourceExhausted(source.name) - self._extract_source(extract_step, source, max_parallel_items, workers) + + self._extract_source( + extract_step, + source, + max_parallel_items, + workers, + refresh=refresh or self.refresh, + ) # extract state state: TPipelineStateDoc = None if self.config.restore_from_destination: @@ -580,6 +598,7 @@ def run( schema: Schema = None, loader_file_format: TLoaderFileFormat = None, schema_contract: TSchemaContract = None, + refresh: Optional[TRefreshMode] = None, ) -> LoadInfo: """Loads the data from `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`. @@ -633,6 +652,11 @@ def run( schema_contract (TSchemaContract, optional): On override for the schema contract settings, this will replace the schema contract settings for all tables in the schema. Defaults to None. + refresh (str | TRefreshMode): Fully or partially reset sources before loading new data in this run. The following refresh modes are supported: + * `drop_sources`: Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased) + * `drop_resources`: Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased) + * `drop_data`: Wipe all data and resource state for all resources being processed. Schema is not modified. + Raises: PipelineStepFailed when a problem happened during `extract`, `normalize` or `load` steps. 
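A sketch of passing `refresh` for a single call instead of configuring it on the pipeline; the resource and pipeline names below are placeholders so the snippet stays self-contained:

import dlt

@dlt.resource
def my_numbers():
    # placeholder data, only here to make the example runnable
    yield [{"n": 1}, {"n": 2}]

pipeline = dlt.pipeline(
    pipeline_name="github_pipeline", destination="duckdb", dataset_name="github_reactions"
)

# drop tables and resource state only for the resources processed in this run
pipeline.run(my_numbers(), refresh="drop_resources")

# "drop_data" wipes data and resource state but leaves the schema untouched
pipeline.extract(my_numbers(), refresh="drop_data")
pipeline.normalize()
pipeline.load()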
Returns: @@ -648,7 +672,7 @@ def run( # sync state with destination if ( self.config.restore_from_destination - and not self.full_refresh + and not self.dev_mode and not self._state_restored and (self.destination or destination) ): @@ -679,6 +703,7 @@ def run( primary_key=primary_key, schema=schema, schema_contract=schema_contract, + refresh=refresh or self.refresh, ) self.normalize(loader_file_format=loader_file_format) return self.load(destination, dataset_name, credentials=credentials) @@ -1031,7 +1056,7 @@ def _init_working_dir(self, pipeline_name: str, pipelines_dir: str) -> None: # create pipeline storage, do not create working dir yet self._pipeline_storage = FileStorage(self.working_dir, makedirs=False) # if full refresh was requested, wipe out all data from working folder, if exists - if self.full_refresh: + if self.dev_mode: self._wipe_working_folder() def _configure( @@ -1083,7 +1108,13 @@ def _attach_pipeline(self) -> None: pass def _extract_source( - self, extract: Extract, source: DltSource, max_parallel_items: int, workers: int + self, + extract: Extract, + source: DltSource, + max_parallel_items: int, + workers: int, + refresh: Optional[TRefreshMode] = None, + load_package_state_update: Optional[Dict[str, Any]] = None, ) -> str: # discover the existing pipeline schema try: @@ -1102,8 +1133,14 @@ def _extract_source( except FileNotFoundError: pass + load_package_state_update = dict(load_package_state_update or {}) + if refresh: + load_package_state_update.update(refresh_source(self, source, refresh)) + # extract into pipeline schema - load_id = extract.extract(source, max_parallel_items, workers) + load_id = extract.extract( + source, max_parallel_items, workers, load_package_state_update=load_package_state_update + ) # save import with fully discovered schema # NOTE: moved to with_schema_sync, remove this if all test pass @@ -1145,9 +1182,9 @@ def _get_destination_client_initial_config( # this client support many schemas and datasets if issubclass(client_spec, DestinationClientDwhConfiguration): - if not self.dataset_name and self.full_refresh: + if not self.dataset_name and self.dev_mode: logger.warning( - "Full refresh may not work if dataset name is not set. Please set the" + "Dev mode may not work if dataset name is not set. 
Please set the" " dataset_name argument in dlt.pipeline or run method" ) # set default schema name to load all incoming data to a single dataset, no matter what is the current schema name @@ -1335,8 +1372,8 @@ def _set_dataset_name(self, new_dataset_name: str) -> None: if not new_dataset_name: return - # in case of full refresh add unique suffix - if self.full_refresh: + # in case of dev_mode add unique suffix + if self.dev_mode: # dataset must be specified # double _ is not allowed if new_dataset_name.endswith("_"): @@ -1532,8 +1569,37 @@ def _props_to_state(self, state: TPipelineState) -> TPipelineState: state["schema_names"] = self._list_schemas_sorted() return state + def _save_and_extract_state_and_schema( + self, + state: TPipelineState, + schema: Schema, + load_package_state_update: Optional[Dict[str, Any]] = None, + ) -> None: + """Save given state + schema and extract creating a new load package + + Args: + state: The new pipeline state, replaces the current state + schema: The new source schema, replaces current schema of the same name + load_package_state_update: Dict which items will be included in the load package state + """ + self.schemas.save_schema(schema) + with self.managed_state() as old_state: + old_state.update(state) + + self._bump_version_and_extract_state( + state, + extract_state=True, + load_package_state_update=load_package_state_update, + schema=schema, + ) + def _bump_version_and_extract_state( - self, state: TPipelineState, extract_state: bool, extract: Extract = None + self, + state: TPipelineState, + extract_state: bool, + extract: Extract = None, + load_package_state_update: Optional[Dict[str, Any]] = None, + schema: Optional[Schema] = None, ) -> TPipelineStateDoc: """Merges existing state into `state` and extracts state using `storage` if extract_state is True. @@ -1547,7 +1613,11 @@ def _bump_version_and_extract_state( self._schema_storage, self._normalize_storage_config(), original_data=data ) self._extract_source( - extract_, data_to_sources(data, self, self.default_schema)[0], 1, 1 + extract_, + data_to_sources(data, self, schema or self.default_schema)[0], + 1, + 1, + load_package_state_update=load_package_state_update, ) # set state to be extracted mark_state_extracted(state, hash_) diff --git a/dlt/pipeline/warnings.py b/dlt/pipeline/warnings.py index 87fcbc1f0c..8bee670cb7 100644 --- a/dlt/pipeline/warnings.py +++ b/dlt/pipeline/warnings.py @@ -20,3 +20,16 @@ def credentials_argument_deprecated( Dlt04DeprecationWarning, stacklevel=2, ) + + +def full_refresh_argument_deprecated(caller_name: str, full_refresh: t.Optional[bool]) -> None: + """full_refresh argument is replaced with dev_mode""" + if full_refresh is None: + return + + warnings.warn( + f"The `full_refresh` argument to {caller_name} is deprecated and will be removed in a" + f" future version. 
Use `dev_mode={full_refresh}` instead which will have the same effect.", + Dlt04DeprecationWarning, + stacklevel=2, + ) diff --git a/docs/examples/archive/google_sheets.py b/docs/examples/archive/google_sheets.py index 26c3d30b54..61b9859c53 100644 --- a/docs/examples/archive/google_sheets.py +++ b/docs/examples/archive/google_sheets.py @@ -2,7 +2,7 @@ from sources.google_sheets import google_spreadsheet -dlt.pipeline(destination="bigquery", full_refresh=False) +dlt.pipeline(destination="bigquery", dev_mode=False) # see example.secrets.toml to where to put credentials # "2022-05", "model_metadata" diff --git a/docs/examples/archive/quickstart.py b/docs/examples/archive/quickstart.py index 6e49f1af7a..f435fa3fab 100644 --- a/docs/examples/archive/quickstart.py +++ b/docs/examples/archive/quickstart.py @@ -48,7 +48,7 @@ dataset_name=dataset_name, credentials=credentials, export_schema_path=export_schema_path, - full_refresh=True, + dev_mode=True, ) diff --git a/docs/examples/archive/rasa_example.py b/docs/examples/archive/rasa_example.py index e83e6c61f7..76e3e9c011 100644 --- a/docs/examples/archive/rasa_example.py +++ b/docs/examples/archive/rasa_example.py @@ -20,7 +20,7 @@ event_files = jsonl_files([file for file in os.scandir("docs/examples/data/rasa_trackers")]) info = dlt.pipeline( - full_refresh=True, + dev_mode=True, destination=postgres, # export_schema_path=... # uncomment to see the final schema in the folder you want ).run( diff --git a/docs/examples/archive/singer_tap_jsonl_example.py b/docs/examples/archive/singer_tap_jsonl_example.py index c926a9f153..109dd05b3f 100644 --- a/docs/examples/archive/singer_tap_jsonl_example.py +++ b/docs/examples/archive/singer_tap_jsonl_example.py @@ -9,7 +9,7 @@ # load hubspot schema stub - it converts all field names with `timestamp` into timestamp type schema = SchemaStorage.load_schema_file("docs/examples/schemas/", "hubspot", ("yaml",)) -p = dlt.pipeline(destination="postgres", full_refresh=True) +p = dlt.pipeline(destination="postgres", dev_mode=True) # now load a pipeline created from jsonl resource that feeds messages into singer tap transformer pipe = jsonl_file("docs/examples/data/singer_taps/tap_hubspot.jsonl") | singer_raw_stream() # provide hubspot schema diff --git a/docs/examples/chess/chess.py b/docs/examples/chess/chess.py index df1fb18845..7b577c2646 100644 --- a/docs/examples/chess/chess.py +++ b/docs/examples/chess/chess.py @@ -50,9 +50,9 @@ def players_games(username: Any) -> Iterator[TDataItems]: print("You must run this from the docs/examples/chess folder") # chess_url in config.toml, credentials for postgres in secrets.toml, credentials always under credentials key # look for parallel run configuration in `config.toml`! 
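The `full_refresh_argument_deprecated` helper added in `dlt/pipeline/warnings.py` above lets existing scripts keep working while nudging users toward `dev_mode`. A minimal sketch of the migration, assuming `dlt.pipeline` wires in the new warning helper when the legacy flag is passed, that the duckdb extra is installed, and that the pipeline names are placeholders:

```py
import warnings

import dlt

# Passing the deprecated flag still works, but should emit a Dlt04DeprecationWarning
# pointing at `dev_mode` (this assumes dlt.pipeline calls the new warning helper).
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    dlt.pipeline(pipeline_name="legacy_pipeline", destination="duckdb", full_refresh=True)

assert any("full_refresh" in str(w.message) for w in caught)

# The equivalent, non-deprecated spelling: each run loads into a fresh,
# datetime-suffixed dataset and resets local state.
dlt.pipeline(pipeline_name="dev_pipeline", destination="duckdb", dev_mode=True)
```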
- # mind the full_refresh: it makes the pipeline to load to a distinct dataset each time it is run and always is resetting the schema and state + # mind the dev_mode: it makes the pipeline to load to a distinct dataset each time it is run and always is resetting the schema and state load_info = dlt.pipeline( - pipeline_name="chess_games", destination="postgres", dataset_name="chess", full_refresh=True + pipeline_name="chess_games", destination="postgres", dataset_name="chess", dev_mode=True ).run(chess(max_players=5, month=9)) # display where the data went print(load_info) diff --git a/docs/website/docs/build-a-pipeline-tutorial.md b/docs/website/docs/build-a-pipeline-tutorial.md index 88a64b46a0..a7cd4e4050 100644 --- a/docs/website/docs/build-a-pipeline-tutorial.md +++ b/docs/website/docs/build-a-pipeline-tutorial.md @@ -329,7 +329,7 @@ pipeline = dlt.pipeline( pipeline_name="github_pipeline", destination="duckdb", dataset_name="github_reactions", - full_refresh=True + dev_mode=True ) with pipeline.sql_client() as client: diff --git a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md index c2f6786f8d..d6ec36ae49 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md +++ b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md @@ -90,7 +90,7 @@ p = dlt.pipeline( pipeline_name='chess', destination=dlt.destinations.duckdb("files/data.db"), dataset_name='chess_data', - full_refresh=False + dev_mode=False ) # will load data to /var/local/database.duckdb (absolute path) @@ -98,7 +98,7 @@ p = dlt.pipeline( pipeline_name='chess', destination=dlt.destinations.duckdb("/var/local/database.duckdb"), dataset_name='chess_data', - full_refresh=False + dev_mode=False ) ``` @@ -112,7 +112,7 @@ p = dlt.pipeline( pipeline_name="chess", destination=dlt.destinations.duckdb(db), dataset_name="chess_data", - full_refresh=False, + dev_mode=False, ) # Or if you would like to use in-memory duckdb instance @@ -183,4 +183,3 @@ This destination [integrates with dbt](../transformations/dbt/dbt.md) via [dbt-d This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination). - diff --git a/docs/website/docs/dlt-ecosystem/transformations/pandas.md b/docs/website/docs/dlt-ecosystem/transformations/pandas.md index 5a82d8be66..0e08666eaf 100644 --- a/docs/website/docs/dlt-ecosystem/transformations/pandas.md +++ b/docs/website/docs/dlt-ecosystem/transformations/pandas.md @@ -16,7 +16,7 @@ pipeline = dlt.pipeline( pipeline_name="github_pipeline", destination="duckdb", dataset_name="github_reactions", - full_refresh=True + dev_mode=True ) with pipeline.sql_client() as client: with client.execute_query( diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/salesforce.md b/docs/website/docs/dlt-ecosystem/verified-sources/salesforce.md index 70dcc979f3..f00e185480 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/salesforce.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/salesforce.md @@ -262,7 +262,7 @@ To create your data pipeline using single loading and > For incremental loading of endpoints, maintain the pipeline name and destination dataset name. > The pipeline name is important for accessing the [state](../../general-usage/state) from the > last run, including the end date for incremental data loads. 
Altering these names could trigger - > a [“full_refresh”](../../general-usage/pipeline#do-experiments-with-full-refresh), disrupting + > a [“dev-mode”](../../general-usage/pipeline#do-experiments-with-dev-mode), disrupting > the metadata tracking for [incremental data loading](../../general-usage/incremental-loading). 1. To load data from the “contact” in replace mode and “task” incrementally merge mode endpoints: diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/zendesk.md b/docs/website/docs/dlt-ecosystem/verified-sources/zendesk.md index b8993ae8d5..cfccf5d675 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/zendesk.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/zendesk.md @@ -330,10 +330,10 @@ verified source. ```py pipeline = dlt.pipeline( - pipeline_name="dlt_zendesk_pipeline", # Use a custom name if desired - destination="duckdb", # Choose the appropriate destination (e.g., duckdb, redshift, post) - full_refresh = False, - dataset_name="sample_zendesk_data" # Use a custom name if desired + pipeline_name="dlt_zendesk_pipeline", # Use a custom name if desired + destination="duckdb", # Choose the appropriate destination (e.g., duckdb, redshift, post) + dev_mode = False, + dataset_name="sample_zendesk_data" # Use a custom name if desired ) data = zendesk_support(load_all=True, start_date=start_date) data_chat = zendesk_chat(start_date=start_date) diff --git a/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md b/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md index ffe0abd082..d9aae62f94 100644 --- a/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md +++ b/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md @@ -59,7 +59,7 @@ pipeline = dlt.pipeline( pipeline_name="github_pipeline", destination="duckdb", dataset_name="github_reactions", - full_refresh=True + dev_mode=True ) with pipeline.sql_client() as client: with client.execute_query( diff --git a/docs/website/docs/general-usage/destination-tables.md b/docs/website/docs/general-usage/destination-tables.md index 4780d4be20..b53d864a96 100644 --- a/docs/website/docs/general-usage/destination-tables.md +++ b/docs/website/docs/general-usage/destination-tables.md @@ -276,12 +276,12 @@ Notice that the `mydata.users` table now contains the data from both the previou ## Versioned datasets -When you set the `full_refresh` argument to `True` in `dlt.pipeline` call, dlt creates a versioned dataset. +When you set the `dev_mode` argument to `True` in `dlt.pipeline` call, dlt creates a versioned dataset. This means that each time you run the pipeline, the data is loaded into a new dataset (a new database schema). The dataset name is the same as the `dataset_name` you provided in the pipeline definition with a datetime-based suffix. 
-We modify our pipeline to use the `full_refresh` option to see how this works: +We modify our pipeline to use the `dev_mode` option to see how this works: ```py import dlt @@ -295,7 +295,7 @@ pipeline = dlt.pipeline( pipeline_name='quick_start', destination='duckdb', dataset_name='mydata', - full_refresh=True # <-- add this line + dev_mode=True # <-- add this line ) load_info = pipeline.run(data, table_name="users") ``` diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index 18bdb13b06..c2c951c9b0 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -223,7 +223,7 @@ pipeline = dlt.pipeline( pipeline_name='facebook_insights', destination='duckdb', dataset_name='facebook_insights_data', - full_refresh=True + dev_mode=True ) fb_ads = facebook_ads_source() # enable root key propagation on a source that is not a merge one by default. diff --git a/docs/website/docs/general-usage/pipeline.md b/docs/website/docs/general-usage/pipeline.md index 53eca2e59a..d1f82f970a 100644 --- a/docs/website/docs/general-usage/pipeline.md +++ b/docs/website/docs/general-usage/pipeline.md @@ -1,7 +1,7 @@ --- title: Pipeline description: Explanation of what a dlt pipeline is -keywords: [pipeline, source, full refresh] +keywords: [pipeline, source, full refresh, dev mode] --- # Pipeline @@ -85,13 +85,47 @@ You can inspect stored artifacts using the command > 💡 You can attach `Pipeline` instance to an existing working folder, without creating a new > pipeline with `dlt.attach`. -## Do experiments with full refresh +## Do experiments with dev mode If you [create a new pipeline script](../walkthroughs/create-a-pipeline.md) you will be experimenting a lot. If you want that each time the pipeline resets its state and loads data to a -new dataset, set the `full_refresh` argument of the `dlt.pipeline` method to True. Each time the +new dataset, set the `dev_mode` argument of the `dlt.pipeline` method to True. Each time the pipeline is created, `dlt` adds datetime-based suffix to the dataset name. +## Refresh pipeline data and state + +You can reset parts or all of your sources by using the `refresh` argument to `dlt.pipeline` or the pipeline's `run` or `extract` method. +That means when you run the pipeline, the sources/resources being processed will have their state reset and their tables either dropped or truncated, +depending on which refresh mode is used. + +The `refresh` argument should have one of the following string values to decide the refresh mode (a usage sketch follows below): + +* `drop_sources` + All sources being processed in `pipeline.run` or `pipeline.extract` are refreshed. + That means all tables listed in their schemas are dropped and state belonging to those sources and all their resources is completely wiped. + The tables are deleted both from the pipeline's schema and from the destination database. + + If you only have one source or run with all your sources together, then this is practically like running the pipeline again for the first time. + + :::caution + This erases schema history for the selected sources and only the latest version is stored. + ::: + +* `drop_resources` + Limits the refresh to the resources being processed in `pipeline.run` or `pipeline.extract` (e.g. by using `source.with_resources(...)`). + Tables belonging to those resources are dropped and their resource state is wiped (that includes incremental state).
+ The tables are deleted both from the pipeline's schema and from the destination database. + + Source level state keys are not deleted in this mode (i.e. `dlt.state()['my_key'] = 'my_value'`) + + :::caution + This erases schema history for all affected schemas and only the latest schema version is stored. + ::: + +* `drop_data` + Same as `drop_resources`, but instead of dropping the tables from the schema, only the data in them is deleted (i.e. by `TRUNCATE` in SQL destinations). Resource state for selected resources is also wiped. + The schema remains unmodified in this case. + ## Display the loading progress You can add a progress monitor to the pipeline. Typically, its role is to visually assure user that diff --git a/docs/website/docs/general-usage/state.md b/docs/website/docs/general-usage/state.md index 0ab2b8a658..4a9e453ea4 100644 --- a/docs/website/docs/general-usage/state.md +++ b/docs/website/docs/general-usage/state.md @@ -125,7 +125,7 @@ will display source and resource state slots for all known sources. **To fully reset the state:** - Drop the destination dataset to fully reset the pipeline. -- [Set the `full_refresh` flag when creating pipeline](pipeline.md#do-experiments-with-full-refresh). +- [Set the `dev_mode` flag when creating pipeline](pipeline.md#do-experiments-with-dev-mode). - Use the `dlt pipeline drop --drop-all` command to [drop state and tables for a given schema name](../reference/command-line-interface.md#selectively-drop-tables-and-reset-state). diff --git a/docs/website/docs/reference/performance_snippets/performance-snippets.py b/docs/website/docs/reference/performance_snippets/performance-snippets.py index 68ec8ed72d..7fc0f2bce9 100644 --- a/docs/website/docs/reference/performance_snippets/performance-snippets.py +++ b/docs/website/docs/reference/performance_snippets/performance-snippets.py @@ -20,7 +20,7 @@ def read_table(limit): # this prevents process pool to run the initialization code again if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ: - pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) + pipeline = dlt.pipeline("parallel_load", destination="duckdb", dev_mode=True) pipeline.extract(read_table(1000000)) load_id = pipeline.list_extracted_load_packages()[0] @@ -168,8 +168,8 @@ def _run_pipeline(pipeline, gen_): return pipeline.run(gen_()) # declare pipelines in main thread then run them "async" - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - pipeline_2 = dlt.pipeline("pipeline_2", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) + pipeline_2 = dlt.pipeline("pipeline_2", destination="duckdb", dev_mode=True) async def _run_async(): loop = asyncio.get_running_loop() diff --git a/docs/website/docs/walkthroughs/adjust-a-schema.md b/docs/website/docs/walkthroughs/adjust-a-schema.md index b0a9a9ce05..b92f431f80 100644 --- a/docs/website/docs/walkthroughs/adjust-a-schema.md +++ b/docs/website/docs/walkthroughs/adjust-a-schema.md @@ -71,13 +71,13 @@ You should keep the import schema as simple as possible and let `dlt` do the res automatically on the next run. It means that after a user update, the schema in `import` folder reverts all the automatic updates from the data. -In next steps we'll experiment a lot, you will be warned to set `full_refresh=True` until we are done experimenting. +In next steps we'll experiment a lot, you will be warned to set `dev_mode=True` until we are done experimenting.
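Returning to the refresh modes documented in `pipeline.md` above, here is a minimal usage sketch. It assumes a local duckdb destination and a throwaway `my_source` source with a single `users` resource; both names are placeholders, not part of this change:

```py
import dlt


@dlt.source
def my_source():
    @dlt.resource
    def users():
        yield [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

    return users


pipeline = dlt.pipeline(
    pipeline_name="refresh_demo",
    destination="duckdb",
    dataset_name="refresh_demo_data",
)

# First run: creates the tables and stores source/resource state.
pipeline.run(my_source())

# drop_sources: drop all tables of the processed sources and wipe their state.
pipeline.run(my_source(), refresh="drop_sources")

# drop_resources: only the selected resources' tables and resource state are dropped.
pipeline.run(my_source().with_resources("users"), refresh="drop_resources")

# drop_data: keep the schema, truncate the selected tables and reset resource state
# (including incremental cursors) so the data is reloaded from scratch.
pipeline.run(my_source(), refresh="drop_data")
```

As the new tests further below confirm, the `refresh` option only applies to the run it is passed to and does not persist on the pipeline.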
:::caution `dlt` will **not modify** tables after they are created. So if you have a `yaml` file, and you change it (e.g. change a data type or add a hint), then you need to **delete the dataset** -or set `full_refresh=True`: +or set `dev_mode=True`: ```py dlt.pipeline( import_schema_path="schemas/import", @@ -85,7 +85,7 @@ dlt.pipeline( pipeline_name="chess_pipeline", destination='duckdb', dataset_name="games_data", - full_refresh=True, + dev_mode=True, ) ``` ::: diff --git a/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md b/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md index 329f484874..ce76240c8a 100644 --- a/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md +++ b/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md @@ -151,7 +151,7 @@ def load_data(): pipeline_name='pipeline_name', dataset_name='dataset_name', destination='duckdb', - full_refresh=False # must be false if we decompose + dev_mode=False # must be false if we decompose ) # Create the source, the "serialize" decompose option # will convert dlt resources into Airflow tasks. diff --git a/tests/cli/cases/deploy_pipeline/debug_pipeline.py b/tests/cli/cases/deploy_pipeline/debug_pipeline.py index c49e8b524d..1f5bfad976 100644 --- a/tests/cli/cases/deploy_pipeline/debug_pipeline.py +++ b/tests/cli/cases/deploy_pipeline/debug_pipeline.py @@ -17,7 +17,7 @@ def example_source(api_url=dlt.config.value, api_key=dlt.secrets.value, last_id= pipeline_name="debug_pipeline", destination="postgres", dataset_name="debug_pipeline_data", - full_refresh=False, + dev_mode=False, ) load_info = p.run(example_source(last_id=819273998)) print(load_info) diff --git a/tests/common/configuration/test_configuration.py b/tests/common/configuration/test_configuration.py index 43ccdf856c..945856e93f 100644 --- a/tests/common/configuration/test_configuration.py +++ b/tests/common/configuration/test_configuration.py @@ -53,6 +53,7 @@ add_config_dict_to_env, add_config_to_env, ) +from dlt.common.pipeline import TRefreshMode from tests.utils import preserve_environ from tests.common.configuration.utils import ( @@ -240,6 +241,11 @@ def resolve_dynamic_type_field(self) -> Type[Union[int, str]]: return str +@configspec +class ConfigWithLiteralField(BaseConfiguration): + refresh: TRefreshMode = None + + LongInteger = NewType("LongInteger", int) FirstOrderStr = NewType("FirstOrderStr", str) SecondOrderStr = NewType("SecondOrderStr", FirstOrderStr) @@ -1310,3 +1316,20 @@ class EmbeddedConfigurationWithDefaults(BaseConfiguration): c_resolved = resolve.resolve_configuration(c_instance) assert c_resolved.is_resolved() assert c_resolved.conn_str.is_resolved() + + +def test_configuration_with_literal_field(environment: Dict[str, str]) -> None: + """Literal type fields only allow values from the literal""" + environment["REFRESH"] = "not_a_refresh_mode" + + with pytest.raises(ConfigValueCannotBeCoercedException) as einfo: + resolve.resolve_configuration(ConfigWithLiteralField()) + + assert einfo.value.field_name == "refresh" + assert einfo.value.field_value == "not_a_refresh_mode" + assert einfo.value.hint == TRefreshMode + + environment["REFRESH"] = "drop_data" + + spec = resolve.resolve_configuration(ConfigWithLiteralField()) + assert spec.refresh == "drop_data" diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 308b65bd37..125c699c90 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ 
-296,7 +296,7 @@ def some_data(param: str): # create two resource instances and extract in single ad hoc resource data1 = some_data("state1") data1._pipe.name = "state1_data" - dlt.pipeline(full_refresh=True).extract([data1, some_data("state2")], schema=Schema("default")) + dlt.pipeline(dev_mode=True).extract([data1, some_data("state2")], schema=Schema("default")) # both should be extracted. what we test here is the combination of binding the resource by calling it that clones the internal pipe # and then creating a source with both clones. if we keep same pipe id when cloning on call, a single pipe would be created shared by two resources assert all_yields == ["state1", "state2"] @@ -738,7 +738,7 @@ def test_source(no_resources): def test_source_resource_attrs_with_conflicting_attrs() -> None: """Resource names that conflict with DltSource attributes do not work with attribute access""" - dlt.pipeline(full_refresh=True) # Create pipeline so state property can be accessed + dlt.pipeline(dev_mode=True) # Create pipeline so state property can be accessed names = ["state", "resources", "schema", "name", "clone"] @dlt.source @@ -842,7 +842,7 @@ def test_source(expected_state): with pytest.raises(PipelineStateNotAvailable): test_source({}).state - dlt.pipeline(full_refresh=True) + dlt.pipeline(dev_mode=True) assert test_source({}).state == {} # inject state to see if what we write in state is there @@ -872,7 +872,7 @@ def test_source(): with pytest.raises(PipelineStateNotAvailable): s.test_resource.state - p = dlt.pipeline(full_refresh=True) + p = dlt.pipeline(dev_mode=True) assert r.state == {} assert s.state == {} assert s.test_resource.state == {} diff --git a/tests/helpers/dbt_tests/local/test_runner_destinations.py b/tests/helpers/dbt_tests/local/test_runner_destinations.py index c9e4b7c83b..244f06e9ce 100644 --- a/tests/helpers/dbt_tests/local/test_runner_destinations.py +++ b/tests/helpers/dbt_tests/local/test_runner_destinations.py @@ -99,7 +99,7 @@ def test_dbt_test_no_raw_schema(destination_info: DBTDestinationInfo) -> None: assert isinstance(prq_ex.value.args[0], DBTProcessingError) -def test_dbt_run_full_refresh(destination_info: DBTDestinationInfo) -> None: +def test_dbt_run_dev_mode(destination_info: DBTDestinationInfo) -> None: if destination_info.destination_name == "redshift": pytest.skip("redshift disabled due to missing fixtures") runner = setup_rasa_runner(destination_info.destination_name) diff --git a/tests/load/athena_iceberg/test_athena_iceberg.py b/tests/load/athena_iceberg/test_athena_iceberg.py index d3bb9eb5f5..4fe01752ee 100644 --- a/tests/load/athena_iceberg/test_athena_iceberg.py +++ b/tests/load/athena_iceberg/test_athena_iceberg.py @@ -28,7 +28,7 @@ def test_iceberg() -> None: pipeline_name="athena-iceberg", destination="athena", staging="filesystem", - full_refresh=True, + dev_mode=True, ) def items() -> Iterator[Any]: diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py index 2db90200ec..df564192dc 100644 --- a/tests/load/bigquery/test_bigquery_table_builder.py +++ b/tests/load/bigquery/test_bigquery_table_builder.py @@ -200,7 +200,7 @@ def test_create_table_with_integer_partition(gcp_client: BigQueryClient) -> None def test_bigquery_partition_by_date( destination_config: DestinationTestConfiguration, ) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) 
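As an aside on configuration, the `TRefreshMode` literal field exercised in `test_configuration_with_literal_field` above can be sketched outside the test suite as well. This assumes the import paths below resolve as they do elsewhere in dlt; treat it as an illustration of the Literal validation, not part of this change:

```py
import os

from dlt.common.configuration import configspec
from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.pipeline import TRefreshMode


@configspec
class RefreshConfig(BaseConfiguration):
    # Literal-typed field: only "drop_sources", "drop_resources" or "drop_data" resolve
    refresh: TRefreshMode = None


# A valid literal value is accepted when resolved from the environment;
# anything else raises ConfigValueCannotBeCoercedException (as the test above shows).
os.environ["REFRESH"] = "drop_data"
config = resolve_configuration(RefreshConfig())
assert config.refresh == "drop_data"
```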
@dlt.resource( write_disposition="merge", @@ -243,7 +243,7 @@ def demo_source() -> DltResource: def test_bigquery_no_partition_by_date( destination_config: DestinationTestConfiguration, ) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( write_disposition="merge", @@ -286,7 +286,7 @@ def demo_source() -> DltResource: def test_bigquery_partition_by_timestamp( destination_config: DestinationTestConfiguration, ) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( write_disposition="merge", @@ -329,7 +329,7 @@ def demo_source() -> DltResource: def test_bigquery_no_partition_by_timestamp( destination_config: DestinationTestConfiguration, ) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( write_disposition="merge", @@ -372,7 +372,7 @@ def demo_source() -> DltResource: def test_bigquery_partition_by_integer( destination_config: DestinationTestConfiguration, ) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( columns={"some_int": {"data_type": "bigint", "partition": True, "nullable": False}}, @@ -407,7 +407,7 @@ def demo_source() -> DltResource: def test_bigquery_no_partition_by_integer( destination_config: DestinationTestConfiguration, ) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( columns={"some_int": {"data_type": "bigint", "partition": False, "nullable": False}}, @@ -510,7 +510,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -570,7 +570,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -632,7 +632,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -764,7 +764,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -814,7 +814,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -904,7 +904,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -1004,7 +1004,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -1049,7 +1049,7 @@ def hints() -> Iterator[Dict[str, Any]]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, 
) pipeline.run(hints) diff --git a/tests/load/pipeline/test_athena.py b/tests/load/pipeline/test_athena.py index a5bb6efc0d..272cc701d5 100644 --- a/tests/load/pipeline/test_athena.py +++ b/tests/load/pipeline/test_athena.py @@ -30,7 +30,7 @@ ids=lambda x: x.name, ) def test_athena_destinations(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True) @dlt.resource(name="items", write_disposition="append") def items(): @@ -88,7 +88,7 @@ def items2(): def test_athena_all_datatypes_and_timestamps( destination_config: DestinationTestConfiguration, ) -> None: - pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True) # TIME is not supported column_schemas, data_types = table_update_and_row(exclude_types=["time"]) @@ -176,7 +176,7 @@ def my_source() -> Any: ids=lambda x: x.name, ) def test_athena_blocks_time_column(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True) column_schemas, data_types = table_update_and_row() diff --git a/tests/load/pipeline/test_dbt_helper.py b/tests/load/pipeline/test_dbt_helper.py index 38e66c4ab9..6793414e3c 100644 --- a/tests/load/pipeline/test_dbt_helper.py +++ b/tests/load/pipeline/test_dbt_helper.py @@ -39,7 +39,7 @@ def test_run_jaffle_package( pytest.skip( "dbt-athena requires database to be created and we don't do it in case of Jaffle" ) - pipeline = destination_config.setup_pipeline("jaffle_jaffle", full_refresh=True) + pipeline = destination_config.setup_pipeline("jaffle_jaffle", dev_mode=True) # get runner, pass the env from fixture dbt = dlt.dbt.package(pipeline, "https://github.com/dbt-labs/jaffle_shop.git", venv=dbt_venv) # no default schema @@ -76,7 +76,7 @@ def test_run_chess_dbt(destination_config: DestinationTestConfiguration, dbt_ven os.environ["CHESS_URL"] = "https://api.chess.com/pub/" pipeline = destination_config.setup_pipeline( - "chess_games", dataset_name="chess_dbt_test", full_refresh=True + "chess_games", dataset_name="chess_dbt_test", dev_mode=True ) assert pipeline.default_schema_name is None # get the runner for the "dbt_transform" package @@ -129,7 +129,7 @@ def test_run_chess_dbt_to_other_dataset( os.environ["CHESS_URL"] = "https://api.chess.com/pub/" pipeline = destination_config.setup_pipeline( - "chess_games", dataset_name="chess_dbt_test", full_refresh=True + "chess_games", dataset_name="chess_dbt_test", dev_mode=True ) # load each schema in separate dataset pipeline.config.use_single_dataset = False diff --git a/tests/load/pipeline/test_drop.py b/tests/load/pipeline/test_drop.py index afae1c22ca..313ba63a2c 100644 --- a/tests/load/pipeline/test_drop.py +++ b/tests/load/pipeline/test_drop.py @@ -130,7 +130,7 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestCon """Test the drop command with resource and state path options and verify correct data is deleted from destination and locally""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = 
_attach(pipeline) @@ -155,7 +155,7 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestCon def test_drop_command_only_state(destination_config: DestinationTestConfiguration) -> None: """Test drop command that deletes part of the state and syncs with destination""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -198,22 +198,24 @@ def test_drop_command_only_tables(destination_config: DestinationTestConfigurati "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_drop_destination_tables_fails(destination_config: DestinationTestConfiguration) -> None: - """Fail on drop tables. Command runs again.""" + """Fail on DROP TABLES in destination init. Command runs again.""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) with mock.patch.object( - helpers.DropCommand, - "_drop_destination_tables", - side_effect=RuntimeError("Something went wrong"), + pipeline.destination.client_class, + "drop_tables", + autospec=True, + side_effect=RuntimeError("Oh no!"), ): - with pytest.raises(RuntimeError): + with pytest.raises(PipelineStepFailed) as einfo: helpers.drop(attached, resources=("droppable_a", "droppable_b")) + assert isinstance(einfo.value.exception, RuntimeError) + assert "Oh no!" in str(einfo.value.exception) - attached = _attach(pipeline) helpers.drop(attached, resources=("droppable_a", "droppable_b")) assert_dropped_resources(attached, ["droppable_a", "droppable_b"]) @@ -226,17 +228,24 @@ def test_drop_destination_tables_fails(destination_config: DestinationTestConfig def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration) -> None: """Fail directly after drop tables. Command runs again ignoring destination tables missing.""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) + # Fail on client update_stored_schema with mock.patch.object( - helpers.DropCommand, "_extract_state", side_effect=RuntimeError("Something went wrong") + pipeline.destination.client_class, + "update_stored_schema", + autospec=True, + side_effect=RuntimeError("Oh no!"), ): - with pytest.raises(RuntimeError): + with pytest.raises(PipelineStepFailed) as einfo: helpers.drop(attached, resources=("droppable_a", "droppable_b")) + assert isinstance(einfo.value.exception, RuntimeError) + assert "Oh no!" in str(einfo.value.exception) + attached = _attach(pipeline) helpers.drop(attached, resources=("droppable_a", "droppable_b")) @@ -250,7 +259,7 @@ def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration def test_load_step_fails(destination_config: DestinationTestConfiguration) -> None: """Test idempotence. pipeline.load() fails. 
Command can be run again successfully""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -272,7 +281,7 @@ def test_load_step_fails(destination_config: DestinationTestConfiguration) -> No ) def test_resource_regex(destination_config: DestinationTestConfiguration) -> None: source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -291,7 +300,7 @@ def test_resource_regex(destination_config: DestinationTestConfiguration) -> Non def test_drop_nothing(destination_config: DestinationTestConfiguration) -> None: """No resources, no state keys. Nothing is changed.""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -309,7 +318,7 @@ def test_drop_nothing(destination_config: DestinationTestConfiguration) -> None: def test_drop_all_flag(destination_config: DestinationTestConfiguration) -> None: """Using drop_all flag. Destination dataset and all local state is deleted""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) dlt_tables = [ t["name"] for t in pipeline.default_schema.dlt_tables() @@ -335,7 +344,7 @@ def test_drop_all_flag(destination_config: DestinationTestConfiguration) -> None ) def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConfiguration) -> None: """Pipeline can be run again after dropping some resources""" - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(droppable_source()) attached = _attach(pipeline) @@ -354,7 +363,7 @@ def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConf ) def test_drop_state_only(destination_config: DestinationTestConfiguration) -> None: """Pipeline can be run again after dropping some resources""" - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(droppable_source()) attached = _attach(pipeline) diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 5f24daf57f..623284d8a7 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -362,8 +362,9 @@ def _collect_table_counts(p) -> Dict[str, int]: ) # generate 4 loads from 2 pipelines, store load ids - p1 = destination_config.setup_pipeline("p1", dataset_name="layout_test") - p2 = destination_config.setup_pipeline("p2", dataset_name="layout_test") + dataset_name = "layout_test_" + uniq_id() + p1 = destination_config.setup_pipeline("p1", dataset_name=dataset_name) + p2 = destination_config.setup_pipeline("p2", dataset_name=dataset_name) c1 = cast(FilesystemClient, 
p1.destination_client()) c2 = cast(FilesystemClient, p2.destination_client()) diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py index f4e039ee81..a3f5083ae6 100644 --- a/tests/load/pipeline/test_merge_disposition.py +++ b/tests/load/pipeline/test_merge_disposition.py @@ -31,7 +31,7 @@ "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_merge_on_keys_in_schema(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("eth_2", full_refresh=True) + p = destination_config.setup_pipeline("eth_2", dev_mode=True) with open("tests/common/cases/schemas/eth/ethereum_schema_v5.yml", "r", encoding="utf-8") as f: schema = dlt.Schema.from_dict(yaml.safe_load(f)) @@ -97,7 +97,7 @@ def test_merge_on_keys_in_schema(destination_config: DestinationTestConfiguratio "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_merge_on_ad_hoc_primary_key(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("github_1", full_refresh=True) + p = destination_config.setup_pipeline("github_1", dev_mode=True) with open( "tests/normalize/cases/github.issues.load_page_5_duck.json", "r", encoding="utf-8" @@ -162,7 +162,7 @@ def load_issues(): def test_merge_source_compound_keys_and_changes( destination_config: DestinationTestConfiguration, ) -> None: - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) info = p.run(github(), loader_file_format=destination_config.file_format) assert_load_info(info) @@ -211,7 +211,7 @@ def test_merge_source_compound_keys_and_changes( "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_merge_no_child_tables(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) github_data = github() assert github_data.max_table_nesting is None assert github_data.root_key is True @@ -251,7 +251,7 @@ def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) - # NOTE: we can test filesystem destination merge behavior here too, will also fallback! 
if destination_config.file_format == "insert_values": pytest.skip("Insert values row count checking is buggy, skipping") - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) github_data = github() # remove all keys github_data.load_issues.apply_hints(merge_key=(), primary_key=()) @@ -279,7 +279,7 @@ def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_merge_keys_non_existing_columns(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) github_data = github() # set keys names that do not exist in the data github_data.load_issues.apply_hints(merge_key=("mA1", "Ma2"), primary_key=("123-x",)) @@ -318,7 +318,7 @@ def test_merge_keys_non_existing_columns(destination_config: DestinationTestConf ids=lambda x: x.name, ) def test_pipeline_load_parquet(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) github_data = github() # generate some complex types github_data.max_table_nesting = 2 @@ -447,7 +447,7 @@ def _updated_event(node_id): ] # load to destination - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) info = p.run( _get_shuffled_events(True) | github_resource, loader_file_format=destination_config.file_format, @@ -507,7 +507,7 @@ def _updated_event(node_id): "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_deduplicate_single_load(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = destination_config.setup_pipeline("abstract", dev_mode=True) @dlt.resource(write_disposition="merge", primary_key="id") def duplicates(): @@ -538,7 +538,7 @@ def duplicates_no_child(): "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_no_deduplicate_only_merge_key(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = destination_config.setup_pipeline("abstract", dev_mode=True) @dlt.resource(write_disposition="merge", merge_key="id") def duplicates(): @@ -575,7 +575,7 @@ def test_complex_column_missing(destination_config: DestinationTestConfiguration def r(data): yield data - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = destination_config.setup_pipeline("abstract", dev_mode=True) data = [{"id": 1, "simple": "foo", "complex": [1, 2, 3]}] info = p.run(r(data), loader_file_format=destination_config.file_format) @@ -618,7 +618,7 @@ def data_resource(data): # we test what happens if there are no merge keys pass - p = destination_config.setup_pipeline(f"abstract_{key_type}", full_refresh=True) + p = destination_config.setup_pipeline(f"abstract_{key_type}", dev_mode=True) # insert two records data = [ @@ -766,7 +766,7 @@ def test_hard_delete_hint_config(destination_config: DestinationTestConfiguratio def data_resource(data): yield data - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = 
destination_config.setup_pipeline("abstract", dev_mode=True) # insert two records data = [ @@ -828,7 +828,7 @@ def test_dedup_sort_hint(destination_config: DestinationTestConfiguration) -> No def data_resource(data): yield data - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = destination_config.setup_pipeline("abstract", dev_mode=True) # three records with same primary key data = [ diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index d98f335d16..ad44cd6f5c 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -217,7 +217,7 @@ def _data(): for d in data: yield d - p = destination_config.setup_pipeline("test_skip_sync_schema_for_tables", full_refresh=True) + p = destination_config.setup_pipeline("test_skip_sync_schema_for_tables", dev_mode=True) p.extract(_data) schema = p.default_schema assert "data_table" in schema.tables @@ -240,7 +240,7 @@ def _data(): destinations_configs(default_sql_configs=True, all_buckets_filesystem_configs=True), ids=lambda x: x.name, ) -def test_run_full_refresh(destination_config: DestinationTestConfiguration) -> None: +def test_run_dev_mode(destination_config: DestinationTestConfiguration) -> None: data = ["a", ["a", "b", "c"], ["a", "b", "c"]] destination_config.setup() @@ -251,7 +251,7 @@ def d(): def _data(): return dlt.resource(d(), name="lists", write_disposition="replace") - p = dlt.pipeline(full_refresh=True) + p = dlt.pipeline(dev_mode=True) info = p.run( _data(), destination=destination_config.destination, @@ -267,7 +267,7 @@ def _data(): # restore the pipeline p = dlt.attach() # restored pipeline should be never put in full refresh - assert p.full_refresh is False + assert p.dev_mode is False # assert parent table (easy), None First (db order) assert_table(p, "lists", [None, None, "a"], info=info) # child tables contain nested lists @@ -456,7 +456,7 @@ def test_dataset_name_change(destination_config: DestinationTestConfiguration) - ds_2_name = "IteRation" + uniq_id() # illegal name that will be later normalized ds_3_name = "1it/era 👍 tion__" + uniq_id() - p, s = simple_nested_pipeline(destination_config, dataset_name=ds_1_name, full_refresh=False) + p, s = simple_nested_pipeline(destination_config, dataset_name=ds_1_name, dev_mode=False) try: info = p.run(s(), loader_file_format=destination_config.file_format) assert_load_info(info) @@ -589,7 +589,7 @@ def conflict(): # conflict deselected assert "conflict" not in discover_2.tables - p = dlt.pipeline(pipeline_name="multi", destination="duckdb", full_refresh=True) + p = dlt.pipeline(pipeline_name="multi", destination="duckdb", dev_mode=True) p.extract([source_1(), source_2()]) default_schema = p.default_schema gen1_table = default_schema.tables["gen1"] @@ -614,7 +614,7 @@ def conflict(): drop_active_pipeline_data() # same pipeline but enable conflict - p = dlt.pipeline(pipeline_name="multi", destination="duckdb", full_refresh=True) + p = dlt.pipeline(pipeline_name="multi", destination="duckdb", dev_mode=True) with pytest.raises(PipelineStepFailed) as py_ex: p.extract([source_1(), source_2().with_resources("conflict")]) assert isinstance(py_ex.value.__context__, CannotCoerceColumnException) @@ -902,7 +902,7 @@ def test_pipeline_upfront_tables_two_loads( pipeline = destination_config.setup_pipeline( "test_pipeline_upfront_tables_two_loads", dataset_name="test_pipeline_upfront_tables_two_loads", - full_refresh=True, + dev_mode=True, ) @dlt.source @@ -1052,7 +1052,7 @@ def 
table_3(make_data=False): # pipeline = destination_config.setup_pipeline( # "test_load_non_utc_timestamps", # dataset_name="test_load_non_utc_timestamps", -# full_refresh=True, +# dev_mode=True, # ) # info = pipeline.run(some_data()) # # print(pipeline.default_schema.to_pretty_yaml()) @@ -1062,7 +1062,7 @@ def table_3(make_data=False): def simple_nested_pipeline( - destination_config: DestinationTestConfiguration, dataset_name: str, full_refresh: bool + destination_config: DestinationTestConfiguration, dataset_name: str, dev_mode: bool ) -> Tuple[dlt.Pipeline, Callable[[], DltSource]]: data = ["a", ["a", "b", "c"], ["a", "b", "c"]] @@ -1075,7 +1075,7 @@ def _data(): p = dlt.pipeline( pipeline_name=f"pipeline_{dataset_name}", - full_refresh=full_refresh, + dev_mode=dev_mode, destination=destination_config.destination, staging=destination_config.staging, dataset_name=dataset_name, diff --git a/tests/load/pipeline/test_redshift.py b/tests/load/pipeline/test_redshift.py index 7e786f6845..29293693f5 100644 --- a/tests/load/pipeline/test_redshift.py +++ b/tests/load/pipeline/test_redshift.py @@ -18,7 +18,7 @@ ids=lambda x: x.name, ) def test_redshift_blocks_time_column(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline("redshift_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("redshift_" + uniq_id(), dev_mode=True) column_schemas, data_types = table_update_and_row() diff --git a/tests/load/pipeline/test_refresh_modes.py b/tests/load/pipeline/test_refresh_modes.py new file mode 100644 index 0000000000..02ed560068 --- /dev/null +++ b/tests/load/pipeline/test_refresh_modes.py @@ -0,0 +1,439 @@ +from typing import Any, List + +import pytest +import dlt +from dlt.common.pipeline import resource_state +from dlt.destinations.sql_client import DBApiCursor +from dlt.pipeline.state_sync import load_pipeline_state_from_destination +from dlt.common.typing import DictStrAny +from dlt.common.pipeline import pipeline_state as current_pipeline_state + +from tests.utils import clean_test_storage, preserve_environ +from tests.pipeline.utils import ( + assert_load_info, + load_tables_to_dicts, + assert_only_table_columns, + table_exists, +) +from tests.load.utils import destinations_configs, DestinationTestConfiguration + + +def assert_source_state_is_wiped(state: DictStrAny) -> None: + # Keys contains only "resources" or is empty + assert list(state.keys()) == ["resources"] or not state + for value in state["resources"].values(): + assert not value + + +def column_values(cursor: DBApiCursor, column_name: str) -> List[Any]: + """Return all values in a column from a cursor""" + idx = [c[0] for c in cursor.native_cursor.description].index(column_name) + return [row[idx] for row in cursor.fetchall()] + + +@dlt.source +def refresh_source(first_run: bool = True, drop_sources: bool = False): + @dlt.resource + def some_data_1(): + if first_run: + # Set some source and resource state + dlt.state()["source_key_1"] = "source_value_1" + resource_state("some_data_1")["run1_1"] = "value1_1" + resource_state("some_data_1")["run1_2"] = "value1_2" + yield {"id": 1, "name": "John"} + yield {"id": 2, "name": "Jane"} + else: + # Check state is cleared for this resource + assert not resource_state("some_data_1") + if drop_sources: + assert_source_state_is_wiped(dlt.state()) + # Second dataset without name column to test tables are re-created + yield {"id": 3} + yield {"id": 4} + + @dlt.resource + def some_data_2(): + if first_run: + 
dlt.state()["source_key_2"] = "source_value_2" + resource_state("some_data_2")["run1_1"] = "value1_1" + resource_state("some_data_2")["run1_2"] = "value1_2" + yield {"id": 5, "name": "Joe"} + yield {"id": 6, "name": "Jill"} + else: + assert not resource_state("some_data_2") + if drop_sources: + assert_source_state_is_wiped(dlt.state()) + yield {"id": 7} + yield {"id": 8} + + @dlt.resource + def some_data_3(): + if first_run: + dlt.state()["source_key_3"] = "source_value_3" + resource_state("some_data_3")["run1_1"] = "value1_1" + yield {"id": 9, "name": "Jack"} + yield {"id": 10, "name": "Jill"} + else: + assert not resource_state("some_data_3") + if drop_sources: + assert_source_state_is_wiped(dlt.state()) + yield {"id": 11} + yield {"id": 12} + + @dlt.resource + def some_data_4(): + yield [] + + yield some_data_1 + yield some_data_2 + yield some_data_3 + yield some_data_4 + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, subset=["duckdb", "filesystem"], local_filesystem_configs=True + ), + ids=lambda x: x.name, +) +def test_refresh_drop_sources(destination_config: DestinationTestConfiguration): + pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources") + + # First run pipeline so destination so tables are created + info = pipeline.run(refresh_source(first_run=True, drop_sources=True)) + assert_load_info(info) + + # Second run of pipeline with only selected resources + info = pipeline.run( + refresh_source(first_run=False, drop_sources=True).with_resources( + "some_data_1", "some_data_2" + ) + ) + + assert set(t["name"] for t in pipeline.default_schema.data_tables(include_incomplete=True)) == { + "some_data_1", + "some_data_2", + # Table has never seen data and is not dropped + "some_data_4", + } + + # No "name" column should exist as table was dropped and re-created without it + assert_only_table_columns(pipeline, "some_data_1", ["id"]) + data = load_tables_to_dicts(pipeline, "some_data_1")["some_data_1"] + result = sorted([row["id"] for row in data]) + # Only rows from second run should exist + assert result == [3, 4] + + # Confirm resource tables not selected on second run got dropped + assert not table_exists(pipeline, "some_data_3") + # Loaded state is wiped + with pipeline.destination_client() as dest_client: + destination_state = load_pipeline_state_from_destination( + pipeline.pipeline_name, dest_client # type: ignore[arg-type] + ) + assert_source_state_is_wiped(destination_state["sources"]["refresh_source"]) + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, local_filesystem_configs=True, subset=["duckdb", "filesystem"] + ), + ids=lambda x: x.name, +) +def test_existing_schema_hash(destination_config: DestinationTestConfiguration): + """Test when new schema is identical to a previously stored schema after dropping and re-creating tables. 
+ The change should be detected regardless and tables are created again in destination db + """ + pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources") + + info = pipeline.run(refresh_source(first_run=True, drop_sources=True)) + assert_load_info(info) + first_schema_hash = pipeline.default_schema.version_hash + + # Second run with all tables dropped and only some tables re-created + info = pipeline.run( + refresh_source(first_run=False, drop_sources=True).with_resources( + "some_data_1", "some_data_2" + ) + ) + + # Just check the local schema + new_table_names = set( + t["name"] for t in pipeline.default_schema.data_tables(include_incomplete=True) + ) + assert new_table_names == {"some_data_1", "some_data_2", "some_data_4"} + + # Run again with all tables to ensure they are re-created + # The new schema in this case should match the schema of the first run exactly + info = pipeline.run(refresh_source(first_run=True, drop_sources=True)) + # Check table 3 was re-created + data = load_tables_to_dicts(pipeline, "some_data_3")["some_data_3"] + result = sorted([(row["id"], row["name"]) for row in data]) + assert result == [(9, "Jack"), (10, "Jill")] + + # Schema is identical to first schema + new_schema_hash = pipeline.default_schema.version_hash + assert new_schema_hash == first_schema_hash + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, local_filesystem_configs=True, subset=["duckdb", "filesystem"] + ), + ids=lambda x: x.name, +) +def test_refresh_drop_resources(destination_config: DestinationTestConfiguration): + # First run pipeline with load to destination so tables are created + pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_tables") + + info = pipeline.run(refresh_source(first_run=True)) + assert_load_info(info) + + # Second run of pipeline with only selected resources + info = pipeline.run( + refresh_source(first_run=False).with_resources("some_data_1", "some_data_2") + ) + + # Confirm resource tables not selected on second run are untouched + data = load_tables_to_dicts(pipeline, "some_data_3")["some_data_3"] + result = sorted([(row["id"], row["name"]) for row in data]) + assert result == [(9, "Jack"), (10, "Jill")] + + # Check the columns to ensure the name column was dropped + assert_only_table_columns(pipeline, "some_data_1", ["id"]) + data = load_tables_to_dicts(pipeline, "some_data_1")["some_data_1"] + # Only second run data + result = sorted([row["id"] for row in data]) + assert result == [3, 4] + + # Loaded state contains only keys created in second run + with pipeline.destination_client() as dest_client: + destination_state = load_pipeline_state_from_destination( + pipeline.pipeline_name, dest_client # type: ignore[arg-type] + ) + + source_state = destination_state["sources"]["refresh_source"] + # Source level state is kept + assert source_state["source_key_1"] == "source_value_1" + assert source_state["source_key_2"] == "source_value_2" + assert source_state["source_key_3"] == "source_value_3" + # Only resource excluded in second run remains + assert source_state["resources"]["some_data_3"] == {"run1_1": "value1_1"} + assert not source_state["resources"]["some_data_2"] + assert not source_state["resources"]["some_data_1"] + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, local_filesystem_configs=True, subset=["duckdb", "filesystem"] + ), + ids=lambda x: x.name, +) +def 
test_refresh_drop_data_only(destination_config: DestinationTestConfiguration): + """Refresh drop_data should truncate all selected tables before load""" + # First run pipeline with load to destination so tables are created + pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_data") + + info = pipeline.run(refresh_source(first_run=True), write_disposition="append") + assert_load_info(info) + + first_schema_hash = pipeline.default_schema.version_hash + + # Second run of pipeline with only selected resources + info = pipeline.run( + refresh_source(first_run=False).with_resources("some_data_1", "some_data_2"), + write_disposition="append", + ) + assert_load_info(info) + + # Schema should not be mutated + assert pipeline.default_schema.version_hash == first_schema_hash + + # Tables selected in second run are truncated and should only have data from second run + data = load_tables_to_dicts(pipeline, "some_data_1", "some_data_2", "some_data_3") + # name column still remains when table was truncated instead of dropped + # (except on filesystem where truncate and drop are the same) + if destination_config.destination == "filesystem": + result = sorted([row["id"] for row in data["some_data_1"]]) + assert result == [3, 4] + + result = sorted([row["id"] for row in data["some_data_2"]]) + assert result == [7, 8] + else: + result = sorted([(row["id"], row["name"]) for row in data["some_data_1"]]) + assert result == [(3, None), (4, None)] + + result = sorted([(row["id"], row["name"]) for row in data["some_data_2"]]) + assert result == [(7, None), (8, None)] + + # Other tables still have data from first run + result = sorted([(row["id"], row["name"]) for row in data["some_data_3"]]) + assert result == [(9, "Jack"), (10, "Jill")] + + # State of selected resources is wiped, source level state is kept + with pipeline.destination_client() as dest_client: + destination_state = load_pipeline_state_from_destination( + pipeline.pipeline_name, dest_client # type: ignore[arg-type] + ) + + source_state = destination_state["sources"]["refresh_source"] + assert source_state["source_key_1"] == "source_value_1" + assert source_state["source_key_2"] == "source_value_2" + assert source_state["source_key_3"] == "source_value_3" + assert not source_state["resources"]["some_data_1"] + assert not source_state["resources"]["some_data_2"] + assert source_state["resources"]["some_data_3"] == {"run1_1": "value1_1"} + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, subset=["duckdb"]), + ids=lambda x: x.name, +) +def test_refresh_drop_sources_multiple_sources(destination_config: DestinationTestConfiguration): + """ + Ensure only state and tables for currently selected source is dropped + """ + + @dlt.source + def refresh_source_2(first_run=True): + @dlt.resource + def source_2_data_1(): + pipeline_state, _ = current_pipeline_state(pipeline._container) + if first_run: + dlt.state()["source_2_key_1"] = "source_2_value_1" + resource_state("source_2_data_1")["run1_1"] = "value1_1" + yield {"product": "apple", "price": 1} + yield {"product": "banana", "price": 2} + else: + # First source should not have state wiped + assert ( + pipeline_state["sources"]["refresh_source"]["source_key_1"] == "source_value_1" + ) + assert pipeline_state["sources"]["refresh_source"]["resources"]["some_data_1"] == { + "run1_1": "value1_1", + "run1_2": "value1_2", + } + # Source state is wiped + assert_source_state_is_wiped(dlt.state()) + yield {"product": "orange"} + yield 
+
+        @dlt.resource
+        def source_2_data_2():
+            if first_run:
+                dlt.state()["source_2_key_2"] = "source_2_value_2"
+                resource_state("source_2_data_2")["run1_1"] = "value1_1"
+                yield {"product": "carrot", "price": 3}
+                yield {"product": "potato", "price": 4}
+            else:
+                assert_source_state_is_wiped(dlt.state())
+                yield {"product": "cabbage"}
+                yield {"product": "lettuce"}
+
+        yield source_2_data_1
+        yield source_2_data_2
+
+    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources")
+
+    # Run both sources
+    info = pipeline.run(
+        [refresh_source(first_run=True, drop_sources=True), refresh_source_2(first_run=True)]
+    )
+    assert_load_info(info, 2)
+    # Second run: only source 2 with a single resource selected, source 1 must stay intact
+    info = pipeline.run(refresh_source_2(first_run=False).with_resources("source_2_data_1"))
+    assert_load_info(info, 2)
+
+    # Check source 1 schema still has all tables
+    table_names = set(
+        t["name"] for t in pipeline.schemas["refresh_source"].data_tables(include_incomplete=True)
+    )
+    assert table_names == {"some_data_1", "some_data_2", "some_data_3", "some_data_4"}
+
+    # Source 2 has only the selected tables
+    table_names = set(
+        t["name"] for t in pipeline.schemas["refresh_source_2"].data_tables(include_incomplete=True)
+    )
+    assert table_names == {"source_2_data_1"}
+
+    # Destination still has tables from source 1
+    data = load_tables_to_dicts(pipeline, "some_data_1")
+    result = sorted([(row["id"], row["name"]) for row in data["some_data_1"]])
+    assert result == [(1, "John"), (2, "Jane")]
+
+    # First table from source 2 exists, with only the first column
+    data = load_tables_to_dicts(pipeline, "source_2_data_1", schema_name="refresh_source_2")
+    assert_only_table_columns(
+        pipeline, "source_2_data_1", ["product"], schema_name="refresh_source_2"
+    )
+    result = sorted([row["product"] for row in data["source_2_data_1"]])
+    assert result == ["orange", "pear"]
+
+    # Second table from source 2 is gone
+    assert not table_exists(pipeline, "source_2_data_2", schema_name="refresh_source_2")
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(
+        default_sql_configs=True, local_filesystem_configs=True, subset=["duckdb", "filesystem"]
+    ),
+    ids=lambda x: x.name,
+)
+def test_refresh_argument_to_run(destination_config: DestinationTestConfiguration):
+    pipeline = destination_config.setup_pipeline("refresh_full_test")
+
+    info = pipeline.run(refresh_source(first_run=True))
+    assert_load_info(info)
+
+    info = pipeline.run(
+        refresh_source(first_run=False).with_resources("some_data_3"),
+        refresh="drop_sources",
+    )
+    assert_load_info(info)
+
+    # Check the local schema to confirm refresh was applied
+    tables = set(t["name"] for t in pipeline.default_schema.data_tables())
+    assert tables == {"some_data_3"}
+
+    # Run again without refresh to confirm the refresh option doesn't persist on the pipeline
+    info = pipeline.run(refresh_source(first_run=False).with_resources("some_data_2"))
+    assert_load_info(info)
+
+    # Nothing is dropped
+    tables = set(t["name"] for t in pipeline.default_schema.data_tables())
+    assert tables == {"some_data_2", "some_data_3"}
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(
+        default_sql_configs=True, local_filesystem_configs=True, subset=["duckdb", "filesystem"]
+    ),
+    ids=lambda x: x.name,
+)
+def test_refresh_argument_to_extract(destination_config: DestinationTestConfiguration):
+    pipeline = destination_config.setup_pipeline("refresh_full_test")
+
+    info = pipeline.run(refresh_source(first_run=True))
+    
assert_load_info(info) + + pipeline.extract( + refresh_source(first_run=False).with_resources("some_data_3"), + refresh="drop_sources", + ) + + tables = set(t["name"] for t in pipeline.default_schema.data_tables(include_incomplete=True)) + # All other data tables removed + assert tables == {"some_data_3", "some_data_4"} + + # Run again without refresh to confirm refresh option doesn't persist on pipeline + pipeline.extract(refresh_source(first_run=False).with_resources("some_data_2")) + + tables = set(t["name"] for t in pipeline.default_schema.data_tables(include_incomplete=True)) + assert tables == {"some_data_2", "some_data_3", "some_data_4"} diff --git a/tests/load/pipeline/test_replace_disposition.py b/tests/load/pipeline/test_replace_disposition.py index f3b58aa5f6..464b5aea1f 100644 --- a/tests/load/pipeline/test_replace_disposition.py +++ b/tests/load/pipeline/test_replace_disposition.py @@ -260,7 +260,7 @@ def test_replace_table_clearing( os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy pipeline = destination_config.setup_pipeline( - "test_replace_table_clearing", dataset_name="test_replace_table_clearing", full_refresh=True + "test_replace_table_clearing", dataset_name="test_replace_table_clearing", dev_mode=True ) @dlt.resource(name="main_resource", write_disposition="replace", primary_key="id") diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index a2d00001c2..b287619e8c 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -355,7 +355,7 @@ def some_data(): # full refresh will not restore pipeline even if requested p._wipe_working_folder() p = destination_config.setup_pipeline( - pipeline_name=pipeline_name, dataset_name=dataset_name, full_refresh=True + pipeline_name=pipeline_name, dataset_name=dataset_name, dev_mode=True ) p.run(loader_file_format=destination_config.file_format) assert p.default_schema_name is None diff --git a/tests/load/pipeline/test_write_disposition_changes.py b/tests/load/pipeline/test_write_disposition_changes.py index b9c2b35717..16c589352e 100644 --- a/tests/load/pipeline/test_write_disposition_changes.py +++ b/tests/load/pipeline/test_write_disposition_changes.py @@ -27,7 +27,7 @@ def data_with_subtables(offset: int) -> Any: ) def test_switch_from_merge(destination_config: DestinationTestConfiguration): pipeline = destination_config.setup_pipeline( - pipeline_name="test_switch_from_merge", full_refresh=True + pipeline_name="test_switch_from_merge", dev_mode=True ) info = pipeline.run( @@ -96,7 +96,7 @@ def test_switch_from_merge(destination_config: DestinationTestConfiguration): @pytest.mark.parametrize("with_root_key", [True, False]) def test_switch_to_merge(destination_config: DestinationTestConfiguration, with_root_key: bool): pipeline = destination_config.setup_pipeline( - pipeline_name="test_switch_to_merge", full_refresh=True + pipeline_name="test_switch_to_merge", dev_mode=True ) @dlt.source() diff --git a/tests/load/qdrant/test_pipeline.py b/tests/load/qdrant/test_pipeline.py index fcc8fcbd71..d50b50282a 100644 --- a/tests/load/qdrant/test_pipeline.py +++ b/tests/load/qdrant/test_pipeline.py @@ -301,7 +301,7 @@ def some_data(): def test_merge_github_nested() -> None: - p = dlt.pipeline(destination="qdrant", dataset_name="github1", full_refresh=True) + p = dlt.pipeline(destination="qdrant", dataset_name="github1", dev_mode=True) assert p.dataset_name.startswith("github1_202") with open( @@ -347,7 +347,7 @@ def 
test_merge_github_nested() -> None: def test_empty_dataset_allowed() -> None: # dataset_name is optional so dataset name won't be autogenerated when not explicitly passed - p = dlt.pipeline(destination="qdrant", full_refresh=True) + p = dlt.pipeline(destination="qdrant", dev_mode=True) client: QdrantClient = p.destination_client() # type: ignore[assignment] assert p.dataset_name is None diff --git a/tests/load/synapse/test_synapse_table_indexing.py b/tests/load/synapse/test_synapse_table_indexing.py index c9ecba17a1..a9d426ad4a 100644 --- a/tests/load/synapse/test_synapse_table_indexing.py +++ b/tests/load/synapse/test_synapse_table_indexing.py @@ -52,7 +52,7 @@ def items_without_table_index_type_specified() -> Iterator[Any]: pipeline_name=f"test_default_table_index_type_{table_index_type}", destination="synapse", dataset_name=f"test_default_table_index_type_{table_index_type}", - full_refresh=True, + dev_mode=True, ) job_client = pipeline.destination_client() @@ -118,7 +118,7 @@ def items_with_table_index_type_specified() -> Iterator[Any]: pipeline_name=f"test_table_index_type_{table_index_type}", destination="synapse", dataset_name=f"test_table_index_type_{table_index_type}", - full_refresh=True, + dev_mode=True, ) # An invalid value for `table_index_type` should raise a ValueError. diff --git a/tests/load/test_job_client.py b/tests/load/test_job_client.py index 08b80af928..7e360a6664 100644 --- a/tests/load/test_job_client.py +++ b/tests/load/test_job_client.py @@ -327,16 +327,18 @@ def test_drop_tables(client: SqlJobClientBase) -> None: del schema.tables[tbl] schema._bump_version() client.drop_tables(*tables_to_drop) + client._update_schema_in_storage(schema) # Schema was deleted, load it in again if isinstance(client, WithStagingDataset): with contextlib.suppress(DatabaseUndefinedRelation): with client.with_staging_dataset(): - client.drop_tables(*tables_to_drop, replace_schema=False) + client.drop_tables(*tables_to_drop, delete_schema=False) # drop again - should not break anything client.drop_tables(*tables_to_drop) + client._update_schema_in_storage(schema) if isinstance(client, WithStagingDataset): with contextlib.suppress(DatabaseUndefinedRelation): with client.with_staging_dataset(): - client.drop_tables(*tables_to_drop, replace_schema=False) + client.drop_tables(*tables_to_drop, delete_schema=False) # Verify requested tables are dropped for tbl in tables_to_drop: diff --git a/tests/load/utils.py b/tests/load/utils.py index 445f8d815b..5a999dc1b7 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -153,7 +153,7 @@ def setup(self) -> None: os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True" def setup_pipeline( - self, pipeline_name: str, dataset_name: str = None, full_refresh: bool = False, **kwargs + self, pipeline_name: str, dataset_name: str = None, dev_mode: bool = False, **kwargs ) -> dlt.Pipeline: """Convenience method to setup pipeline with this configuration""" self.setup() @@ -162,7 +162,7 @@ def setup_pipeline( destination=self.destination, staging=self.staging, dataset_name=dataset_name or pipeline_name, - full_refresh=full_refresh, + dev_mode=dev_mode, **kwargs, ) return pipeline diff --git a/tests/load/weaviate/test_pipeline.py b/tests/load/weaviate/test_pipeline.py index fb089ad174..ee42ab59d8 100644 --- a/tests/load/weaviate/test_pipeline.py +++ b/tests/load/weaviate/test_pipeline.py @@ -303,7 +303,7 @@ def some_data(): def test_merge_github_nested() -> None: - p = dlt.pipeline(destination="weaviate", dataset_name="github1", full_refresh=True) + p = 
dlt.pipeline(destination="weaviate", dataset_name="github1", dev_mode=True) assert p.dataset_name.startswith("github1_202") with open( @@ -352,7 +352,7 @@ def test_merge_github_nested() -> None: def test_empty_dataset_allowed() -> None: # weaviate dataset_name is optional so dataset name won't be autogenerated when not explicitly passed - p = dlt.pipeline(destination="weaviate", full_refresh=True) + p = dlt.pipeline(destination="weaviate", dev_mode=True) # check if we use localhost client: WeaviateClient = p.destination_client() # type: ignore[assignment] if "localhost" not in client.config.credentials.url: diff --git a/tests/pipeline/cases/github_pipeline/github_extract.py b/tests/pipeline/cases/github_pipeline/github_extract.py index 6be6643947..c9ed672fad 100644 --- a/tests/pipeline/cases/github_pipeline/github_extract.py +++ b/tests/pipeline/cases/github_pipeline/github_extract.py @@ -5,9 +5,7 @@ from github_pipeline import github # type: ignore[import-not-found] if __name__ == "__main__": - p = dlt.pipeline( - "dlt_github_pipeline", destination="duckdb", dataset_name="github_3", full_refresh=False - ) + p = dlt.pipeline("dlt_github_pipeline", destination="duckdb", dataset_name="github_3") github_source = github() if len(sys.argv) > 1: # load only N issues diff --git a/tests/pipeline/cases/github_pipeline/github_pipeline.py b/tests/pipeline/cases/github_pipeline/github_pipeline.py index c55bd02ba0..aa0f6d0e0e 100644 --- a/tests/pipeline/cases/github_pipeline/github_pipeline.py +++ b/tests/pipeline/cases/github_pipeline/github_pipeline.py @@ -33,9 +33,7 @@ def load_issues( if __name__ == "__main__": - p = dlt.pipeline( - "dlt_github_pipeline", destination="duckdb", dataset_name="github_3", full_refresh=False - ) + p = dlt.pipeline("dlt_github_pipeline", destination="duckdb", dataset_name="github_3") github_source = github() if len(sys.argv) > 1: # load only N issues diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 1c4383405b..fd5099af9b 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -94,15 +94,15 @@ def test_default_pipeline_dataset() -> None: assert p.dataset_name in possible_dataset_names -def test_run_full_refresh_default_dataset() -> None: - p = dlt.pipeline(full_refresh=True, destination="filesystem") +def test_run_dev_mode_default_dataset() -> None: + p = dlt.pipeline(dev_mode=True, destination="filesystem") assert p.dataset_name.endswith(p._pipeline_instance_id) # restore this pipeline - r_p = dlt.attach(full_refresh=False) + r_p = dlt.attach(dev_mode=False) assert r_p.dataset_name.endswith(p._pipeline_instance_id) # dummy does not need dataset - p = dlt.pipeline(full_refresh=True, destination="dummy") + p = dlt.pipeline(dev_mode=True, destination="dummy") assert p.dataset_name is None # simulate set new dataset p._set_destinations("filesystem") @@ -112,11 +112,11 @@ def test_run_full_refresh_default_dataset() -> None: assert p.dataset_name and p.dataset_name.endswith(p._pipeline_instance_id) -def test_run_full_refresh_underscored_dataset() -> None: - p = dlt.pipeline(full_refresh=True, dataset_name="_main_") +def test_run_dev_mode_underscored_dataset() -> None: + p = dlt.pipeline(dev_mode=True, dataset_name="_main_") assert p.dataset_name.endswith(p._pipeline_instance_id) # restore this pipeline - r_p = dlt.attach(full_refresh=False) + r_p = dlt.attach(dev_mode=False) assert r_p.dataset_name.endswith(p._pipeline_instance_id) @@ -895,7 +895,7 @@ def test_extract_all_data_types() -> None: def 
test_set_get_local_value() -> None: - p = dlt.pipeline(destination="dummy", full_refresh=True) + p = dlt.pipeline(destination="dummy", dev_mode=True) value = uniq_id() # value is set p.set_local_state_val(value, value) @@ -1862,8 +1862,8 @@ def _run_pipeline(pipeline, gen_) -> LoadInfo: return pipeline.run(gen_()) # declare pipelines in main thread then run them "async" - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - pipeline_2 = dlt.pipeline("pipeline_2", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) + pipeline_2 = dlt.pipeline("pipeline_2", destination="duckdb", dev_mode=True) async def _run_async(): loop = asyncio.get_running_loop() @@ -1912,7 +1912,7 @@ def api_fetch(page_num): else: return [] - pipeline = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) load_info = pipeline.run(product()) assert_load_info(load_info) assert pipeline.last_trace.last_normalize_info.row_counts["product"] == 12 diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index 5a85c06462..542d0209d6 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -30,7 +30,7 @@ async def __anext__(self): # return the counter value return {"i": self.counter} - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(AsyncIterator, table_name="async") with pipeline_1.sql_client() as c: with c.execute_query("SELECT * FROM async") as cur: @@ -53,7 +53,7 @@ async def async_gen_resource(): await asyncio.sleep(0.1) yield {"letter": l_} - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) # pure async function pipeline_1.run(async_gen_table(), table_name="async") @@ -81,7 +81,7 @@ async def _gen(idx): for idx_ in range(3): yield _gen(idx_) - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(async_inner_table(), table_name="async") with pipeline_1.sql_client() as c: with c.execute_query("SELECT * FROM async") as cur: @@ -114,7 +114,7 @@ async def async_transformer(item): "letter": item["letter"] + "t", } - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(async_transformer(), table_name="async") with pipeline_1.sql_client() as c: @@ -174,7 +174,7 @@ def source(): elif resource_mode == "second_async": return [sync_resource1(), async_resource2()] - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(source()) with pipeline_1.sql_client() as c: @@ -243,7 +243,7 @@ def resource2(): def source(): return [resource1(), resource2()] - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(source()) # all records should be here diff --git a/tests/pipeline/test_schema_contracts.py b/tests/pipeline/test_schema_contracts.py index 
7eafb1ea24..4958299368 100644 --- a/tests/pipeline/test_schema_contracts.py +++ b/tests/pipeline/test_schema_contracts.py @@ -179,7 +179,7 @@ def get_pipeline(): pipeline_name=uniq_id(), destination="duckdb", credentials=duckdb.connect(":memory:"), - full_refresh=True, + dev_mode=True, ) diff --git a/tests/pipeline/test_schema_updates.py b/tests/pipeline/test_schema_updates.py index be397f796c..311bd55b28 100644 --- a/tests/pipeline/test_schema_updates.py +++ b/tests/pipeline/test_schema_updates.py @@ -5,7 +5,7 @@ def test_schema_updates() -> None: os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately - p = dlt.pipeline(pipeline_name="test_schema_updates", full_refresh=True, destination="dummy") + p = dlt.pipeline(pipeline_name="test_schema_updates", dev_mode=True, destination="dummy") @dlt.source() def source(): diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py index 072a12782c..b4dae919f8 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -14,6 +14,7 @@ from dlt.destinations.fs_client import FSClientBase from dlt.pipeline.exceptions import SqlClientNotAvailable from dlt.common.storages import FileStorage +from dlt.destinations.exceptions import DatabaseUndefinedRelation from tests.utils import TEST_STORAGE_ROOT @@ -172,12 +173,13 @@ def _load_tables_to_dicts_fs(p: dlt.Pipeline, *table_names: str) -> Dict[str, Li def _load_tables_to_dicts_sql( - p: dlt.Pipeline, *table_names: str + p: dlt.Pipeline, *table_names: str, schema_name: str = None ) -> Dict[str, List[Dict[str, Any]]]: result = {} + schema = p.default_schema if not schema_name else p.schemas[schema_name] for table_name in table_names: table_rows = [] - columns = p.default_schema.get_table_columns(table_name).keys() + columns = schema.get_table_columns(table_name).keys() query_columns = ",".join(map(p.sql_client().capabilities.escape_identifier, columns)) with p.sql_client() as c: @@ -191,9 +193,23 @@ def _load_tables_to_dicts_sql( return result -def load_tables_to_dicts(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, Any]]]: - func = _load_tables_to_dicts_fs if _is_filesystem(p) else _load_tables_to_dicts_sql - return func(p, *table_names) +def load_tables_to_dicts( + p: dlt.Pipeline, *table_names: str, schema_name: str = None +) -> Dict[str, List[Dict[str, Any]]]: + if _is_filesystem(p): + return _load_tables_to_dicts_fs(p, *table_names) + return _load_tables_to_dicts_sql(p, *table_names, schema_name=schema_name) + + +def assert_only_table_columns( + p: dlt.Pipeline, table_name: str, expected_columns: Sequence[str], schema_name: str = None +) -> None: + """Table has all and only the expected columns (excluding _dlt columns)""" + rows = load_tables_to_dicts(p, table_name, schema_name=schema_name)[table_name] + assert rows, f"Table {table_name} is empty" + # Ignore _dlt columns + columns = set(col for col in rows[0].keys() if not col.startswith("_dlt")) + assert columns == set(expected_columns) # @@ -244,6 +260,22 @@ def assert_data_table_counts(p: dlt.Pipeline, expected_counts: DictStrAny) -> No # +def table_exists(p: dlt.Pipeline, table_name: str, schema_name: str = None) -> bool: + """Returns True if table exists in the destination database/filesystem""" + if _is_filesystem(p): + client = p._fs_client(schema_name=schema_name) + files = client.list_table_files(table_name) + return not not files + + with p.sql_client(schema_name=schema_name) as c: + try: + qual_table_name = c.make_qualified_table_name(table_name) + c.execute_sql(f"SELECT 1 FROM {qual_table_name} LIMIT 
1") + return True + except DatabaseUndefinedRelation: + return False + + def _assert_table_sql( p: dlt.Pipeline, table_name: str,