Skip to content

Commit

Permalink
maybe fix the issue??
Browse files Browse the repository at this point in the history
  • Loading branch information
JamieDeMaria committed Sep 6, 2023
1 parent 18af806 commit 3b2dd28
Showing 1 changed file with 129 additions and 65 deletions.
194 changes: 129 additions & 65 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ def for_input_manager(
)
asset_partitions_subset = (
# TODO - need to solve the self partition issue with method refactor!!
self.asset_partitions_subset_for_input(name)
self.asset_partitions_subset_for_input(asset_key)
if self.has_asset_partitions_for_input(name)
else None
)
Expand Down Expand Up @@ -934,9 +934,7 @@ def _fetch_input_asset_version_info(self, key: AssetKey) -> None:
# Exclude AllPartitionMapping for now to avoid huge queries
# TODO - need to solve the self dependent partition issue with the method refactors
if input_name and self.has_asset_partitions_for_input(input_name):
subset = self.asset_partitions_subset_for_input(
input_name, require_valid_partitions=False
)
subset = self.asset_partitions_subset_for_input(key, require_valid_partitions=False)
input_keys = list(subset.get_partition_keys())

# This check represents a temporary constraint that prevents huge query results for upstream
Expand Down Expand Up @@ -1062,84 +1060,77 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool:
def asset_partition_key_range_for_asset(
self, asset: CoercibleToAssetKey, is_dependency: bool = False
) -> PartitionKeyRange:
subset = self.asset_partitions_subset_for_asset(asset, is_dependency=is_dependency)
partition_key_ranges = subset.get_partition_key_ranges(
dynamic_partitions_store=self.instance
# If the asset whose key range is being requested is a parent of the asset that is currently
# materializing, the key range must be loaded in a different way. So we first determine
# whether `asset` is a parent of the currently materializing asset.

asset_layer = self.job_def.asset_layer
currently_materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle)
asset_key = AssetKey.from_coercible(asset)

is_resulting_asset = (
currently_materializing_assets_def
and asset_key in currently_materializing_assets_def.keys_by_output_name.values()
)
is_parent_asset = (
currently_materializing_assets_def
and asset_key in currently_materializing_assets_def.keys_by_input_name.values()
)

if len(partition_key_ranges) != 1:
check.failed(
"Tried to access asset partition key range, but there are "
f"({len(partition_key_ranges)}) key ranges associated with this input.",
# When `asset` is a self-dependent partitioned asset, is_resulting_asset and is_parent_asset
# are both True. In this case, we defer to the user-provided is_dependency parameter: if
# is_dependency is True, we return the key range of `asset` as a parent asset.
get_partition_key_range_as_parent_asset = (is_parent_asset and not is_resulting_asset) or (
is_parent_asset and is_resulting_asset and is_dependency
)

if get_partition_key_range_as_parent_asset:
subset = self.asset_partitions_subset_for_input(asset)
partition_key_ranges = subset.get_partition_key_ranges(
dynamic_partitions_store=self.instance
)

return partition_key_ranges[0]
if len(partition_key_ranges) != 1:
check.failed(
"Tried to access asset partition key range, but there are "
f"({len(partition_key_ranges)}) key ranges associated with this input.",
)

return partition_key_ranges[0]
else:
return self.asset_partition_key_range

def asset_partitions_subset_for_input(
self, input_name: str, *, require_valid_partitions: bool = True
self, asset: CoercibleToAssetKey, *, require_valid_partitions: bool = True
) -> PartitionsSubset:
# TODO - change other callsites to get the asset key before passing
asset_layer = self.job_def.asset_layer
upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)
assets_def = asset_layer.assets_def_for_node(self.node_handle)
upstream_asset_key = AssetKey.from_coercible(asset)

if upstream_asset_key is not None:
return self.asset_partitions_subset_for_asset(upstream_asset_key, is_dependency=True)

# this check message isn't quite accurate - it's really that the input doesn't correspond to an asset
check.failed("The input has no asset partitions")

def asset_partitions_subset_for_asset(
self,
asset: CoercibleToAssetKey,
*,
require_valid_partitions: bool = True,
is_dependency: bool = False,
) -> PartitionsSubset:
asset_layer = self.job_def.asset_layer
materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle)
asset_key = AssetKey.from_coercible(asset)

if asset_layer.has_assets_def_for_asset(asset_key) or (
asset_layer.source_assets_by_key.get(asset_key, None) is not None
):
asset_partitions_def = asset_layer.partitions_def_for_asset(asset_key)
upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key)

