From b2b732b927d3be67a72c809b94bf3566ad39f30d Mon Sep 17 00:00:00 2001 From: jamiedemaria Date: Wed, 31 Jan 2024 12:54:42 -0500 Subject: [PATCH] deprecate asset_partition_*_for_output on AssetExecutionContext (#19436) --- .../_core/execution/context/compute.py | 64 +++++++++++-------- .../_core/execution/context/invocation.py | 4 ++ .../test_asset_partition_mappings.py | 38 +++++------ .../asset_defs_tests/test_materialize.py | 5 +- .../test_materialize_to_memory.py | 4 +- .../test_partitioned_assets.py | 32 +++++----- .../test_asset_execution_context.py | 5 -- .../core_tests/test_op_invocation.py | 19 +++--- .../test_dynamic_partitions.py | 9 +-- .../test_multi_partitions.py | 16 +++-- .../dagster_dbt/asset_decorator.py | 4 +- .../test_type_handler.py | 19 +++--- .../test_type_handler.py | 13 ++-- .../test_type_handler.py | 21 +++--- .../test_type_handler.py | 19 +++--- .../test_type_handler.py | 19 +++--- .../test_type_handler.py | 19 +++--- .../bigquery/test_type_handler.py | 19 +++--- .../bigquery/test_type_handler.py | 19 +++--- .../test_snowflake_pandas_type_handler.py | 19 +++--- .../test_snowflake_pyspark_type_handler.py | 19 +++--- 21 files changed, 208 insertions(+), 178 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index fb859cb42cee5..7ef14e7ed43c1 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1364,6 +1364,11 @@ def _copy_docs_from_op_execution_context(obj): "run_config": "run.run_config", "run_tags": "run.tags", "get_op_execution_context": "op_execution_context", + "asset_partition_key_for_output": "partition_key", + "asset_partitions_time_window_for_output": "partition_time_window", + "asset_partition_key_range_for_output": "partition_key_range", + "asset_partitions_def_for_output": "assets_def.partitions_def", + "asset_partition_keys_for_output": "partition_keys", } ALTERNATE_EXPRESSIONS = { @@ -1501,6 +1506,38 @@ def get_tag(self, key: str) -> Optional[str]: def get_op_execution_context(self) -> "OpExecutionContext": return self.op_execution_context + @deprecated(**_get_deprecation_kwargs("asset_partition_key_for_output")) + @public + @_copy_docs_from_op_execution_context + def asset_partition_key_for_output(self, output_name: str = "result") -> str: + return self.op_execution_context.asset_partition_key_for_output(output_name=output_name) + + @deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_output")) + @public + @_copy_docs_from_op_execution_context + def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: + return self.op_execution_context.asset_partitions_time_window_for_output(output_name) + + @deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_output")) + @public + @_copy_docs_from_op_execution_context + def asset_partition_key_range_for_output( + self, output_name: str = "result" + ) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range_for_output(output_name) + + @deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_output")) + @public + @_copy_docs_from_op_execution_context + def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: + return self.op_execution_context.asset_partitions_def_for_output(output_name=output_name) + + 
@deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_output")) + @public + @_copy_docs_from_op_execution_context + def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: + return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name) + ########## pass-through to op context #### op related @@ -1657,23 +1694,6 @@ def partition_key_range(self) -> PartitionKeyRange: def partition_time_window(self) -> TimeWindow: return self.op_execution_context.partition_time_window - @public - @_copy_docs_from_op_execution_context - def asset_partition_key_for_output(self, output_name: str = "result") -> str: - return self.op_execution_context.asset_partition_key_for_output(output_name=output_name) - - @public - @_copy_docs_from_op_execution_context - def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: - return self.op_execution_context.asset_partitions_time_window_for_output(output_name) - - @public - @_copy_docs_from_op_execution_context - def asset_partition_key_range_for_output( - self, output_name: str = "result" - ) -> PartitionKeyRange: - return self.op_execution_context.asset_partition_key_range_for_output(output_name) - @public @_copy_docs_from_op_execution_context def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: @@ -1684,21 +1704,11 @@ def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRa def asset_partition_key_for_input(self, input_name: str) -> str: return self.op_execution_context.asset_partition_key_for_input(input_name) - @public - @_copy_docs_from_op_execution_context - def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: - return self.op_execution_context.asset_partitions_def_for_output(output_name=output_name) - @public @_copy_docs_from_op_execution_context def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition: return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name) - @public - @_copy_docs_from_op_execution_context - def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: - return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name) - @public @_copy_docs_from_op_execution_context def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index ca3b80c1e630e..6962fe76caa76 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -646,6 +646,10 @@ def asset_partitions_time_window_for_output(self, output_name: str = "result") - Union[MultiPartitionsDefinition, TimeWindowPartitionsDefinition], partitions_def ).time_window_for_partition_key(self.partition_key) + @property + def partition_time_window(self) -> TimeWindow: + return self.asset_partitions_time_window_for_output() + def add_output_metadata( self, metadata: Mapping[str, Any], diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py index 7c1a07f8916a7..0a93730ca1001 100644 --- 
a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py @@ -105,15 +105,15 @@ def load_input(self, context): assert context.asset_partitions_def == upstream_partitions_def @asset(partitions_def=upstream_partitions_def) - def upstream_asset(context): - assert context.asset_partition_key_for_output() == "2" + def upstream_asset(context: AssetExecutionContext): + assert context.partition_key == "2" @asset( partitions_def=downstream_partitions_def, ins={"upstream_asset": AssetIn(partition_mapping=TrailingWindowPartitionMapping())}, ) - def downstream_asset(context, upstream_asset): - assert context.asset_partition_key_for_output() == "2" + def downstream_asset(context: AssetExecutionContext, upstream_asset): + assert context.partition_key == "2" assert upstream_asset is None assert context.asset_partitions_def_for_input("upstream_asset") == upstream_partitions_def @@ -341,9 +341,8 @@ def test_partition_keys_in_range(): ] @asset(partitions_def=DailyPartitionsDefinition(start_date="2022-09-11")) - def upstream(context): - assert context.asset_partition_keys_for_output("result") == ["2022-09-11"] - assert context.asset_partition_keys_for_output() == ["2022-09-11"] + def upstream(context: AssetExecutionContext): + assert context.partition_keys == ["2022-09-11"] @asset(partitions_def=WeeklyPartitionsDefinition(start_date="2022-09-11")) def downstream(context, upstream): @@ -383,8 +382,8 @@ def test_dependency_resolution_partition_mapping(): partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"), key_prefix=["staging"], ) - def upstream(context): - partition_date_str = context.asset_partition_key_for_output() + def upstream(context: AssetExecutionContext): + partition_date_str = context.partition_key return partition_date_str @asset( @@ -441,11 +440,8 @@ def upstream(context): return 1 @asset(partitions_def=composite) - def downstream(context, upstream): - assert ( - context.asset_partition_keys_for_input("upstream") - == context.asset_partition_keys_for_output() - ) + def downstream(context: AssetExecutionContext, upstream): + assert context.asset_partition_keys_for_input("upstream") == context.partition_keys return 1 asset_graph = AssetGraph.from_assets([upstream, downstream]) @@ -471,16 +467,14 @@ def test_multipartitions_def_partition_mapping_infer_single_dim_to_multi(): ) @asset(partitions_def=abc_def) - def upstream(context): - assert context.asset_partition_keys_for_output("result") == ["a"] + def upstream(context: AssetExecutionContext): + assert context.partition_keys == ["a"] return 1 @asset(partitions_def=composite) - def downstream(context, upstream): + def downstream(context: AssetExecutionContext, upstream): assert context.asset_partition_keys_for_input("upstream") == ["a"] - assert context.asset_partition_keys_for_output("result") == [ - MultiPartitionKey({"abc": "a", "123": "1"}) - ] + assert context.partition_keys == [MultiPartitionKey({"abc": "a", "123": "1"})] return 1 asset_graph = AssetGraph.from_assets([upstream, downstream]) @@ -533,9 +527,9 @@ def upstream(context): return 1 @asset(partitions_def=abc_def) - def downstream(context, upstream): + def downstream(context: AssetExecutionContext, upstream): assert set(context.asset_partition_keys_for_input("upstream")) == a_multipartition_keys - assert context.asset_partition_keys_for_output("result") == ["a"] + assert context.partition_keys == ["a"] 
return 1 asset_graph = AssetGraph.from_assets([upstream, downstream]) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py index 4005c3abc383c..d62817fc807c5 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py @@ -5,6 +5,7 @@ import pytest from dagster import ( + AssetExecutionContext, AssetKey, AssetOut, AssetsDefinition, @@ -278,8 +279,8 @@ def the_asset(context): def test_materialize_partition_key(): @asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01")) - def the_asset(context): - assert context.asset_partition_key_for_output() == "2022-02-02" + def the_asset(context: AssetExecutionContext): + assert context.partition_key == "2022-02-02" with instance_for_test() as instance: result = materialize([the_asset], partition_key="2022-02-02", instance=instance) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize_to_memory.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize_to_memory.py index dd3941b1afb78..955285658e52a 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize_to_memory.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize_to_memory.py @@ -244,8 +244,8 @@ def multi_asset_with_internal_deps(thing): def test_materialize_to_memory_partition_key(): @asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01")) - def the_asset(context): - assert context.asset_partition_key_for_output() == "2022-02-02" + def the_asset(context: AssetExecutionContext): + assert context.partition_key == "2022-02-02" result = materialize_to_memory([the_asset], partition_key="2022-02-02") assert result.success diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index 1aca27d7b363b..925195bada409 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -5,6 +5,7 @@ import pendulum import pytest from dagster import ( + AssetExecutionContext, AssetMaterialization, AssetOut, AssetsDefinition, @@ -155,8 +156,8 @@ def load_input(self, context): assert False, "shouldn't get here" @asset(partitions_def=partitions_def) - def my_asset(context): - assert context.asset_partitions_def_for_output() == partitions_def + def my_asset(context: AssetExecutionContext): + assert context.assets_def.partitions_def == partitions_def my_job = build_assets_job( "my_job", @@ -212,24 +213,24 @@ def test_access_partition_keys_from_context_direct_invocation(): partitions_def = StaticPartitionsDefinition(["a"]) @asset(partitions_def=partitions_def) - def partitioned_asset(context): - assert context.asset_partition_key_for_output() == "a" + def partitioned_asset(context: AssetExecutionContext): + assert context.partition_key == "a" context = build_asset_context(partition_key="a") # check unbound context - assert context.asset_partition_key_for_output() == "a" + assert context.partition_key == "a" # check bound context partitioned_asset(context) # check failure for non-partitioned asset @asset - def non_partitioned_asset(context): + def non_partitioned_asset(context: AssetExecutionContext): with pytest.raises( CheckError, 
match="Tried to access partition_key for a non-partitioned run" ): - context.asset_partition_key_for_output() + _ = context.partition_key context = build_asset_context() non_partitioned_asset(context) @@ -257,8 +258,8 @@ def load_input(self, context): assert context.asset_partition_key_range == PartitionKeyRange("a", "c") @asset(partitions_def=upstream_partitions_def) - def upstream_asset(context): - assert context.asset_partition_key_for_output() == "b" + def upstream_asset(context: AssetExecutionContext): + assert context.partition_key == "b" @asset def downstream_asset(upstream_asset): @@ -606,7 +607,6 @@ def test_partition_range_single_run(): @asset(partitions_def=partitions_def) def upstream_asset(context) -> None: key_range = PartitionKeyRange(start="2020-01-01", end="2020-01-03") - assert context.asset_partition_key_range_for_output() == key_range assert context.partition_key_range == key_range assert context.partition_time_window == TimeWindow( partitions_def.time_window_for_partition_key(key_range.start).start, @@ -615,11 +615,11 @@ def upstream_asset(context) -> None: assert context.partition_keys == partitions_def.get_partition_keys_in_range(key_range) @asset(partitions_def=partitions_def, deps=["upstream_asset"]) - def downstream_asset(context) -> None: + def downstream_asset(context: AssetExecutionContext) -> None: assert context.asset_partition_key_range_for_input("upstream_asset") == PartitionKeyRange( start="2020-01-01", end="2020-01-03" ) - assert context.asset_partition_key_range_for_output() == PartitionKeyRange( + assert context.partition_key_range == PartitionKeyRange( start="2020-01-01", end="2020-01-03" ) @@ -653,17 +653,15 @@ def test_multipartition_range_single_run(): ) @asset(partitions_def=partitions_def) - def multipartitioned_asset(context) -> None: - key_range = context.asset_partition_key_range_for_output() + def multipartitioned_asset(context: AssetExecutionContext) -> None: + key_range = context.partition_key_range assert isinstance(key_range.start, MultiPartitionKey) assert isinstance(key_range.end, MultiPartitionKey) assert key_range.start == MultiPartitionKey({"date": "2020-01-01", "abc": "a"}) assert key_range.end == MultiPartitionKey({"date": "2020-01-03", "abc": "a"}) - assert all( - isinstance(key, MultiPartitionKey) for key in context.asset_partition_keys_for_output() - ) + assert all(isinstance(key, MultiPartitionKey) for key in context.partition_keys) the_job = define_asset_job("job").resolve( asset_graph=AssetGraph.from_assets([multipartitioned_asset]) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index 860d9ee33b92f..0c0a11b3b71f9 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -56,16 +56,11 @@ def test_deprecation_warnings(): "asset_key_for_input", "asset_key_for_output", "asset_partition_key_for_input", - "asset_partition_key_for_output", "asset_partition_key_range", "asset_partition_key_range_for_input", - "asset_partition_key_range_for_output", "asset_partition_keys_for_input", - "asset_partition_keys_for_output", "asset_partitions_def_for_input", - "asset_partitions_def_for_output", "asset_partitions_time_window_for_input", - "asset_partitions_time_window_for_output", "assets_def", "get_output_metadata", 
"has_asset_checks_def", diff --git a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py index 7ceb3bf2ea220..15fda7f695fc4 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py @@ -1259,8 +1259,8 @@ def test_partitions_time_window_asset_invocation(): @asset( partitions_def=partitions_def, ) - def partitioned_asset(context): - start, end = context.asset_partitions_time_window_for_output() + def partitioned_asset(context: AssetExecutionContext): + start, end = context.partition_time_window assert start == pendulum.instance(datetime(2023, 2, 2), tz=partitions_def.timezone) assert end == pendulum.instance(datetime(2023, 2, 3), tz=partitions_def.timezone) @@ -1279,18 +1279,21 @@ def test_multipartitioned_time_window_asset_invocation(): ) @asset(partitions_def=partitions_def) - def my_asset(context): + def my_asset(context: AssetExecutionContext): + time_partition = get_time_partitions_def(partitions_def) + if time_partition is None: + assert False, "partitions def does not have a time component" time_window = TimeWindow( start=pendulum.instance( datetime(year=2020, month=1, day=1), - tz=get_time_partitions_def(partitions_def).timezone, + tz=time_partition.timezone, ), end=pendulum.instance( datetime(year=2020, month=1, day=2), - tz=get_time_partitions_def(partitions_def).timezone, + tz=time_partition.timezone, ), ) - assert context.asset_partitions_time_window_for_output() == time_window + assert context.partition_time_window == time_window return 1 context = build_asset_context( @@ -1306,12 +1309,12 @@ def my_asset(context): ) @asset(partitions_def=partitions_def) - def static_multipartitioned_asset(context): + def static_multipartitioned_asset(context: AssetExecutionContext): with pytest.raises( DagsterInvariantViolationError, match="with a single time dimension", ): - context.asset_partitions_time_window_for_output() + _ = context.partition_time_window context = build_asset_context( partition_key="a|a", diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_dynamic_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_dynamic_partitions.py index 5129b181b5648..23496dc5dbb87 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_dynamic_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_dynamic_partitions.py @@ -3,6 +3,7 @@ import pytest from dagster import ( + AssetExecutionContext, AssetKey, DagsterUnknownPartitionError, IOManager, @@ -147,14 +148,14 @@ def test_dynamic_partitions_mapping(): partitions_def = DynamicPartitionsDefinition(name="fruits") @asset(partitions_def=partitions_def) - def dynamic1(context): - assert context.asset_partition_key_for_output() == "apple" + def dynamic1(context: AssetExecutionContext): + assert context.partition_key == "apple" return 1 @asset(partitions_def=partitions_def) - def dynamic2(context, dynamic1): + def dynamic2(context: AssetExecutionContext, dynamic1): assert context.asset_partition_keys_for_input("dynamic1") == ["apple"] - assert context.asset_partition_key_for_output() == "apple" + assert context.partition_key == "apple" return 1 @asset diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py index 53f2cf0502fa4..eae136d2baa67 100644 --- 
a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py @@ -3,6 +3,7 @@ import pendulum import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DagsterEventType, @@ -517,19 +518,22 @@ def test_context_partition_time_window(): ) @asset(partitions_def=partitions_def) - def my_asset(context): + def my_asset(context: AssetExecutionContext): + time_partition = get_time_partitions_def(partitions_def) + if time_partition is None: + assert False, "expected a time component in the partitions definition" + time_window = TimeWindow( start=pendulum.instance( datetime(year=2020, month=1, day=1), - tz=get_time_partitions_def(partitions_def).timezone, + tz=time_partition.timezone, ), end=pendulum.instance( datetime(year=2020, month=1, day=2), - tz=get_time_partitions_def(partitions_def).timezone, + tz=time_partition.timezone, ), ) assert context.partition_time_window == time_window - assert context.asset_partitions_time_window_for_output() == time_window return 1 multipartitioned_job = define_asset_job( @@ -638,14 +642,14 @@ def upstream(context): assert isinstance(context.partition_key, MultiPartitionKey) @asset(partitions_def=partitions_def) - def downstream(context, upstream): + def downstream(context: AssetExecutionContext, upstream): assert isinstance(context.partition_key, MultiPartitionKey) input_range = context.asset_partition_key_range_for_input("upstream") assert isinstance(input_range.start, MultiPartitionKey) assert isinstance(input_range.end, MultiPartitionKey) - output = context.asset_partition_key_range_for_output("result") + output = context.partition_key_range assert isinstance(output.start, MultiPartitionKey) assert isinstance(output.end, MultiPartitionKey) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py index 4f449f6ce1454..2caa4547bffc3 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py @@ -278,9 +278,7 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource, config: M partitions_def=DailyPartitionsDefinition(start_date="2023-01-01") ) def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): - time_window = context.asset_partitions_time_window_for_output( - list(context.selected_output_names)[0] - ) + time_window = context.partition_time_window dbt_vars = { "min_date": time_window.start.isoformat(), diff --git a/python_modules/libraries/dagster-deltalake-pandas/dagster_deltalake_pandas_tests/test_type_handler.py b/python_modules/libraries/dagster-deltalake-pandas/dagster_deltalake_pandas_tests/test_type_handler.py index 772c991fc2041..1781416fc4622 100644 --- a/python_modules/libraries/dagster-deltalake-pandas/dagster_deltalake_pandas_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-deltalake-pandas/dagster_deltalake_pandas_tests/test_type_handler.py @@ -4,6 +4,7 @@ import pandas as pd import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DailyPartitionsDefinition, @@ -173,10 +174,8 @@ def test_not_supported_type(tmp_path, io_manager): metadata={"partition_expr": "time"}, config_schema={"value": str}, ) -def daily_partitioned(context) -> pd.DataFrame: - partition = datetime.strptime( - context.asset_partition_key_for_output(), DELTA_DATE_FORMAT - ).date() +def 
daily_partitioned(context: AssetExecutionContext) -> pd.DataFrame: + partition = datetime.strptime(context.partition_key, DELTA_DATE_FORMAT).date() value = context.op_config["value"] return pd.DataFrame( @@ -267,7 +266,7 @@ def test_load_partitioned_asset(tmp_path, io_manager): config_schema={"value": str}, ) def static_partitioned(context) -> pd.DataFrame: - partition = context.asset_partition_key_for_output() + partition = context.partition_key value = context.op_config["value"] return pd.DataFrame( @@ -397,8 +396,8 @@ def test_multi_partitioned_asset(tmp_path, io_manager): metadata={"partition_expr": "fruit"}, config_schema={"value": str}, ) -def dynamic_partitioned(context) -> pd.DataFrame: - partition = context.asset_partition_key_for_output() +def dynamic_partitioned(context: AssetExecutionContext) -> pd.DataFrame: + partition = context.partition_key value = context.op_config["value"] return pd.DataFrame( { @@ -471,8 +470,10 @@ def test_self_dependent_asset(tmp_path, io_manager): }, config_schema={"value": str, "last_partition_key": str}, ) - def self_dependent_asset(context, self_dependent_asset: pd.DataFrame) -> pd.DataFrame: - key = datetime.strptime(context.asset_partition_key_for_output(), DELTA_DATE_FORMAT).date() + def self_dependent_asset( + context: AssetExecutionContext, self_dependent_asset: pd.DataFrame + ) -> pd.DataFrame: + key = datetime.strptime(context.partition_key, DELTA_DATE_FORMAT).date() if self_dependent_asset.num_rows > 0: assert self_dependent_asset.num_rows == 3 diff --git a/python_modules/libraries/dagster-deltalake-polars/dagster_deltalake_polars_tests/test_type_handler.py b/python_modules/libraries/dagster-deltalake-polars/dagster_deltalake_polars_tests/test_type_handler.py index ad1c15f1146ba..329031ce51653 100644 --- a/python_modules/libraries/dagster-deltalake-polars/dagster_deltalake_polars_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-deltalake-polars/dagster_deltalake_polars_tests/test_type_handler.py @@ -4,6 +4,7 @@ import polars as pl import pytest from dagster import ( + AssetExecutionContext, AssetIn, DailyPartitionsDefinition, DynamicPartitionsDefinition, @@ -171,10 +172,8 @@ def test_not_supported_type(tmp_path, io_manager): metadata={"partition_expr": "time"}, config_schema={"value": str}, ) -def daily_partitioned(context) -> pl.DataFrame: - partition = datetime.strptime( - context.asset_partition_key_for_output(), DELTA_DATE_FORMAT - ).date() +def daily_partitioned(context: AssetExecutionContext) -> pl.DataFrame: + partition = datetime.strptime(context.partition_key, DELTA_DATE_FORMAT).date() value = context.op_config["value"] return pl.DataFrame( @@ -265,7 +264,7 @@ def test_load_partitioned_asset(tmp_path, io_manager): config_schema={"value": str}, ) def static_partitioned(context) -> pl.DataFrame: - partition = context.asset_partition_key_for_output() + partition = context.partition_key value = context.op_config["value"] return pl.DataFrame( @@ -415,8 +414,8 @@ def test_multi_partitioned_asset(tmp_path, io_manager): metadata={"partition_expr": "fruit"}, config_schema={"value": str}, ) -def dynamic_partitioned(context) -> pl.DataFrame: - partition = context.asset_partition_key_for_output() +def dynamic_partitioned(context: AssetExecutionContext) -> pl.DataFrame: + partition = context.partition_key value = context.op_config["value"] return pl.DataFrame( { diff --git a/python_modules/libraries/dagster-deltalake/dagster_deltalake_tests/test_type_handler.py 
b/python_modules/libraries/dagster-deltalake/dagster_deltalake_tests/test_type_handler.py index 8ff5f2d1611ad..d04701ddd3643 100644 --- a/python_modules/libraries/dagster-deltalake/dagster_deltalake_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-deltalake/dagster_deltalake_tests/test_type_handler.py @@ -4,6 +4,7 @@ import pyarrow as pa import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DailyPartitionsDefinition, @@ -172,10 +173,8 @@ def test_not_supported_type(tmp_path, io_manager): metadata={"partition_expr": "time"}, config_schema={"value": str}, ) -def daily_partitioned(context) -> pa.Table: - partition = datetime.strptime( - context.asset_partition_key_for_output(), DELTA_DATE_FORMAT - ).date() +def daily_partitioned(context: AssetExecutionContext) -> pa.Table: + partition = datetime.strptime(context.partition_key, DELTA_DATE_FORMAT).date() value = context.op_config["value"] return pa.Table.from_pydict( @@ -265,8 +264,8 @@ def test_load_partitioned_asset(tmp_path, io_manager): metadata={"partition_expr": "color"}, config_schema={"value": str}, ) -def static_partitioned(context) -> pa.Table: - partition = context.asset_partition_key_for_output() +def static_partitioned(context: AssetExecutionContext) -> pa.Table: + partition = context.partition_key value = context.op_config["value"] return pa.Table.from_pydict( @@ -396,8 +395,8 @@ def test_multi_partitioned_asset(tmp_path, io_manager): metadata={"partition_expr": "fruit"}, config_schema={"value": str}, ) -def dynamic_partitioned(context) -> pa.Table: - partition = context.asset_partition_key_for_output() +def dynamic_partitioned(context: AssetExecutionContext) -> pa.Table: + partition = context.partition_key value = context.op_config["value"] return pa.Table.from_pydict( { @@ -470,8 +469,10 @@ def test_self_dependent_asset(tmp_path, io_manager): }, config_schema={"value": str, "last_partition_key": str}, ) - def self_dependent_asset(context, self_dependent_asset: pa.Table) -> pa.Table: - key = datetime.strptime(context.asset_partition_key_for_output(), DELTA_DATE_FORMAT).date() + def self_dependent_asset( + context: AssetExecutionContext, self_dependent_asset: pa.Table + ) -> pa.Table: + key = datetime.strptime(context.partition_key, DELTA_DATE_FORMAT).date() if self_dependent_asset.num_rows > 0: assert self_dependent_asset.num_rows == 3 diff --git a/python_modules/libraries/dagster-duckdb-pandas/dagster_duckdb_pandas_tests/test_type_handler.py b/python_modules/libraries/dagster-duckdb-pandas/dagster_duckdb_pandas_tests/test_type_handler.py index c9bf03367f674..2b1dbd7380020 100644 --- a/python_modules/libraries/dagster-duckdb-pandas/dagster_duckdb_pandas_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-duckdb-pandas/dagster_duckdb_pandas_tests/test_type_handler.py @@ -4,6 +4,7 @@ import pandas as pd import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DailyPartitionsDefinition, @@ -193,8 +194,8 @@ def test_not_supported_type(tmp_path, io_managers): metadata={"partition_expr": "time"}, config_schema={"value": str}, ) -def daily_partitioned(context) -> pd.DataFrame: - partition = pd.Timestamp(context.asset_partition_key_for_output()) +def daily_partitioned(context: AssetExecutionContext) -> pd.DataFrame: + partition = pd.Timestamp(context.partition_key) value = context.op_config["value"] return pd.DataFrame( @@ -259,8 +260,8 @@ def test_time_window_partitioned_asset(tmp_path, io_managers): metadata={"partition_expr": "color"}, 
config_schema={"value": str}, ) -def static_partitioned(context) -> pd.DataFrame: - partition = context.asset_partition_key_for_output() +def static_partitioned(context: AssetExecutionContext) -> pd.DataFrame: + partition = context.partition_key value = context.op_config["value"] return pd.DataFrame( { @@ -403,8 +404,8 @@ def test_multi_partitioned_asset(tmp_path, io_managers): metadata={"partition_expr": "fruit"}, config_schema={"value": str}, ) -def dynamic_partitioned(context) -> pd.DataFrame: - partition = context.asset_partition_key_for_output() +def dynamic_partitioned(context: AssetExecutionContext) -> pd.DataFrame: + partition = context.partition_key value = context.op_config["value"] return pd.DataFrame( { @@ -483,8 +484,10 @@ def test_self_dependent_asset(tmp_path, io_managers): }, config_schema={"value": str, "last_partition_key": str}, ) - def self_dependent_asset(context, self_dependent_asset: pd.DataFrame) -> pd.DataFrame: - key = context.asset_partition_key_for_output() + def self_dependent_asset( + context: AssetExecutionContext, self_dependent_asset: pd.DataFrame + ) -> pd.DataFrame: + key = context.partition_key if not self_dependent_asset.empty: assert len(self_dependent_asset.index) == 3 diff --git a/python_modules/libraries/dagster-duckdb-polars/dagster_duckdb_polars_tests/test_type_handler.py b/python_modules/libraries/dagster-duckdb-polars/dagster_duckdb_polars_tests/test_type_handler.py index 87d56119a2974..ab4da094157b8 100644 --- a/python_modules/libraries/dagster-duckdb-polars/dagster_duckdb_polars_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-duckdb-polars/dagster_duckdb_polars_tests/test_type_handler.py @@ -4,6 +4,7 @@ import polars as pl import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DailyPartitionsDefinition, @@ -197,8 +198,8 @@ def test_not_supported_type(tmp_path, io_managers): metadata={"partition_expr": "time"}, config_schema={"value": str}, ) -def daily_partitioned(context) -> pl.DataFrame: - df = pl.DataFrame({"date": context.asset_partition_key_for_output()}) +def daily_partitioned(context: AssetExecutionContext) -> pl.DataFrame: + df = pl.DataFrame({"date": context.partition_key}) partition = df.with_columns( pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d", strict=False).cast(pl.Datetime) )["date"][0] @@ -269,8 +270,8 @@ def test_time_window_partitioned_asset(tmp_path, io_managers): metadata={"partition_expr": "color"}, config_schema={"value": str}, ) -def static_partitioned(context) -> pl.DataFrame: - partition = context.asset_partition_key_for_output() +def static_partitioned(context: AssetExecutionContext) -> pl.DataFrame: + partition = context.partition_key value = context.op_config["value"] return pl.DataFrame( { @@ -427,8 +428,8 @@ def test_multi_partitioned_asset(tmp_path, io_managers): metadata={"partition_expr": "fruit"}, config_schema={"value": str}, ) -def dynamic_partitioned(context) -> pl.DataFrame: - partition = context.asset_partition_key_for_output() +def dynamic_partitioned(context: AssetExecutionContext) -> pl.DataFrame: + partition = context.partition_key value = context.op_config["value"] return pl.DataFrame( { @@ -513,8 +514,10 @@ def test_self_dependent_asset(tmp_path, io_managers): }, config_schema={"value": str, "last_partition_key": str}, ) - def self_dependent_asset(context, self_dependent_asset: pl.DataFrame) -> pl.DataFrame: - key = context.asset_partition_key_for_output() + def self_dependent_asset( + context: AssetExecutionContext, self_dependent_asset: 
pl.DataFrame + ) -> pl.DataFrame: + key = context.partition_key if not self_dependent_asset.is_empty(): assert len(self_dependent_asset["key"]) == 3 diff --git a/python_modules/libraries/dagster-duckdb-pyspark/dagster_duckdb_pyspark_tests/test_type_handler.py b/python_modules/libraries/dagster-duckdb-pyspark/dagster_duckdb_pyspark_tests/test_type_handler.py index 58897d9a13a44..05961cbd6e0b8 100644 --- a/python_modules/libraries/dagster-duckdb-pyspark/dagster_duckdb_pyspark_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-duckdb-pyspark/dagster_duckdb_pyspark_tests/test_type_handler.py @@ -4,6 +4,7 @@ import pandas as pd import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DailyPartitionsDefinition, @@ -176,8 +177,8 @@ def test_not_supported_type(tmp_path, io_managers): metadata={"partition_expr": "time"}, config_schema={"value": str}, ) -def daily_partitioned(context) -> SparkDF: - partition = pd.Timestamp(context.asset_partition_key_for_output()) +def daily_partitioned(context: AssetExecutionContext) -> SparkDF: + partition = pd.Timestamp(context.partition_key) value = context.op_config["value"] pd_df = pd.DataFrame( @@ -241,8 +242,8 @@ def test_partitioned_asset(tmp_path, io_managers): metadata={"partition_expr": "color"}, config_schema={"value": str}, ) -def static_partitioned(context) -> SparkDF: - partition = context.asset_partition_key_for_output() +def static_partitioned(context: AssetExecutionContext) -> SparkDF: + partition = context.partition_key value = context.op_config["value"] pd_df = pd.DataFrame( { @@ -391,8 +392,8 @@ def test_multi_partitioned_asset(tmp_path, io_managers): metadata={"partition_expr": "fruit"}, config_schema={"value": str}, ) -def dynamic_partitioned(context) -> SparkDF: - partition = context.asset_partition_key_for_output() +def dynamic_partitioned(context: AssetExecutionContext) -> SparkDF: + partition = context.partition_key value = context.op_config["value"] pd_df = pd.DataFrame( { @@ -474,8 +475,10 @@ def test_self_dependent_asset(tmp_path, io_managers): }, config_schema={"value": str, "last_partition_key": str}, ) - def self_dependent_asset(context, self_dependent_asset: SparkDF) -> SparkDF: - key = context.asset_partition_key_for_output() + def self_dependent_asset( + context: AssetExecutionContext, self_dependent_asset: SparkDF + ) -> SparkDF: + key = context.partition_key if not self_dependent_asset.isEmpty(): pd_df = self_dependent_asset.toPandas() diff --git a/python_modules/libraries/dagster-gcp-pandas/dagster_gcp_pandas_tests/bigquery/test_type_handler.py b/python_modules/libraries/dagster-gcp-pandas/dagster_gcp_pandas_tests/bigquery/test_type_handler.py index c9a4a34b86344..a864470861239 100644 --- a/python_modules/libraries/dagster-gcp-pandas/dagster_gcp_pandas_tests/bigquery/test_type_handler.py +++ b/python_modules/libraries/dagster-gcp-pandas/dagster_gcp_pandas_tests/bigquery/test_type_handler.py @@ -7,6 +7,7 @@ import pandas_gbq import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DailyPartitionsDefinition, @@ -141,8 +142,8 @@ def test_time_window_partitioned_asset(io_manager): key_prefix=SCHEMA, name=table_name, ) - def daily_partitioned(context) -> pd.DataFrame: - partition = pd.Timestamp(context.asset_partition_key_for_output()) + def daily_partitioned(context: AssetExecutionContext) -> pd.DataFrame: + partition = pd.Timestamp(context.partition_key) value = context.op_config["value"] return pd.DataFrame( @@ -220,8 +221,8 @@ def 
test_static_partitioned_asset(io_manager): config_schema={"value": str}, name=table_name, ) - def static_partitioned(context) -> pd.DataFrame: - partition = context.asset_partition_key_for_output() + def static_partitioned(context: AssetExecutionContext) -> pd.DataFrame: + partition = context.partition_key value = context.op_config["value"] return pd.DataFrame( { @@ -394,8 +395,8 @@ def test_dynamic_partitioned_asset(io_manager): config_schema={"value": str}, name=table_name, ) - def dynamic_partitioned(context) -> pd.DataFrame: - partition = context.asset_partition_key_for_output() + def dynamic_partitioned(context: AssetExecutionContext) -> pd.DataFrame: + partition = context.partition_key value = context.op_config["value"] return pd.DataFrame( { @@ -488,8 +489,10 @@ def test_self_dependent_asset(io_manager): config_schema={"value": str, "last_partition_key": str}, name=table_name, ) - def self_dependent_asset(context, self_dependent_asset: pd.DataFrame) -> pd.DataFrame: - key = context.asset_partition_key_for_output() + def self_dependent_asset( + context: AssetExecutionContext, self_dependent_asset: pd.DataFrame + ) -> pd.DataFrame: + key = context.partition_key if not self_dependent_asset.empty: assert len(self_dependent_asset.index) == 3 diff --git a/python_modules/libraries/dagster-gcp-pyspark/dagster_gcp_pyspark_tests/bigquery/test_type_handler.py b/python_modules/libraries/dagster-gcp-pyspark/dagster_gcp_pyspark_tests/bigquery/test_type_handler.py index 5c6d89e1ec94d..30b9ef125b782 100644 --- a/python_modules/libraries/dagster-gcp-pyspark/dagster_gcp_pyspark_tests/bigquery/test_type_handler.py +++ b/python_modules/libraries/dagster-gcp-pyspark/dagster_gcp_pyspark_tests/bigquery/test_type_handler.py @@ -7,6 +7,7 @@ import pandas_gbq import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DailyPartitionsDefinition, @@ -192,8 +193,8 @@ def test_time_window_partitioned_asset(spark, io_manager): key_prefix=SCHEMA, name=table_name, ) - def daily_partitioned(context) -> DataFrame: - partition = context.asset_partition_key_for_output() + def daily_partitioned(context: AssetExecutionContext) -> DataFrame: + partition = context.partition_key value = context.op_config["value"] schema = StructType( @@ -281,8 +282,8 @@ def test_static_partitioned_asset(spark, io_manager): config_schema={"value": str}, name=table_name, ) - def static_partitioned(context) -> DataFrame: - partition = context.asset_partition_key_for_output() + def static_partitioned(context: AssetExecutionContext) -> DataFrame: + partition = context.partition_key value = context.op_config["value"] schema = StructType( @@ -470,8 +471,8 @@ def test_dynamic_partitions(spark, io_manager): config_schema={"value": str}, name=table_name, ) - def dynamic_partitioned(context) -> DataFrame: - partition = context.asset_partition_key_for_output() + def dynamic_partitioned(context: AssetExecutionContext) -> DataFrame: + partition = context.partition_key value = context.op_config["value"] schema = StructType( @@ -572,8 +573,10 @@ def test_self_dependent_asset(spark, io_manager): config_schema={"value": str, "last_partition_key": str}, name=table_name, ) - def self_dependent_asset(context, self_dependent_asset: DataFrame) -> DataFrame: - key = context.asset_partition_key_for_output() + def self_dependent_asset( + context: AssetExecutionContext, self_dependent_asset: DataFrame + ) -> DataFrame: + key = context.partition_key if not self_dependent_asset.isEmpty(): pd_df = self_dependent_asset.toPandas() diff --git 
a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index b9951ff5b1ccd..a263250eb4b56 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -7,6 +7,7 @@ import pandas import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DailyPartitionsDefinition, @@ -313,8 +314,8 @@ def test_time_window_partitioned_asset(io_manager): key_prefix=SCHEMA, name=table_name, ) - def daily_partitioned(context) -> DataFrame: - partition = Timestamp(context.asset_partition_key_for_output()) + def daily_partitioned(context: AssetExecutionContext) -> DataFrame: + partition = Timestamp(context.partition_key) value = context.op_config["value"] return DataFrame( @@ -400,8 +401,8 @@ def test_static_partitioned_asset(io_manager): config_schema={"value": str}, name=table_name, ) - def static_partitioned(context) -> DataFrame: - partition = context.asset_partition_key_for_output() + def static_partitioned(context: AssetExecutionContext) -> DataFrame: + partition = context.partition_key value = context.op_config["value"] return DataFrame( { @@ -591,8 +592,8 @@ def test_dynamic_partitions(io_manager): config_schema={"value": str}, name=table_name, ) - def dynamic_partitioned(context) -> DataFrame: - partition = context.asset_partition_key_for_output() + def dynamic_partitioned(context: AssetExecutionContext) -> DataFrame: + partition = context.partition_key value = context.op_config["value"] return DataFrame( { @@ -705,8 +706,10 @@ def test_self_dependent_asset(io_manager): config_schema={"value": str, "last_partition_key": str}, name=table_name, ) - def self_dependent_asset(context, self_dependent_asset: DataFrame) -> DataFrame: - key = context.asset_partition_key_for_output() + def self_dependent_asset( + context: AssetExecutionContext, self_dependent_asset: DataFrame + ) -> DataFrame: + key = context.partition_key if not self_dependent_asset.empty: assert len(self_dependent_asset.index) == 3 diff --git a/python_modules/libraries/dagster-snowflake-pyspark/dagster_snowflake_pyspark_tests/test_snowflake_pyspark_type_handler.py b/python_modules/libraries/dagster-snowflake-pyspark/dagster_snowflake_pyspark_tests/test_snowflake_pyspark_type_handler.py index db357fce7eb18..677b0a66cadcf 100644 --- a/python_modules/libraries/dagster-snowflake-pyspark/dagster_snowflake_pyspark_tests/test_snowflake_pyspark_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pyspark/dagster_snowflake_pyspark_tests/test_snowflake_pyspark_type_handler.py @@ -6,6 +6,7 @@ import pytest from dagster import ( + AssetExecutionContext, AssetIn, AssetKey, DailyPartitionsDefinition, @@ -202,8 +203,8 @@ def test_time_window_partitioned_asset(spark, io_manager): key_prefix=SCHEMA, name=table_name, ) - def daily_partitioned(context) -> DataFrame: - partition = context.asset_partition_key_for_output() + def daily_partitioned(context: AssetExecutionContext) -> DataFrame: + partition = context.partition_key value = context.op_config["value"] schema = StructType( @@ -311,8 +312,8 @@ def test_static_partitioned_asset(spark, io_manager): config_schema={"value": str}, name=table_name, ) - def static_partitioned(context) -> DataFrame: - 
partition = context.asset_partition_key_for_output() + def static_partitioned(context: AssetExecutionContext) -> DataFrame: + partition = context.partition_key value = context.op_config["value"] schema = StructType( @@ -544,8 +545,8 @@ def test_dynamic_partitions(spark, io_manager): config_schema={"value": str}, name=table_name, ) - def dynamic_partitioned(context) -> DataFrame: - partition = context.asset_partition_key_for_output() + def dynamic_partitioned(context: AssetExecutionContext) -> DataFrame: + partition = context.partition_key value = context.op_config["value"] schema = StructType( @@ -666,8 +667,10 @@ def test_self_dependent_asset(spark, io_manager): config_schema={"value": str, "last_partition_key": str}, name=table_name, ) - def self_dependent_asset(context, self_dependent_asset: DataFrame) -> DataFrame: - key = context.asset_partition_key_for_output() + def self_dependent_asset( + context: AssetExecutionContext, self_dependent_asset: DataFrame + ) -> DataFrame: + key = context.partition_key if not self_dependent_asset.isEmpty(): pd_df = self_dependent_asset.toPandas()
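
Migration note: the mapping introduced in ALTERNATE_METHOD_NAMES above points each deprecated `asset_partition_*_for_output` call at a plain property on AssetExecutionContext. A minimal sketch of that migration, assuming a hypothetical daily-partitioned asset named `my_asset` (the asset name and start date are illustrative, not from this patch):

    from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset

    partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")

    @asset(partitions_def=partitions_def)
    def my_asset(context: AssetExecutionContext) -> None:
        # Deprecated on AssetExecutionContext by this patch:
        #   context.asset_partition_key_for_output()
        #   context.asset_partition_keys_for_output()
        #   context.asset_partition_key_range_for_output()
        #   context.asset_partitions_time_window_for_output()
        #   context.asset_partitions_def_for_output()
        # Replacements, per the ALTERNATE_METHOD_NAMES mapping:
        key = context.partition_key                # single partition key, e.g. "2024-01-02"
        keys = context.partition_keys              # all keys when a run covers a key range
        key_range = context.partition_key_range    # PartitionKeyRange(start, end)
        window = context.partition_time_window     # TimeWindow(start, end) for time-based partitions
        pdef = context.assets_def.partitions_def   # the asset's PartitionsDefinition
        context.log.info(f"{key} {keys} {key_range} {window} {pdef}")

The deprecated methods still delegate to OpExecutionContext (see the @deprecated wrappers added in compute.py), so existing code keeps working; only the call sites in the touched tests and the dagster-dbt example were updated to the property form.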