Skip to content

Commit

Permalink
some renaming and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 6, 2023
1 parent 658e8c4 commit da34a6a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
# "2023-08-21"
"""
return self._step_execution_context.asset_partition_key_for_asset(
return self._step_execution_context.partition_key_for_asset(
asset, is_dependency=is_dependency
)

Expand Down Expand Up @@ -736,7 +736,7 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
# PartitionKeyRange(start="2023-08-21", end="2023-08-25")
"""
return self._step_execution_context.asset_partition_key_range_for_asset(
return self._step_execution_context.partition_key_range_for_asset(
asset, is_dependency=is_dependency
)

Expand Down Expand Up @@ -830,7 +830,7 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
# ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
"""
return self._step_execution_context.asset_partition_keys_for_asset(asset)
return self._step_execution_context.partition_keys_for_asset(asset)

@public
@experimental
Expand Down
172 changes: 18 additions & 154 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,7 @@ def for_input_manager(
node_handle=self.node_handle, input_name=name
)
asset_partitions_subset = (
# TODO - need to solve the self partition issue with method refactor!!
self.asset_partitions_subset_for_input(asset_key)
self.partitions_subset_for_upstream_asset(asset_key)
if self.has_asset_partitions_for_input(name)
else None
)
Expand Down Expand Up @@ -937,7 +936,9 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None:
# Exclude AllPartitionMapping for now to avoid huge queries
# TODO - need to solve the self dependent partition issue with the method refactors
if input_name and self.has_asset_partitions_for_input(input_name):
subset = self.asset_partitions_subset_for_input(key, require_valid_partitions=False)
subset = self.partitions_subset_for_upstream_asset(
key, require_valid_partitions=False
)
input_keys = list(subset.get_partition_keys())

# This check represents a temporary constraint that prevents huge query results for upstream
Expand All @@ -960,24 +961,6 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None:
storage_id, data_version, event.run_id, event.timestamp
)

# def partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
# asset_layer = self.job_def.asset_layer
# upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)
# if upstream_asset_key:
# upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key)
# assets_def = asset_layer.assets_def_for_node(self.node_handle)
# partitions_def = assets_def.partitions_def if assets_def else None
# explicit_partition_mapping = self.job_def.asset_layer.partition_mapping_for_node_input(
# self.node_handle, upstream_asset_key
# )
# return infer_partition_mapping(
# explicit_partition_mapping,
# partitions_def,
# upstream_asset_partitions_def,
# )
# else:
# return None

def _get_input_asset_event(self, key: AssetKey) -> Optional["EventLogRecord"]:
event = self.instance.get_latest_data_version_record(key)
if event:
Expand Down Expand Up @@ -1037,6 +1020,8 @@ def get_output_asset_keys(self) -> AbstractSet[AssetKey]:
output_keys.add(asset_info.key)
return output_keys

#### Partitions methods

def has_asset_partitions_for_input(self, input_name: str) -> bool:
asset_layer = self.job_def.asset_layer
upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)
Expand All @@ -1046,20 +1031,6 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool:
and asset_layer.partitions_def_for_asset(upstream_asset_key) is not None
)

# def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange:
# subset = self.asset_partitions_subset_for_input(input_name)
# partition_key_ranges = subset.get_partition_key_ranges(
# dynamic_partitions_store=self.instance
# )

# if len(partition_key_ranges) != 1:
# check.failed(
# "Tried to access asset partition key range, but there are "
# f"({len(partition_key_ranges)}) key ranges associated with this input.",
# )

# return partition_key_ranges[0]

def _load_partition_info_as_upstream_asset(
self, current_asset: CoercibleToAssetKey, is_dependency: bool
) -> bool:
Expand Down Expand Up @@ -1101,23 +1072,23 @@ def partitions_def_for_asset(self, asset: CoercibleToAssetKey) -> PartitionsDefi
)
return result

def asset_partition_keys_for_asset(
def partition_keys_for_asset(
self, asset: CoercibleToAssetKey, is_dependency: bool = False
) -> Sequence[str]:
if self._load_partition_info_as_upstream_asset(asset, is_dependency=is_dependency):
return list(self.asset_partitions_subset_for_input(asset).get_partition_keys())
return list(self.partitions_subset_for_upstream_asset(asset).get_partition_keys())
return self.partitions_def_for_asset(asset).get_partition_keys_in_range(
self.asset_partition_key_range_for_asset(asset),
self.partition_key_range_for_asset(asset),
dynamic_partitions_store=self.instance,
)

def asset_partition_key_range_for_asset(
def partition_key_range_for_asset(
self, asset: CoercibleToAssetKey, is_dependency: bool = False
) -> PartitionKeyRange:
if self._load_partition_info_as_upstream_asset(
current_asset=asset, is_dependency=is_dependency
):
subset = self.asset_partitions_subset_for_input(asset)
subset = self.partitions_subset_for_upstream_asset(asset)
partition_key_ranges = subset.get_partition_key_ranges(
dynamic_partitions_store=self.instance
)
Expand All @@ -1132,7 +1103,7 @@ def asset_partition_key_range_for_asset(
else:
return self.asset_partition_key_range

def asset_partitions_subset_for_input(
def partitions_subset_for_upstream_asset(
self, asset: CoercibleToAssetKey, *, require_valid_partitions: bool = True
) -> PartitionsSubset:
asset_layer = self.job_def.asset_layer
Expand Down Expand Up @@ -1181,99 +1152,10 @@ def asset_partitions_subset_for_input(

check.failed("The input has no asset partitions")

# def asset_partitions_subset_for_asset(
# self,
# asset: CoercibleToAssetKey,
# *,
# require_valid_partitions: bool = True,
# is_dependency: bool = False,
# ) -> PartitionsSubset:
# asset_layer = self.job_def.asset_layer
# materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle)
# asset_key = AssetKey.from_coercible(asset)

# if asset_layer.has_assets_def_for_asset(asset_key) or (
# asset_layer.source_assets_by_key.get(asset_key, None) is not None
# ):
# asset_partitions_def = asset_layer.partitions_def_for_asset(asset_key)

# if asset_partitions_def is None:
# check.failed(f"The asset {asset_key} has no partitions")

# is_resulting_asset = (
# materializing_assets_def
# and asset_key in materializing_assets_def.keys_by_output_name.values()
# )
# is_upstream_asset = (
# materializing_assets_def
# and asset_key in materializing_assets_def.keys_by_input_name.values()
# )

# get_partition_subset_as_upstream_asset = (
# is_upstream_asset and not is_resulting_asset
# ) or (is_upstream_asset and is_resulting_asset and is_dependency)

# materializing_assets_partitions_def = (
# materializing_assets_def.partitions_def if materializing_assets_def else None
# )
# partitions_subset = (
# materializing_assets_partitions_def.empty_subset().with_partition_key_range(
# self.asset_partition_key_range, dynamic_partitions_store=self.instance
# )
# if materializing_assets_partitions_def
# else None
# )

# if get_partition_subset_as_upstream_asset:

# partition_mapping = infer_partition_mapping(
# asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key),
# materializing_assets_partitions_def,
# asset_partitions_def,
# )
# mapped_partitions_result = (
# partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
# partitions_subset,
# asset_partitions_def,
# dynamic_partitions_store=self.instance,
# )
# )

# if (
# require_valid_partitions
# and mapped_partitions_result.required_but_nonexistent_partition_keys
# ):
# raise DagsterInvariantViolationError(
# f"Partition key range {self.asset_partition_key_range} in"
# f" {self.node_handle.name} depends on invalid partition keys"
# f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in"
# f" asset {asset_key}"
# )

# return mapped_partitions_result.partitions_subset
# else:
# return partitions_subset
# check.failed(f"The asset {asset} has no partitions")

# def asset_partition_key_for_input(self, input_name: str) -> Union[str, MultiPartitionKey]:
# asset_layer = self.job_def.asset_layer
# upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)
# if upstream_asset_key is not None:
# start, end = self.asset_partition_key_range_for_asset(upstream_asset_key)
# else:
# check.failed(f"No asset corresponds to input '{input_name}' or step '{self.step.key}'")
# if start == end:
# return start
# else:
# check.failed(
# f"Tried to access partition key for input '{input_name}' of step '{self.step.key}',"
# f" but the step input has a partition range: '{start}' to '{end}'."
# )

def asset_partition_key_for_asset(
def partition_key_for_asset(
self, asset: CoercibleToAssetKey, is_dependency: bool = False
) -> Union[str, MultiPartitionKey]:
start, end = self.asset_partition_key_range_for_asset(asset, is_dependency=is_dependency)
start, end = self.partition_key_range_for_asset(asset, is_dependency=is_dependency)
if start == end:
return start
else:
Expand Down Expand Up @@ -1315,7 +1197,7 @@ def asset_partition_key_for_output(self, output_name: str) -> str:
)

def asset_partitions_time_window_for_output(self, output_name: str) -> TimeWindow:
"""The time window for the partitions of the asset correponding to the given output.
"""The time window for the partitions of the asset corresponding to the given output.
Raises an error if either of the following are true:
- The output asset has no partitioning.
Expand All @@ -1326,27 +1208,11 @@ def asset_partitions_time_window_for_output(self, output_name: str) -> TimeWindo
node_handle=self.node_handle, output_name=output_name
)
if asset_info:
return self.asset_partitions_time_window_for_asset(asset_info.key)
return self.partitions_time_window_for_asset(asset_info.key)

raise ValueError("The provided output does not correspond to an asset.")

# def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow:
# """The time window for the partitions of the asset corresponding to the given input.

# Raises an error if either of the following are true:
# - The input asset has no partitioning.
# - The input asset is not partitioned with a TimeWindowPartitionsDefinition or a
# MultiPartitionsDefinition with one time-partitioned dimension.
# """
# asset_layer = self.job_def.asset_layer
# upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)

# if upstream_asset_key is not None:
# return self.asset_partitions_time_window_for_asset(upstream_asset_key)

# raise ValueError("The provided input does not correspond to an asset.")

def asset_partitions_time_window_for_asset(
def partitions_time_window_for_asset(
self, asset: CoercibleToAssetKey, is_dependency: bool = False
) -> TimeWindow:
"""The time window for the partitions of the asset.
Expand Down Expand Up @@ -1383,9 +1249,7 @@ def asset_partitions_time_window_for_asset(
Union[TimeWindowPartitionsDefinition, MultiPartitionsDefinition],
asset_partitions_def,
)
partition_key_range = self.asset_partition_key_range_for_asset(
asset, is_dependency=is_dependency
)
partition_key_range = self.partition_key_range_for_asset(asset, is_dependency=is_dependency)

return TimeWindow(
asset_partitions_def.time_window_for_partition_key(partition_key_range.start).start,
Expand Down

0 comments on commit da34a6a

Please sign in to comment.