Skip to content

Commit

Permalink
test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 6, 2023
1 parent e4d2d1c commit 9c7ac93
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 47 deletions.
23 changes: 5 additions & 18 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,17 +554,7 @@ def partition_key_range_for_asset(
@public
def partitions_def_for_asset(self, asset: CoercibleToAssetKey) -> PartitionsDefinition:
"""The PartitionsDefinition on the provided asset."""
asset_key = AssetKey.from_coercible(asset)
result = self._step_execution_context.job_def.asset_layer.partitions_def_for_asset(
asset_key
)
if result is None:
raise DagsterInvariantViolationError(
f"Attempting to access partitions def for asset {asset_key}, but it is not"
" partitioned"
)

return result
return self._step_execution_context.partitions_def_for_asset(asset)

@public
def partition_keys_for_asset(
Expand All @@ -578,12 +568,7 @@ def partition_keys_for_asset(
of the dependecy version of the asset, not the partition of the asset as currently being materialized.
Defaults to False.
"""
return self.partitions_def_for_asset(asset).get_partition_keys_in_range(
self._step_execution_context.asset_partition_key_range_for_asset(
asset, is_dependency=is_dependency
),
dynamic_partitions_store=self.instance,
)
return self._step_execution_context.asset_partition_keys_for_asset(asset)

@public
@experimental
Expand Down Expand Up @@ -643,7 +628,9 @@ def asset_partition_key_range(self) -> PartitionKeyRange:
additional_warn_text="Use `partition_key` or `partition_key_for_asset` instead.",
)
@public
def asset_partition_key_for_output(self, output_name: str = "result") -> Union[str, MultiPartitionKey]:
def asset_partition_key_for_output(
self, output_name: str = "result"
) -> Union[str, MultiPartitionKey]:
"""Returns the asset partition key for the given output. Defaults to "result", which is the
name of the default output.
"""
Expand Down
56 changes: 42 additions & 14 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -1060,34 +1060,63 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool:

# return partition_key_ranges[0]

def asset_partition_key_range_for_asset(
self, asset: CoercibleToAssetKey, is_dependency: bool = False
) -> PartitionKeyRange:
# if the asset we are getting the key_range for is a parent of the asset that is currently
# materializing, we need to load the key_range in a different way. So we must first figure out
def _load_partition_info_as_upstream_asset(
self, current_asset: CoercibleToAssetKey, is_dependency: bool
) -> bool:
# In some cases, if the asset we are getting the partition info for is a parent of the asset that is currently
# materializing, we need to load the info in a different way. So we must first figure out
# if asset is the parent of the currently materializing asset

asset_layer = self.job_def.asset_layer
currently_materializing_assets_def = asset_layer.assets_def_for_node(self.node_handle)
asset_key = AssetKey.from_coercible(asset)
asset_key = AssetKey.from_coercible(current_asset)

is_resulting_asset = (
currently_materializing_assets_def
currently_materializing_assets_def is not None
and asset_key in currently_materializing_assets_def.keys_by_output_name.values()
)
is_parent_asset = (
currently_materializing_assets_def
is_upstream_asset = (
currently_materializing_assets_def is not None
and asset_key in currently_materializing_assets_def.keys_by_input_name.values()
)

# when `asset` is a self-dependent partitioned asset, then is_resulting_asset and is_upstream_asset
# are both True. In this case, we defer to the user-provided is_dependency parameter. If
# is_dependency is True, then we return the key range of asset as a parent asset
get_partition_key_range_as_parent_asset = (is_parent_asset and not is_resulting_asset) or (
is_parent_asset and is_resulting_asset and is_dependency
# is_dependency is True, then we return the partition info of asset as an upstream asset
get_partition_key_range_as_upstream_asset = (
is_upstream_asset and not is_resulting_asset
) or (is_upstream_asset and is_resulting_asset and is_dependency)

return get_partition_key_range_as_upstream_asset

def partitions_def_for_asset(self, asset: CoercibleToAssetKey) -> PartitionsDefinition:
"""The PartitionsDefinition on the provided asset."""
asset_key = AssetKey.from_coercible(asset)
result = self.job_def.asset_layer.partitions_def_for_asset(asset_key)

if result is None:
raise DagsterInvariantViolationError(
f"Attempting to access partitions def for asset {asset_key}, but it is not"
" partitioned"
)
return result

def asset_partition_keys_for_asset(
self, asset: CoercibleToAssetKey, is_dependency: bool = False
) -> Sequence[str]:
if self._load_partition_info_as_upstream_asset(asset, is_dependency=is_dependency):
return list(self.asset_partitions_subset_for_input(asset).get_partition_keys())
return self.partitions_def_for_asset(asset).get_partition_keys_in_range(
self.asset_partition_key_range_for_asset(asset),
dynamic_partitions_store=self.instance,
)

if get_partition_key_range_as_parent_asset:
def asset_partition_key_range_for_asset(
self, asset: CoercibleToAssetKey, is_dependency: bool = False
) -> PartitionKeyRange:
if self._load_partition_info_as_upstream_asset(
current_asset=asset, is_dependency=is_dependency
):
subset = self.asset_partitions_subset_for_input(asset)
partition_key_ranges = subset.get_partition_key_ranges(
dynamic_partitions_store=self.instance
Expand All @@ -1106,7 +1135,6 @@ def asset_partition_key_range_for_asset(
def asset_partitions_subset_for_input(
self, asset: CoercibleToAssetKey, *, require_valid_partitions: bool = True
) -> PartitionsSubset:
# TODO - change other callsites to get the asset key before passing
asset_layer = self.job_def.asset_layer
assets_def = asset_layer.assets_def_for_node(self.node_handle)
upstream_asset_key = AssetKey.from_coercible(asset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
from dagster._core.types.dagster_type import resolve_dagster_type


@pytest.fixture(autouse=True)
def error_on_warning():
# turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml
warnings.resetwarnings()
# @pytest.fixture(autouse=True)
# def error_on_warning():
# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml
# warnings.resetwarnings()

warnings.filterwarnings("error")
# warnings.filterwarnings("error")


def test_asset_no_decorator_args():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
from dagster._core.test_utils import ignore_warning, instance_for_test


@pytest.fixture(autouse=True)
def error_on_warning():
# turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml
warnings.resetwarnings()
# @pytest.fixture(autouse=True)
# def error_on_warning():
# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml
# warnings.resetwarnings()

warnings.filterwarnings("error")
# warnings.filterwarnings("error")


def test_basic_materialize():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
from dagster._seven.compat.pendulum import create_pendulum_time


@pytest.fixture(autouse=True)
def error_on_warning():
# turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml
warnings.resetwarnings()
# @pytest.fixture(autouse=True)
# def error_on_warning():
# # turn off any outer warnings filters, e.g. ignores that are set in pyproject.toml
# warnings.resetwarnings()

warnings.filterwarnings("error")
# warnings.filterwarnings("error")


def get_upstream_partitions_for_partition_range(
Expand Down

0 comments on commit 9c7ac93

Please sign in to comment.