if asset_partitions_def is None:
check.failed(f"The asset {asset_key} has no partitions")

is_resulting_asset = (
materializing_assets_def
and asset_key in materializing_assets_def.keys_by_output_name.values()
)
is_upstream_asset = (
materializing_assets_def
and asset_key in materializing_assets_def.keys_by_input_name.values()
)

get_partition_subset_as_upstream_asset = (
is_upstream_asset and not is_resulting_asset
) or (is_upstream_asset and is_resulting_asset and is_dependency)

if get_partition_subset_as_upstream_asset:
materializing_assets_partitions_def = (
materializing_assets_def.partitions_def if materializing_assets_def else None
)
if upstream_asset_partitions_def is not None:
partitions_def = assets_def.partitions_def if assets_def else None
partitions_subset = (
materializing_assets_partitions_def.empty_subset().with_partition_key_range(
partitions_def.empty_subset().with_partition_key_range(
self.asset_partition_key_range, dynamic_partitions_store=self.instance
)
if materializing_assets_partitions_def
if partitions_def
else None
)

partition_mapping = infer_partition_mapping(
asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key),
materializing_assets_partitions_def,
asset_partitions_def,
asset_layer.partition_mapping_for_node_input(
self.node_handle, upstream_asset_key
),
partitions_def,
upstream_asset_partitions_def,
)
mapped_partitions_result = (
partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
partitions_subset,
asset_partitions_def,
upstream_asset_partitions_def,
dynamic_partitions_store=self.instance,
)
)
Expand All @@ -1152,13 +1143,86 @@ def asset_partitions_subset_for_asset(
f"Partition key range {self.asset_partition_key_range} in"
f" {self.node_handle.name} depends on invalid partition keys"
f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in"
f" asset {asset_key}"
f" upstream asset {upstream_asset_key}"
)

return mapped_partitions_result.partitions_subset
else:
return partitions_subset
check.failed(f"The asset {asset} has no partitions")

check.failed("The input has no asset partitions")

# def asset_partitions_subset_for_asset(
# self,
# asset: CoercibleToAssetKey,
# *,
# require_valid_partitions: bool = True,
# is_dependency: bool = False,
# ) -> PartitionsSubset:
# asset_layer = self.job_def.asset_layer
# materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle)
# asset_key = AssetKey.from_coercible(asset)

# if asset_layer.has_assets_def_for_asset(asset_key) or (
# asset_layer.source_assets_by_key.get(asset_key, None) is not None
# ):
# asset_partitions_def = asset_layer.partitions_def_for_asset(asset_key)

# if asset_partitions_def is None:
# check.failed(f"The asset {asset_key} has no partitions")

# is_resulting_asset = (
# materializing_assets_def
# and asset_key in materializing_assets_def.keys_by_output_name.values()
# )
# is_upstream_asset = (
# materializing_assets_def
# and asset_key in materializing_assets_def.keys_by_input_name.values()
# )

# get_partition_subset_as_upstream_asset = (
# is_upstream_asset and not is_resulting_asset
# ) or (is_upstream_asset and is_resulting_asset and is_dependency)

# materializing_assets_partitions_def = (
# materializing_assets_def.partitions_def if materializing_assets_def else None
# )
# partitions_subset = (
# materializing_assets_partitions_def.empty_subset().with_partition_key_range(
# self.asset_partition_key_range, dynamic_partitions_store=self.instance
# )
# if materializing_assets_partitions_def
# else None
# )

# if get_partition_subset_as_upstream_asset:

# partition_mapping = infer_partition_mapping(
# asset_layer.partition_mapping_for_node_input(self.node_handle, asset_key),
# materializing_assets_partitions_def,
# asset_partitions_def,
# )
# mapped_partitions_result = (
# partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
# partitions_subset,
# asset_partitions_def,
# dynamic_partitions_store=self.instance,
# )
# )

# if (
# require_valid_partitions
# and mapped_partitions_result.required_but_nonexistent_partition_keys
# ):
# raise DagsterInvariantViolationError(
# f"Partition key range {self.asset_partition_key_range} in"
# f" {self.node_handle.name} depends on invalid partition keys"
# f" {mapped_partitions_result.required_but_nonexistent_partition_keys} in"
# f" asset {asset_key}"
# )

# return mapped_partitions_result.partitions_subset
# else:
# return partitions_subset
# check.failed(f"The asset {asset} has no partitions")

# def asset_partition_key_for_input(self, input_name: str) -> str:
# asset_layer = self.job_def.asset_layer
Expand Down

0 comments on commit 3b2dd28

Please sign in to comment.