diff --git a/tests/cli/cases/deploy_pipeline/debug_pipeline.py b/tests/cli/cases/deploy_pipeline/debug_pipeline.py index c49e8b524d..1f5bfad976 100644 --- a/tests/cli/cases/deploy_pipeline/debug_pipeline.py +++ b/tests/cli/cases/deploy_pipeline/debug_pipeline.py @@ -17,7 +17,7 @@ def example_source(api_url=dlt.config.value, api_key=dlt.secrets.value, last_id= pipeline_name="debug_pipeline", destination="postgres", dataset_name="debug_pipeline_data", - full_refresh=False, + dev_mode=False, ) load_info = p.run(example_source(last_id=819273998)) print(load_info) diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 5895c3b658..4f9600f29a 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -293,7 +293,7 @@ def some_data(param: str): # create two resource instances and extract in single ad hoc resource data1 = some_data("state1") data1._pipe.name = "state1_data" - dlt.pipeline(full_refresh=True).extract([data1, some_data("state2")], schema=Schema("default")) + dlt.pipeline(dev_mode=True).extract([data1, some_data("state2")], schema=Schema("default")) # both should be extracted. what we test here is the combination of binding the resource by calling it that clones the internal pipe # and then creating a source with both clones. if we keep same pipe id when cloning on call, a single pipe would be created shared by two resources assert all_yields == ["state1", "state2"] @@ -735,7 +735,7 @@ def test_source(no_resources): def test_source_resource_attrs_with_conflicting_attrs() -> None: """Resource names that conflict with DltSource attributes do not work with attribute access""" - dlt.pipeline(full_refresh=True) # Create pipeline so state property can be accessed + dlt.pipeline(dev_mode=True) # Create pipeline so state property can be accessed names = ["state", "resources", "schema", "name", "clone"] @dlt.source @@ -839,7 +839,7 @@ def test_source(expected_state): with pytest.raises(PipelineStateNotAvailable): test_source({}).state - dlt.pipeline(full_refresh=True) + dlt.pipeline(dev_mode=True) assert test_source({}).state == {} # inject state to see if what we write in state is there @@ -869,7 +869,7 @@ def test_source(): with pytest.raises(PipelineStateNotAvailable): s.test_resource.state - p = dlt.pipeline(full_refresh=True) + p = dlt.pipeline(dev_mode=True) assert r.state == {} assert s.state == {} assert s.test_resource.state == {} diff --git a/tests/helpers/dbt_tests/local/test_runner_destinations.py b/tests/helpers/dbt_tests/local/test_runner_destinations.py index c9e4b7c83b..244f06e9ce 100644 --- a/tests/helpers/dbt_tests/local/test_runner_destinations.py +++ b/tests/helpers/dbt_tests/local/test_runner_destinations.py @@ -99,7 +99,7 @@ def test_dbt_test_no_raw_schema(destination_info: DBTDestinationInfo) -> None: assert isinstance(prq_ex.value.args[0], DBTProcessingError) -def test_dbt_run_full_refresh(destination_info: DBTDestinationInfo) -> None: +def test_dbt_run_dev_mode(destination_info: DBTDestinationInfo) -> None: if destination_info.destination_name == "redshift": pytest.skip("redshift disabled due to missing fixtures") runner = setup_rasa_runner(destination_info.destination_name) diff --git a/tests/load/athena_iceberg/test_athena_iceberg.py b/tests/load/athena_iceberg/test_athena_iceberg.py index 6804b98427..41711162d6 100644 --- a/tests/load/athena_iceberg/test_athena_iceberg.py +++ b/tests/load/athena_iceberg/test_athena_iceberg.py @@ -30,7 +30,7 @@ def test_iceberg() -> None: pipeline_name="athena-iceberg", destination="athena", staging="filesystem", - full_refresh=True, + dev_mode=True, ) def items() -> Iterator[Any]: diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py index a223de9b26..5fcf73d9a5 100644 --- a/tests/load/bigquery/test_bigquery_table_builder.py +++ b/tests/load/bigquery/test_bigquery_table_builder.py @@ -196,7 +196,7 @@ def test_create_table_with_integer_partition(gcp_client: BigQueryClient) -> None ids=lambda x: x.name, ) def test_bigquery_partition_by_date(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( write_disposition="merge", @@ -231,7 +231,7 @@ def demo_source() -> DltResource: ids=lambda x: x.name, ) def test_bigquery_no_partition_by_date(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( write_disposition="merge", @@ -266,7 +266,7 @@ def demo_source() -> DltResource: ids=lambda x: x.name, ) def test_bigquery_partition_by_timestamp(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( write_disposition="merge", @@ -305,7 +305,7 @@ def demo_source() -> DltResource: def test_bigquery_no_partition_by_timestamp( destination_config: DestinationTestConfiguration, ) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( write_disposition="merge", @@ -342,7 +342,7 @@ def demo_source() -> DltResource: ids=lambda x: x.name, ) def test_bigquery_partition_by_integer(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( columns={"some_int": {"data_type": "bigint", "partition": True, "nullable": False}}, @@ -375,7 +375,7 @@ def demo_source() -> DltResource: ids=lambda x: x.name, ) def test_bigquery_no_partition_by_integer(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True) @dlt.resource( columns={"some_int": {"data_type": "bigint", "partition": False, "nullable": False}}, @@ -463,7 +463,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -523,7 +523,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -583,7 +583,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -708,7 +708,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -756,7 +756,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -844,7 +844,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -894,7 +894,7 @@ def sources() -> List[DltResource]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(sources()) @@ -937,7 +937,7 @@ def hints() -> Iterator[Dict[str, Any]]: pipeline = destination_config.setup_pipeline( f"bigquery_{uniq_id()}", - full_refresh=True, + dev_mode=True, ) pipeline.run(hints) diff --git a/tests/load/pipeline/test_athena.py b/tests/load/pipeline/test_athena.py index 9c17be318f..d3f18def44 100644 --- a/tests/load/pipeline/test_athena.py +++ b/tests/load/pipeline/test_athena.py @@ -18,7 +18,7 @@ ids=lambda x: x.name, ) def test_athena_destinations(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True) @dlt.resource(name="items", write_disposition="append") def items(): @@ -76,7 +76,7 @@ def items2(): def test_athena_all_datatypes_and_timestamps( destination_config: DestinationTestConfiguration, ) -> None: - pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True) # TIME is not supported column_schemas, data_types = table_update_and_row(exclude_types=["time"]) @@ -164,7 +164,7 @@ def my_source() -> Any: ids=lambda x: x.name, ) def test_athena_blocks_time_column(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True) column_schemas, data_types = table_update_and_row() diff --git a/tests/load/pipeline/test_dbt_helper.py b/tests/load/pipeline/test_dbt_helper.py index 91318d0f34..7dbe14c478 100644 --- a/tests/load/pipeline/test_dbt_helper.py +++ b/tests/load/pipeline/test_dbt_helper.py @@ -39,7 +39,7 @@ def test_run_jaffle_package( pytest.skip( "dbt-athena requires database to be created and we don't do it in case of Jaffle" ) - pipeline = destination_config.setup_pipeline("jaffle_jaffle", full_refresh=True) + pipeline = destination_config.setup_pipeline("jaffle_jaffle", dev_mode=True) # get runner, pass the env from fixture dbt = dlt.dbt.package(pipeline, "https://github.com/dbt-labs/jaffle_shop.git", venv=dbt_venv) # no default schema @@ -76,7 +76,7 @@ def test_run_chess_dbt(destination_config: DestinationTestConfiguration, dbt_ven os.environ["CHESS_URL"] = "https://api.chess.com/pub/" pipeline = destination_config.setup_pipeline( - "chess_games", dataset_name="chess_dbt_test", full_refresh=True + "chess_games", dataset_name="chess_dbt_test", dev_mode=True ) assert pipeline.default_schema_name is None # get the runner for the "dbt_transform" package @@ -129,7 +129,7 @@ def test_run_chess_dbt_to_other_dataset( os.environ["CHESS_URL"] = "https://api.chess.com/pub/" pipeline = destination_config.setup_pipeline( - "chess_games", dataset_name="chess_dbt_test", full_refresh=True + "chess_games", dataset_name="chess_dbt_test", dev_mode=True ) # load each schema in separate dataset pipeline.config.use_single_dataset = False diff --git a/tests/load/pipeline/test_drop.py b/tests/load/pipeline/test_drop.py index cd18454d7c..788622658b 100644 --- a/tests/load/pipeline/test_drop.py +++ b/tests/load/pipeline/test_drop.py @@ -119,7 +119,7 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestCon """Test the drop command with resource and state path options and verify correct data is deleted from destination and locally""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -145,7 +145,7 @@ def test_drop_command_only_state(destination_config: DestinationTestConfiguratio """Test the drop command with resource and state path options and verify correct data is deleted from destination and locally""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -168,7 +168,7 @@ def test_drop_command_only_state(destination_config: DestinationTestConfiguratio def test_drop_destination_tables_fails(destination_config: DestinationTestConfiguration) -> None: """Fail on drop tables. Command runs again.""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -194,7 +194,7 @@ def test_drop_destination_tables_fails(destination_config: DestinationTestConfig def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration) -> None: """Fail directly after drop tables. Command runs again ignoring destination tables missing.""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -218,7 +218,7 @@ def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration def test_load_step_fails(destination_config: DestinationTestConfiguration) -> None: """Test idempotence. pipeline.load() fails. Command can be run again successfully""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -240,7 +240,7 @@ def test_load_step_fails(destination_config: DestinationTestConfiguration) -> No ) def test_resource_regex(destination_config: DestinationTestConfiguration) -> None: source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -259,7 +259,7 @@ def test_resource_regex(destination_config: DestinationTestConfiguration) -> Non def test_drop_nothing(destination_config: DestinationTestConfiguration) -> None: """No resources, no state keys. Nothing is changed.""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) attached = _attach(pipeline) @@ -277,7 +277,7 @@ def test_drop_nothing(destination_config: DestinationTestConfiguration) -> None: def test_drop_all_flag(destination_config: DestinationTestConfiguration) -> None: """Using drop_all flag. Destination dataset and all local state is deleted""" source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(source) dlt_tables = [ t["name"] for t in pipeline.default_schema.dlt_tables() @@ -303,7 +303,7 @@ def test_drop_all_flag(destination_config: DestinationTestConfiguration) -> None ) def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConfiguration) -> None: """Pipeline can be run again after dropping some resources""" - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(droppable_source()) attached = _attach(pipeline) @@ -322,7 +322,7 @@ def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConf ) def test_drop_state_only(destination_config: DestinationTestConfiguration) -> None: """Pipeline can be run again after dropping some resources""" - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) pipeline.run(droppable_source()) attached = _attach(pipeline) diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py index 19ee9a34c8..0dee6e3849 100644 --- a/tests/load/pipeline/test_merge_disposition.py +++ b/tests/load/pipeline/test_merge_disposition.py @@ -30,7 +30,7 @@ "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_merge_on_keys_in_schema(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("eth_2", full_refresh=True) + p = destination_config.setup_pipeline("eth_2", dev_mode=True) with open("tests/common/cases/schemas/eth/ethereum_schema_v5.yml", "r", encoding="utf-8") as f: schema = dlt.Schema.from_dict(yaml.safe_load(f)) @@ -96,7 +96,7 @@ def test_merge_on_keys_in_schema(destination_config: DestinationTestConfiguratio "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_merge_on_ad_hoc_primary_key(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("github_1", full_refresh=True) + p = destination_config.setup_pipeline("github_1", dev_mode=True) with open( "tests/normalize/cases/github.issues.load_page_5_duck.json", "r", encoding="utf-8" @@ -159,7 +159,7 @@ def load_issues(): def test_merge_source_compound_keys_and_changes( destination_config: DestinationTestConfiguration, ) -> None: - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) info = p.run(github(), loader_file_format=destination_config.file_format) assert_load_info(info) @@ -208,7 +208,7 @@ def test_merge_source_compound_keys_and_changes( "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_merge_no_child_tables(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) github_data = github() assert github_data.max_table_nesting is None assert github_data.root_key is True @@ -241,7 +241,7 @@ def test_merge_no_child_tables(destination_config: DestinationTestConfiguration) "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) github_data = github() # remove all keys github_data.load_issues.apply_hints(merge_key=(), primary_key=()) @@ -269,7 +269,7 @@ def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_merge_keys_non_existing_columns(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) github_data = github() # set keys names that do not exist in the data github_data.load_issues.apply_hints(merge_key=("mA1", "Ma2"), primary_key=("123-x",)) @@ -308,7 +308,7 @@ def test_merge_keys_non_existing_columns(destination_config: DestinationTestConf ids=lambda x: x.name, ) def test_pipeline_load_parquet(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) github_data = github() # generate some complex types github_data.max_table_nesting = 2 @@ -437,7 +437,7 @@ def _updated_event(node_id): ] # load to destination - p = destination_config.setup_pipeline("github_3", full_refresh=True) + p = destination_config.setup_pipeline("github_3", dev_mode=True) info = p.run( _get_shuffled_events(True) | github_resource, loader_file_format=destination_config.file_format, @@ -497,7 +497,7 @@ def _updated_event(node_id): "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_deduplicate_single_load(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = destination_config.setup_pipeline("abstract", dev_mode=True) @dlt.resource(write_disposition="merge", primary_key="id") def duplicates(): @@ -528,7 +528,7 @@ def duplicates_no_child(): "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_no_deduplicate_only_merge_key(destination_config: DestinationTestConfiguration) -> None: - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = destination_config.setup_pipeline("abstract", dev_mode=True) @dlt.resource(write_disposition="merge", merge_key="id") def duplicates(): @@ -565,7 +565,7 @@ def test_complex_column_missing(destination_config: DestinationTestConfiguration def r(data): yield data - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = destination_config.setup_pipeline("abstract", dev_mode=True) data = [{"id": 1, "simple": "foo", "complex": [1, 2, 3]}] info = p.run(r(data), loader_file_format=destination_config.file_format) @@ -603,7 +603,7 @@ def data_resource(data): elif key_type == "merge_key": data_resource.apply_hints(primary_key="", merge_key="id") - p = destination_config.setup_pipeline(f"abstract_{key_type}", full_refresh=True) + p = destination_config.setup_pipeline(f"abstract_{key_type}", dev_mode=True) # insert two records data = [ @@ -744,7 +744,7 @@ def test_hard_delete_hint_config(destination_config: DestinationTestConfiguratio def data_resource(data): yield data - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = destination_config.setup_pipeline("abstract", dev_mode=True) # insert two records data = [ @@ -802,7 +802,7 @@ def test_dedup_sort_hint(destination_config: DestinationTestConfiguration) -> No def data_resource(data): yield data - p = destination_config.setup_pipeline("abstract", full_refresh=True) + p = destination_config.setup_pipeline("abstract", dev_mode=True) # three records with same primary key data = [ diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index a93599831d..a7715560db 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -209,7 +209,7 @@ def _data(): for d in data: yield d - p = destination_config.setup_pipeline("test_skip_sync_schema_for_tables", full_refresh=True) + p = destination_config.setup_pipeline("test_skip_sync_schema_for_tables", dev_mode=True) p.extract(_data) schema = p.default_schema assert "data_table" in schema.tables @@ -232,7 +232,7 @@ def _data(): destinations_configs(default_sql_configs=True, all_buckets_filesystem_configs=True), ids=lambda x: x.name, ) -def test_run_full_refresh(destination_config: DestinationTestConfiguration) -> None: +def test_run_dev_mode(destination_config: DestinationTestConfiguration) -> None: data = ["a", ["a", "b", "c"], ["a", "b", "c"]] destination_config.setup() @@ -243,7 +243,7 @@ def d(): def _data(): return dlt.resource(d(), name="lists", write_disposition="replace") - p = dlt.pipeline(full_refresh=True) + p = dlt.pipeline(dev_mode=True) info = p.run( _data(), destination=destination_config.destination, @@ -259,7 +259,7 @@ def _data(): # restore the pipeline p = dlt.attach() # restored pipeline should be never put in full refresh - assert p.full_refresh is False + assert p.dev_mode is False # assert parent table (easy), None First (db order) assert_table(p, "lists", [None, None, "a"], info=info) # child tables contain nested lists @@ -448,7 +448,7 @@ def test_dataset_name_change(destination_config: DestinationTestConfiguration) - ds_2_name = "IteRation" + uniq_id() # illegal name that will be later normalized ds_3_name = "1it/era 👍 tion__" + uniq_id() - p, s = simple_nested_pipeline(destination_config, dataset_name=ds_1_name, full_refresh=False) + p, s = simple_nested_pipeline(destination_config, dataset_name=ds_1_name, dev_mode=False) try: info = p.run(s(), loader_file_format=destination_config.file_format) assert_load_info(info) @@ -581,7 +581,7 @@ def conflict(): # conflict deselected assert "conflict" not in discover_2.tables - p = dlt.pipeline(pipeline_name="multi", destination="duckdb", full_refresh=True) + p = dlt.pipeline(pipeline_name="multi", destination="duckdb", dev_mode=True) p.extract([source_1(), source_2()]) default_schema = p.default_schema gen1_table = default_schema.tables["gen1"] @@ -606,7 +606,7 @@ def conflict(): drop_active_pipeline_data() # same pipeline but enable conflict - p = dlt.pipeline(pipeline_name="multi", destination="duckdb", full_refresh=True) + p = dlt.pipeline(pipeline_name="multi", destination="duckdb", dev_mode=True) with pytest.raises(PipelineStepFailed) as py_ex: p.extract([source_1(), source_2().with_resources("conflict")]) assert isinstance(py_ex.value.__context__, CannotCoerceColumnException) @@ -881,7 +881,7 @@ def test_pipeline_upfront_tables_two_loads( pipeline = destination_config.setup_pipeline( "test_pipeline_upfront_tables_two_loads", dataset_name="test_pipeline_upfront_tables_two_loads", - full_refresh=True, + dev_mode=True, ) @dlt.source @@ -1016,7 +1016,7 @@ def table_3(make_data=False): # pipeline = destination_config.setup_pipeline( # "test_load_non_utc_timestamps", # dataset_name="test_load_non_utc_timestamps", -# full_refresh=True, +# dev_mode=True, # ) # info = pipeline.run(some_data()) # # print(pipeline.default_schema.to_pretty_yaml()) @@ -1026,7 +1026,7 @@ def table_3(make_data=False): def simple_nested_pipeline( - destination_config: DestinationTestConfiguration, dataset_name: str, full_refresh: bool + destination_config: DestinationTestConfiguration, dataset_name: str, dev_mode: bool ) -> Tuple[dlt.Pipeline, Callable[[], DltSource]]: data = ["a", ["a", "b", "c"], ["a", "b", "c"]] @@ -1039,7 +1039,7 @@ def _data(): p = dlt.pipeline( pipeline_name=f"pipeline_{dataset_name}", - full_refresh=full_refresh, + dev_mode=dev_mode, destination=destination_config.destination, staging=destination_config.staging, dataset_name=dataset_name, diff --git a/tests/load/pipeline/test_redshift.py b/tests/load/pipeline/test_redshift.py index 44234ec64b..580129c759 100644 --- a/tests/load/pipeline/test_redshift.py +++ b/tests/load/pipeline/test_redshift.py @@ -15,7 +15,7 @@ ids=lambda x: x.name, ) def test_redshift_blocks_time_column(destination_config: DestinationTestConfiguration) -> None: - pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), full_refresh=True) + pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True) column_schemas, data_types = table_update_and_row() diff --git a/tests/load/pipeline/test_replace_disposition.py b/tests/load/pipeline/test_replace_disposition.py index a69d4440dc..7e1680bfcc 100644 --- a/tests/load/pipeline/test_replace_disposition.py +++ b/tests/load/pipeline/test_replace_disposition.py @@ -267,7 +267,7 @@ def test_replace_table_clearing( os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy pipeline = destination_config.setup_pipeline( - "test_replace_table_clearing", dataset_name="test_replace_table_clearing", full_refresh=True + "test_replace_table_clearing", dataset_name="test_replace_table_clearing", dev_mode=True ) @dlt.resource(name="main_resource", write_disposition="replace", primary_key="id") diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index 5ef2206031..5f2d5af2de 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -340,7 +340,7 @@ def some_data(): # full refresh will not restore pipeline even if requested p._wipe_working_folder() p = destination_config.setup_pipeline( - pipeline_name=pipeline_name, dataset_name=dataset_name, full_refresh=True + pipeline_name=pipeline_name, dataset_name=dataset_name, dev_mode=True ) p.run(loader_file_format=destination_config.file_format) assert p.default_schema_name is None diff --git a/tests/load/pipeline/test_write_disposition_changes.py b/tests/load/pipeline/test_write_disposition_changes.py index 164243cd55..0711e98f7d 100644 --- a/tests/load/pipeline/test_write_disposition_changes.py +++ b/tests/load/pipeline/test_write_disposition_changes.py @@ -24,7 +24,7 @@ def data_with_subtables(offset: int) -> Any: ) def test_switch_from_merge(destination_config: DestinationTestConfiguration): pipeline = destination_config.setup_pipeline( - pipeline_name="test_switch_from_merge", full_refresh=True + pipeline_name="test_switch_from_merge", dev_mode=True ) info = pipeline.run( @@ -93,7 +93,7 @@ def test_switch_from_merge(destination_config: DestinationTestConfiguration): @pytest.mark.parametrize("with_root_key", [True, False]) def test_switch_to_merge(destination_config: DestinationTestConfiguration, with_root_key: bool): pipeline = destination_config.setup_pipeline( - pipeline_name="test_switch_to_merge", full_refresh=True + pipeline_name="test_switch_to_merge", dev_mode=True ) @dlt.resource() diff --git a/tests/load/qdrant/test_pipeline.py b/tests/load/qdrant/test_pipeline.py index 4c1361dcca..ca2f3d2e84 100644 --- a/tests/load/qdrant/test_pipeline.py +++ b/tests/load/qdrant/test_pipeline.py @@ -298,7 +298,7 @@ def some_data(): def test_merge_github_nested() -> None: - p = dlt.pipeline(destination="qdrant", dataset_name="github1", full_refresh=True) + p = dlt.pipeline(destination="qdrant", dataset_name="github1", dev_mode=True) assert p.dataset_name.startswith("github1_202") with open( @@ -344,7 +344,7 @@ def test_merge_github_nested() -> None: def test_empty_dataset_allowed() -> None: # dataset_name is optional so dataset name won't be autogenerated when not explicitly passed - p = dlt.pipeline(destination="qdrant", full_refresh=True) + p = dlt.pipeline(destination="qdrant", dev_mode=True) client: QdrantClient = p.destination_client() # type: ignore[assignment] assert p.dataset_name is None diff --git a/tests/load/synapse/test_synapse_table_indexing.py b/tests/load/synapse/test_synapse_table_indexing.py index df90933de4..1f119c33d9 100644 --- a/tests/load/synapse/test_synapse_table_indexing.py +++ b/tests/load/synapse/test_synapse_table_indexing.py @@ -50,7 +50,7 @@ def items_without_table_index_type_specified() -> Iterator[Any]: pipeline_name=f"test_default_table_index_type_{table_index_type}", destination="synapse", dataset_name=f"test_default_table_index_type_{table_index_type}", - full_refresh=True, + dev_mode=True, ) job_client = pipeline.destination_client() @@ -123,7 +123,7 @@ def items_with_table_index_type_specified() -> Iterator[Any]: pipeline_name=f"test_table_index_type_{table_index_type}", destination="synapse", dataset_name=f"test_table_index_type_{table_index_type}", - full_refresh=True, + dev_mode=True, ) # An invalid value for `table_index_type` should raise a ValueError. diff --git a/tests/load/utils.py b/tests/load/utils.py index 7b4cf72b47..9b64bd2b8e 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -135,7 +135,7 @@ def setup(self) -> None: os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True" def setup_pipeline( - self, pipeline_name: str, dataset_name: str = None, full_refresh: bool = False, **kwargs + self, pipeline_name: str, dataset_name: str = None, dev_mode: bool = False, **kwargs ) -> dlt.Pipeline: """Convenience method to setup pipeline with this configuration""" self.setup() @@ -144,7 +144,7 @@ def setup_pipeline( destination=self.destination, staging=self.staging, dataset_name=dataset_name or pipeline_name, - full_refresh=full_refresh, + dev_mode=dev_mode, **kwargs, ) return pipeline diff --git a/tests/load/weaviate/test_pipeline.py b/tests/load/weaviate/test_pipeline.py index dc23644940..11ad07c969 100644 --- a/tests/load/weaviate/test_pipeline.py +++ b/tests/load/weaviate/test_pipeline.py @@ -300,7 +300,7 @@ def some_data(): def test_merge_github_nested() -> None: - p = dlt.pipeline(destination="weaviate", dataset_name="github1", full_refresh=True) + p = dlt.pipeline(destination="weaviate", dataset_name="github1", dev_mode=True) assert p.dataset_name.startswith("github1_202") with open( @@ -349,7 +349,7 @@ def test_merge_github_nested() -> None: def test_empty_dataset_allowed() -> None: # weaviate dataset_name is optional so dataset name won't be autogenerated when not explicitly passed - p = dlt.pipeline(destination="weaviate", full_refresh=True) + p = dlt.pipeline(destination="weaviate", dev_mode=True) # check if we use localhost client: WeaviateClient = p.destination_client() # type: ignore[assignment] if "localhost" not in client.config.credentials.url: diff --git a/tests/pipeline/cases/github_pipeline/github_extract.py b/tests/pipeline/cases/github_pipeline/github_extract.py index 6be6643947..d796080988 100644 --- a/tests/pipeline/cases/github_pipeline/github_extract.py +++ b/tests/pipeline/cases/github_pipeline/github_extract.py @@ -6,7 +6,7 @@ if __name__ == "__main__": p = dlt.pipeline( - "dlt_github_pipeline", destination="duckdb", dataset_name="github_3", full_refresh=False + "dlt_github_pipeline", destination="duckdb", dataset_name="github_3", dev_mode=False ) github_source = github() if len(sys.argv) > 1: diff --git a/tests/pipeline/cases/github_pipeline/github_pipeline.py b/tests/pipeline/cases/github_pipeline/github_pipeline.py index c55bd02ba0..f38440d296 100644 --- a/tests/pipeline/cases/github_pipeline/github_pipeline.py +++ b/tests/pipeline/cases/github_pipeline/github_pipeline.py @@ -34,7 +34,7 @@ def load_issues( if __name__ == "__main__": p = dlt.pipeline( - "dlt_github_pipeline", destination="duckdb", dataset_name="github_3", full_refresh=False + "dlt_github_pipeline", destination="duckdb", dataset_name="github_3", dev_mode=False ) github_source = github() if len(sys.argv) > 1: diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 0cebeb2ff7..13352ce989 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -87,15 +87,15 @@ def test_default_pipeline_dataset() -> None: assert p.dataset_name in possible_dataset_names -def test_run_full_refresh_default_dataset() -> None: - p = dlt.pipeline(full_refresh=True, destination="filesystem") +def test_run_dev_mode_default_dataset() -> None: + p = dlt.pipeline(dev_mode=True, destination="filesystem") assert p.dataset_name.endswith(p._pipeline_instance_id) # restore this pipeline - r_p = dlt.attach(full_refresh=False) + r_p = dlt.attach(dev_mode=False) assert r_p.dataset_name.endswith(p._pipeline_instance_id) # dummy does not need dataset - p = dlt.pipeline(full_refresh=True, destination="dummy") + p = dlt.pipeline(dev_mode=True, destination="dummy") assert p.dataset_name is None # simulate set new dataset p._set_destinations("filesystem") @@ -105,11 +105,11 @@ def test_run_full_refresh_default_dataset() -> None: assert p.dataset_name and p.dataset_name.endswith(p._pipeline_instance_id) -def test_run_full_refresh_underscored_dataset() -> None: - p = dlt.pipeline(full_refresh=True, dataset_name="_main_") +def test_run_dev_mode_underscored_dataset() -> None: + p = dlt.pipeline(dev_mode=True, dataset_name="_main_") assert p.dataset_name.endswith(p._pipeline_instance_id) # restore this pipeline - r_p = dlt.attach(full_refresh=False) + r_p = dlt.attach(dev_mode=False) assert r_p.dataset_name.endswith(p._pipeline_instance_id) @@ -782,7 +782,7 @@ def test_extract_all_data_types() -> None: def test_set_get_local_value() -> None: - p = dlt.pipeline(destination="dummy", full_refresh=True) + p = dlt.pipeline(destination="dummy", dev_mode=True) value = uniq_id() # value is set p.set_local_state_val(value, value) @@ -1608,8 +1608,8 @@ def _run_pipeline(pipeline, gen_) -> LoadInfo: return pipeline.run(gen_()) # declare pipelines in main thread then run them "async" - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - pipeline_2 = dlt.pipeline("pipeline_2", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) + pipeline_2 = dlt.pipeline("pipeline_2", destination="duckdb", dev_mode=True) async def _run_async(): loop = asyncio.get_running_loop() @@ -1658,7 +1658,7 @@ def api_fetch(page_num): else: return [] - pipeline = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) load_info = pipeline.run(product()) assert_load_info(load_info) assert pipeline.last_trace.last_normalize_info.row_counts["product"] == 12 diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index 5a85c06462..542d0209d6 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -30,7 +30,7 @@ async def __anext__(self): # return the counter value return {"i": self.counter} - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(AsyncIterator, table_name="async") with pipeline_1.sql_client() as c: with c.execute_query("SELECT * FROM async") as cur: @@ -53,7 +53,7 @@ async def async_gen_resource(): await asyncio.sleep(0.1) yield {"letter": l_} - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) # pure async function pipeline_1.run(async_gen_table(), table_name="async") @@ -81,7 +81,7 @@ async def _gen(idx): for idx_ in range(3): yield _gen(idx_) - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(async_inner_table(), table_name="async") with pipeline_1.sql_client() as c: with c.execute_query("SELECT * FROM async") as cur: @@ -114,7 +114,7 @@ async def async_transformer(item): "letter": item["letter"] + "t", } - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(async_transformer(), table_name="async") with pipeline_1.sql_client() as c: @@ -174,7 +174,7 @@ def source(): elif resource_mode == "second_async": return [sync_resource1(), async_resource2()] - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(source()) with pipeline_1.sql_client() as c: @@ -243,7 +243,7 @@ def resource2(): def source(): return [resource1(), resource2()] - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", dev_mode=True) pipeline_1.run(source()) # all records should be here diff --git a/tests/pipeline/test_schema_contracts.py b/tests/pipeline/test_schema_contracts.py index 2f2e6b6932..bc5fc2c635 100644 --- a/tests/pipeline/test_schema_contracts.py +++ b/tests/pipeline/test_schema_contracts.py @@ -143,7 +143,7 @@ def get_pipeline(): pipeline_name=uniq_id(), destination="duckdb", credentials=duckdb.connect(":memory:"), - full_refresh=True, + dev_mode=True, ) diff --git a/tests/pipeline/test_schema_updates.py b/tests/pipeline/test_schema_updates.py index be397f796c..311bd55b28 100644 --- a/tests/pipeline/test_schema_updates.py +++ b/tests/pipeline/test_schema_updates.py @@ -5,7 +5,7 @@ def test_schema_updates() -> None: os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately - p = dlt.pipeline(pipeline_name="test_schema_updates", full_refresh=True, destination="dummy") + p = dlt.pipeline(pipeline_name="test_schema_updates", dev_mode=True, destination="dummy") @dlt.source() def source():