From cf95455137bd40644c6278f60e66fcbe1e588bd7 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 7 Sep 2023 11:46:45 -0400 Subject: [PATCH] reorg --- .../_core/execution/context/compute.py | 132 +++++++++--------- 1 file changed, 67 insertions(+), 65 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 17b4566ae2cb7..73db6551f561b 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -264,17 +264,6 @@ def partition_key(self) -> str: """ return self._step_execution_context.partition_key - @deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.") - @public - @property - def asset_partition_key_range(self) -> PartitionKeyRange: - """The range of partition keys for the current run. - - If run is for a single partition key, return a `PartitionKeyRange` with the same start and - end. Raises an error if the current run is not a partitioned run. - """ - return self.partition_key_range - @public @property def partition_key_range(self) -> PartitionKeyRange: @@ -540,6 +529,57 @@ def asset_key_for_input(self, input_name: str) -> AssetKey: else: return key + @public + @experimental + def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: + """Return the provenance information for the most recent materialization of an asset. + + Args: + asset_key (AssetKey): Key of the asset for which to retrieve provenance. + + Returns: + Optional[DataProvenance]: Provenance information for the most recent + materialization of the asset. Returns `None` if the asset was never materialized or + the materialization record is too old to contain provenance information. + """ + record = self.instance.get_latest_data_version_record(asset_key) + + return ( + None if record is None else extract_data_provenance_from_entry(record.event_log_entry) + ) + + def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> None: + """Set the data version for an asset being materialized by the currently executing step. + This is useful for external execution situations where it is not possible to return + an `Output`. + + Args: + asset_key (AssetKey): Key of the asset for which to set the data version. + data_version (DataVersion): The data version to set. + """ + self._step_execution_context.set_data_version(asset_key, data_version) + + @property + def asset_check_spec(self) -> AssetCheckSpec: + asset_checks_def = check.not_none( + self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle), + "This context does not correspond to an AssetChecksDefinition", + ) + return asset_checks_def.spec + + ### Partition methods + + @deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.") + @public + @property + def asset_partition_key_range(self) -> PartitionKeyRange: + """The range of partition keys for the current run. + + If run is for a single partition key, return a `PartitionKeyRange` with the same start and + end. Raises an error if the current run is not a partitioned run. + """ + return self.partition_key_range + @public def asset_partition_key_for_output(self, output_name: str = "result") -> str: """Returns the asset partition key for the given output. Defaults to "result", which is the @@ -547,6 +587,11 @@ def asset_partition_key_for_output(self, output_name: str = "result") -> str: """ return self._step_execution_context.asset_partition_key_for_output(output_name) + @public + def asset_partition_key_for_input(self, input_name: str) -> str: + """Returns the partition key of the upstream asset corresponding to the given input.""" + return self._step_execution_context.asset_partition_key_for_input(input_name) + @public def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: """The time window for the partitions of the output asset. @@ -558,6 +603,17 @@ def asset_partitions_time_window_for_output(self, output_name: str = "result") - """ return self._step_execution_context.asset_partitions_time_window_for_output(output_name) + @public + def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: + """The time window for the partitions of the input asset. + + Raises an error if either of the following are true: + - The input asset has no partitioning. + - The input asset is not partitioned with a TimeWindowPartitionsDefinition or a + MultiPartitionsDefinition with one time-partitioned dimension. + """ + return self._step_execution_context.asset_partitions_time_window_for_input(input_name) + @public def asset_partition_key_range_for_output( self, output_name: str = "result" @@ -570,11 +626,6 @@ def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRa """Return the PartitionKeyRange for the corresponding input. Errors if there is more or less than one.""" return self._step_execution_context.asset_partition_key_range_for_input(input_name) - @public - def asset_partition_key_for_input(self, input_name: str) -> str: - """Returns the partition key of the upstream asset corresponding to the given input.""" - return self._step_execution_context.asset_partition_key_for_input(input_name) - @public def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: """The PartitionsDefinition on the upstream asset corresponding to this input.""" @@ -624,55 +675,6 @@ def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: ).get_partition_keys() ) - @public - def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: - """The time window for the partitions of the input asset. - - Raises an error if either of the following are true: - - The input asset has no partitioning. - - The input asset is not partitioned with a TimeWindowPartitionsDefinition or a - MultiPartitionsDefinition with one time-partitioned dimension. - """ - return self._step_execution_context.asset_partitions_time_window_for_input(input_name) - - @public - @experimental - def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: - """Return the provenance information for the most recent materialization of an asset. - - Args: - asset_key (AssetKey): Key of the asset for which to retrieve provenance. - - Returns: - Optional[DataProvenance]: Provenance information for the most recent - materialization of the asset. Returns `None` if the asset was never materialized or - the materialization record is too old to contain provenance information. - """ - record = self.instance.get_latest_data_version_record(asset_key) - - return ( - None if record is None else extract_data_provenance_from_entry(record.event_log_entry) - ) - - def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> None: - """Set the data version for an asset being materialized by the currently executing step. - This is useful for external execution situations where it is not possible to return - an `Output`. - - Args: - asset_key (AssetKey): Key of the asset for which to set the data version. - data_version (DataVersion): The data version to set. - """ - self._step_execution_context.set_data_version(asset_key, data_version) - - @property - def asset_check_spec(self) -> AssetCheckSpec: - asset_checks_def = check.not_none( - self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle), - "This context does not correspond to an AssetChecksDefinition", - ) - return asset_checks_def.spec - # actually forking the object type for assets is tricky for users in the cases of: # * manually constructing ops to make AssetsDefinitions