Skip to content

Commit

Permalink
reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 7, 2023
1 parent 6c2afe3 commit 39bb474
Showing 1 changed file with 67 additions and 65 deletions.
132 changes: 67 additions & 65 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,6 @@ def partition_key(self) -> str:
"""
return self._step_execution_context.partition_key

@deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.")
@public
@property
def asset_partition_key_range(self) -> PartitionKeyRange:
"""The range of partition keys for the current run.
If run is for a single partition key, return a `PartitionKeyRange` with the same start and
end. Raises an error if the current run is not a partitioned run.
"""
return self.partition_key_range

@public
@property
def partition_key_range(self) -> PartitionKeyRange:
Expand Down Expand Up @@ -540,13 +529,69 @@ def asset_key_for_input(self, input_name: str) -> AssetKey:
else:
return key

@public
@experimental
def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]:
"""Return the provenance information for the most recent materialization of an asset.
Args:
asset_key (AssetKey): Key of the asset for which to retrieve provenance.
Returns:
Optional[DataProvenance]: Provenance information for the most recent
materialization of the asset. Returns `None` if the asset was never materialized or
the materialization record is too old to contain provenance information.
"""
record = self.instance.get_latest_data_version_record(asset_key)

return (
None if record is None else extract_data_provenance_from_entry(record.event_log_entry)
)

def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> None:
"""Set the data version for an asset being materialized by the currently executing step.
This is useful for external execution situations where it is not possible to return
an `Output`.
Args:
asset_key (AssetKey): Key of the asset for which to set the data version.
data_version (DataVersion): The data version to set.
"""
self._step_execution_context.set_data_version(asset_key, data_version)

@property
def asset_check_spec(self) -> AssetCheckSpec:
asset_checks_def = check.not_none(
self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle),
"This context does not correspond to an AssetChecksDefinition",
)
return asset_checks_def.spec

### Partition methods

@deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.")
@public
@property
def asset_partition_key_range(self) -> PartitionKeyRange:
"""The range of partition keys for the current run.
If run is for a single partition key, return a `PartitionKeyRange` with the same start and
end. Raises an error if the current run is not a partitioned run.
"""
return self.partition_key_range

@public
def asset_partition_key_for_output(self, output_name: str = "result") -> str:
"""Returns the asset partition key for the given output. Defaults to "result", which is the
name of the default output.
"""
return self._step_execution_context.asset_partition_key_for_output(output_name)

@public
def asset_partition_key_for_input(self, input_name: str) -> str:
"""Returns the partition key of the upstream asset corresponding to the given input."""
return self._step_execution_context.asset_partition_key_for_input(input_name)

@public
def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow:
"""The time window for the partitions of the output asset.
Expand All @@ -558,6 +603,17 @@ def asset_partitions_time_window_for_output(self, output_name: str = "result") -
"""
return self._step_execution_context.asset_partitions_time_window_for_output(output_name)

@public
def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow:
"""The time window for the partitions of the input asset.
Raises an error if either of the following are true:
- The input asset has no partitioning.
- The input asset is not partitioned with a TimeWindowPartitionsDefinition or a
MultiPartitionsDefinition with one time-partitioned dimension.
"""
return self._step_execution_context.asset_partitions_time_window_for_input(input_name)

@public
def asset_partition_key_range_for_output(
self, output_name: str = "result"
Expand All @@ -570,11 +626,6 @@ def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRa
"""Return the PartitionKeyRange for the corresponding input. Errors if there is more or less than one."""
return self._step_execution_context.asset_partition_key_range_for_input(input_name)

@public
def asset_partition_key_for_input(self, input_name: str) -> str:
"""Returns the partition key of the upstream asset corresponding to the given input."""
return self._step_execution_context.asset_partition_key_for_input(input_name)

@public
def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition:
"""The PartitionsDefinition on the upstream asset corresponding to this input."""
Expand Down Expand Up @@ -624,55 +675,6 @@ def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]:
).get_partition_keys()
)

@public
def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow:
"""The time window for the partitions of the input asset.
Raises an error if either of the following are true:
- The input asset has no partitioning.
- The input asset is not partitioned with a TimeWindowPartitionsDefinition or a
MultiPartitionsDefinition with one time-partitioned dimension.
"""
return self._step_execution_context.asset_partitions_time_window_for_input(input_name)

@public
@experimental
def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]:
"""Return the provenance information for the most recent materialization of an asset.
Args:
asset_key (AssetKey): Key of the asset for which to retrieve provenance.
Returns:
Optional[DataProvenance]: Provenance information for the most recent
materialization of the asset. Returns `None` if the asset was never materialized or
the materialization record is too old to contain provenance information.
"""
record = self.instance.get_latest_data_version_record(asset_key)

return (
None if record is None else extract_data_provenance_from_entry(record.event_log_entry)
)

def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> None:
"""Set the data version for an asset being materialized by the currently executing step.
This is useful for external execution situations where it is not possible to return
an `Output`.
Args:
asset_key (AssetKey): Key of the asset for which to set the data version.
data_version (DataVersion): The data version to set.
"""
self._step_execution_context.set_data_version(asset_key, data_version)

@property
def asset_check_spec(self) -> AssetCheckSpec:
asset_checks_def = check.not_none(
self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle),
"This context does not correspond to an AssetChecksDefinition",
)
return asset_checks_def.spec


# actually forking the object type for assets is tricky for users in the cases of:
# * manually constructing ops to make AssetsDefinitions
Expand Down

0 comments on commit 39bb474

Please sign in to comment.