Skip to content

Commit

Permalink
deprecations
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 10, 2024
1 parent f6928e5 commit 90ceb0f
Showing 1 changed file with 33 additions and 2 deletions.
35 changes: 33 additions & 2 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,17 @@ def _copy_docs_from_op_execution_context(obj):
"dagster_run": "run",
"run_config": "run.run_config",
"run_tags": "run.tags",
"asset_partition_key_for_output": "partition_info.key",
"asset_partitions_time_window_for_output": "partition_info.time_window",
"asset_partition_key_range_for_output": "partition_info.key_range",
"asset_partition_key_range_for_input": "upstream_partition_info(asset_key).key_range",
"asset_partition_key_for_input": "upstream_partition_info(asset_key).key",
"asset_partitions_def_for_output": "partition_info.definition",
"asset_partitions_def_for_input": "upstream_partition_info(asset_key).definition",
"asset_partition_keys_for_output": "partition_info.keys",
"asset_partition_keys_for_input": "upstream_partition_info(asset_key).keys",
"asset_partitions_time_window_for_input": "upstream_partition_info(asset_key).time_window",
"has_partition_key": "is_partitioned_materialization",
}

ALTERNATE_EXPRESSIONS = {
Expand Down Expand Up @@ -1496,11 +1507,16 @@ def latest_materialization_event(
AssetKey.from_coercible(key)
)

@public
@property
def is_partitioned_materialization(self) -> bool:
return self.op_execution_context.has_partition_key

@cached_property
def partition_info(self) -> PartitionInfo:
"""Returns a filled out PartitionInfo for the currently materializing asset."""
partitions_def = self.assets_def.partitions_def
if self.op_execution_context.has_partition_key and partitions_def:
if self.is_partitioned_materialization and partitions_def:
key_range = self.op_execution_context.partition_key_range
return PartitionInfo(
key=self.op_execution_context.partition_key
Expand Down Expand Up @@ -1528,7 +1544,7 @@ def upstream_partition_info(self, key: CoercibleToAssetKey) -> PartitionInfo:
partitions_def = self._step_execution_context.job_def.asset_layer.partitions_def_for_asset(
asset_key
)
if self.op_execution_context.has_partition_key and partitions_def:
if self.is_partitioned_materialization and partitions_def:
key_range = self._step_execution_context.asset_partition_key_range_for_upstream(
asset_key
)
Expand Down Expand Up @@ -1593,18 +1609,21 @@ def has_tag(self, key: str) -> bool:
def get_tag(self, key: str) -> Optional[str]:
return self.op_execution_context.get_tag(key)

@deprecated(**_get_deprecation_kwargs("has_partition_key"))
@public
@property
@_copy_docs_from_op_execution_context
def has_partition_key(self) -> bool:
return self.op_execution_context.has_partition_key

@deprecated(**_get_deprecation_kwargs("partition_key"))
@public
@property
@_copy_docs_from_op_execution_context
def partition_key(self) -> str:
return self.op_execution_context.partition_key

@deprecated(**_get_deprecation_kwargs("partition_keys"))
@public
@property
@_copy_docs_from_op_execution_context
Expand All @@ -1618,65 +1637,77 @@ def partition_keys(self) -> Sequence[str]:
def asset_partition_key_range(self) -> PartitionKeyRange:
return self.op_execution_context.asset_partition_key_range

@deprecated(**_get_deprecation_kwargs("partition_key_range"))
@public
@property
@_copy_docs_from_op_execution_context
def partition_key_range(self) -> PartitionKeyRange:
return self.op_execution_context.partition_key_range

@deprecated(**_get_deprecation_kwargs("partition_time_window"))
@public
@property
@_copy_docs_from_op_execution_context
def partition_time_window(self) -> TimeWindow:
return self.op_execution_context.partition_time_window

@deprecated(**_get_deprecation_kwargs("asset_partition_key_for_output"))
@public
@_copy_docs_from_op_execution_context
def asset_partition_key_for_output(self, output_name: str = "result") -> str:
return self.op_execution_context.asset_partition_key_for_output(output_name=output_name)

@deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_output"))
@public
@_copy_docs_from_op_execution_context
def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow:
return self.op_execution_context.asset_partitions_time_window_for_output(output_name)

@deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_output"))
@public
@_copy_docs_from_op_execution_context
def asset_partition_key_range_for_output(
self, output_name: str = "result"
) -> PartitionKeyRange:
return self.op_execution_context.asset_partition_key_range_for_output(output_name)

@deprecated(**_get_deprecation_kwargs("asset_partition_key_range_for_input"))
@public
@_copy_docs_from_op_execution_context
def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange:
return self.op_execution_context.asset_partition_key_range_for_input(input_name)

@deprecated(**_get_deprecation_kwargs("asset_partition_key_for_input"))
@public
@_copy_docs_from_op_execution_context
def asset_partition_key_for_input(self, input_name: str) -> str:
return self.op_execution_context.asset_partition_key_for_input(input_name)

@deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_output"))
@public
@_copy_docs_from_op_execution_context
def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition:
return self.op_execution_context.asset_partitions_def_for_output(output_name=output_name)

@deprecated(**_get_deprecation_kwargs("asset_partitions_def_for_input"))
@public
@_copy_docs_from_op_execution_context
def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition:
return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name)

@deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_output"))
@public
@_copy_docs_from_op_execution_context
def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]:
return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name)

@deprecated(**_get_deprecation_kwargs("asset_partition_keys_for_input"))
@public
@_copy_docs_from_op_execution_context
def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]:
return self.op_execution_context.asset_partition_keys_for_input(input_name=input_name)

@deprecated(**_get_deprecation_kwargs("asset_partitions_time_window_for_input"))
@public
@_copy_docs_from_op_execution_context
def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow:
Expand Down

0 comments on commit 90ceb0f

Please sign in to comment.