Skip to content

Commit

Permalink
[module-loaders] Delete extra_source_assets param from assets_from_mo…
Browse files Browse the repository at this point in the history
…dules (#26494)

This PR was used to accumulate the upstack.
## Summary & Motivation

## How I Tested These Changes

## Changelog

> Insert changelog entry or delete this section.
  • Loading branch information
dpeng817 authored Dec 20, 2024
1 parent 4cc5726 commit 802ef32
Show file tree
Hide file tree
Showing 43 changed files with 1,378 additions and 768 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_modules,
)
from docs_snippets.guides.dagster.asset_tutorial import cereal
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_cereal():
assets, source_assets, _ = assets_from_modules([cereal])
assert materialize([*assets, *source_assets])
assets = load_assets_from_modules([cereal])
assert materialize(assets)
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_modules,
)
from docs_snippets.guides.dagster.asset_tutorial import serial_asset_graph
from docs_snippets.intro_tutorial.test_util import patch_cereal_requests


@patch_cereal_requests
def test_serial_asset_graph():
assets, source_assets, _ = assets_from_modules([serial_asset_graph])
assert materialize([*assets, *source_assets])
assets = load_assets_from_modules([serial_asset_graph])
assert materialize(assets)
16 changes: 10 additions & 6 deletions python_modules/dagster-test/dagster_test/toys/repo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import warnings
from typing import Sequence, cast

from dagster import ExperimentalWarning
from dagster._core.definitions.assets import AssetsDefinition
from dagster._time import get_current_timestamp

# squelch experimental warnings since we often include experimental things in toys for development
Expand Down Expand Up @@ -164,19 +166,21 @@ def column_schema_repository():
def table_metadata_repository():
from dagster_test.toys import table_metadata

return load_assets_from_modules([table_metadata])
return cast(Sequence[AssetsDefinition], load_assets_from_modules([table_metadata]))


@repository
def long_asset_keys_repository():
from dagster_test.toys import long_asset_keys

return load_assets_from_modules([long_asset_keys])
return cast(Sequence[AssetsDefinition], load_assets_from_modules([long_asset_keys]))


@repository # pyright: ignore[reportArgumentType]
@repository
def big_honkin_assets_repository():
return [load_assets_from_modules([big_honkin_asset_graph_module])]
return cast(
Sequence[AssetsDefinition], [load_assets_from_modules([big_honkin_asset_graph_module])]
)


@repository
Expand Down Expand Up @@ -208,11 +212,11 @@ def assets_with_sensors_repository():
def conditional_assets_repository():
from dagster_test.toys import conditional_assets

return load_assets_from_modules([conditional_assets])
return cast(Sequence[AssetsDefinition], load_assets_from_modules([conditional_assets]))


@repository
def data_versions_repository():
from dagster_test.toys import data_versions

return load_assets_from_modules([data_versions])
return cast(Sequence[AssetsDefinition], load_assets_from_modules([data_versions]))
24 changes: 12 additions & 12 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,18 +250,6 @@
InputMapping as InputMapping,
)
from dagster._core.definitions.job_definition import JobDefinition as JobDefinition
from dagster._core.definitions.load_asset_checks_from_modules import (
load_asset_checks_from_current_module as load_asset_checks_from_current_module,
load_asset_checks_from_modules as load_asset_checks_from_modules,
load_asset_checks_from_package_module as load_asset_checks_from_package_module,
load_asset_checks_from_package_name as load_asset_checks_from_package_name,
)
from dagster._core.definitions.load_assets_from_modules import (
load_assets_from_current_module as load_assets_from_current_module,
load_assets_from_modules as load_assets_from_modules,
load_assets_from_package_module as load_assets_from_package_module,
load_assets_from_package_name as load_assets_from_package_name,
)
from dagster._core.definitions.logger_definition import (
LoggerDefinition as LoggerDefinition,
build_init_logger_context as build_init_logger_context,
Expand Down Expand Up @@ -309,6 +297,18 @@
TableRecord as TableRecord,
TableSchema as TableSchema,
)
from dagster._core.definitions.module_loaders.load_asset_checks_from_modules import (
load_asset_checks_from_current_module as load_asset_checks_from_current_module,
load_asset_checks_from_modules as load_asset_checks_from_modules,
load_asset_checks_from_package_module as load_asset_checks_from_package_module,
load_asset_checks_from_package_name as load_asset_checks_from_package_name,
)
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_current_module as load_assets_from_current_module,
load_assets_from_modules as load_assets_from_modules,
load_assets_from_package_module as load_assets_from_package_module,
load_assets_from_package_name as load_assets_from_package_name,
)
from dagster._core.definitions.multi_asset_sensor_definition import (
MultiAssetSensorDefinition as MultiAssetSensorDefinition,
MultiAssetSensorEvaluationContext as MultiAssetSensorEvaluationContext,
Expand Down
7 changes: 4 additions & 3 deletions python_modules/dagster/dagster/_core/code_pointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,15 @@ def describe(self) -> str:


def _load_target_from_module(module: ModuleType, fn_name: str, error_suffix: str) -> object:
from dagster._core.definitions.load_assets_from_modules import assets_from_modules
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_modules,
)
from dagster._core.workspace.autodiscovery import LOAD_ALL_ASSETS

if fn_name == LOAD_ALL_ASSETS:
# LOAD_ALL_ASSETS is a special symbol that's returned when, instead of loading a particular
# attribute, we should load all the assets in the module.
module_assets, module_source_assets, _ = assets_from_modules([module])
return [*module_assets, *module_source_assets]
return load_assets_from_modules([module])
else:
if not hasattr(module, fn_name):
raise DagsterInvariantViolationError(f"{fn_name} not found {error_suffix}")
Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster/dagster/_core/definitions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,16 @@
)
from dagster._core.definitions.graph_definition import GraphDefinition as GraphDefinition
from dagster._core.definitions.job_definition import JobDefinition as JobDefinition
from dagster._core.definitions.load_assets_from_modules import (
from dagster._core.definitions.materialize import (
materialize as materialize,
materialize_to_memory as materialize_to_memory,
)
from dagster._core.definitions.module_loaders.load_assets_from_modules import (
load_assets_from_current_module as load_assets_from_current_module,
load_assets_from_modules as load_assets_from_modules,
load_assets_from_package_module as load_assets_from_package_module,
load_assets_from_package_name as load_assets_from_package_name,
)
from dagster._core.definitions.materialize import (
materialize as materialize,
materialize_to_memory as materialize_to_memory,
)
from dagster._core.definitions.op_definition import OpDefinition as OpDefinition
from dagster._core.definitions.partition import (
DynamicPartitionsDefinition as DynamicPartitionsDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,6 @@ def get_python_identifier(self) -> str:
@property
def key(self) -> AssetCheckKey:
return AssetCheckKey(self.asset_key, self.name)

def replace_key(self, key: AssetCheckKey) -> "AssetCheckSpec":
return self._replace(asset_key=key.asset_key, name=key.name)
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def normalize_assets(
# AssetKey not subject to any further manipulation.
resolved_deps = ResolvedAssetDependencies(assets_defs, [])

input_asset_key_replacements = [
asset_key_replacements = [
{
raw_key: normalized_key
for input_name, raw_key in ad.keys_by_input_name.items()
Expand All @@ -218,8 +218,8 @@ def normalize_assets(

# Only update the assets defs if we're actually replacing input asset keys
assets_defs = [
ad.with_attributes(input_asset_key_replacements=reps) if reps else ad
for ad, reps in zip(assets_defs, input_asset_key_replacements)
ad.with_attributes(asset_key_replacements=reps) if reps else ad
for ad, reps in zip(assets_defs, asset_key_replacements)
]

# Create unexecutable external assets definitions for any referenced keys for which no
Expand Down
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ def from_db_string(db_string: str) -> Optional["AssetCheckKey"]:
def to_db_string(self) -> str:
return seven.json.dumps({"asset_key": self.asset_key.to_string(), "check_name": self.name})

def with_asset_key_prefix(self, prefix: CoercibleToAssetKeyPrefix) -> "AssetCheckKey":
return AssetCheckKey(self.asset_key.with_prefix(prefix), self.name)

def replace_asset_key(self, asset_key: AssetKey) -> "AssetCheckKey":
return AssetCheckKey(asset_key, self.name)


EntityKey = Union[AssetKey, AssetCheckKey]
T_EntityKey = TypeVar("T_EntityKey", AssetKey, AssetCheckKey, EntityKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def replace_attributes(
tags: Optional[Mapping[str, str]] = ...,
kinds: Optional[Set[str]] = ...,
partitions_def: Optional[PartitionsDefinition] = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
) -> "AssetSpec":
"""Returns a new AssetSpec with the specified attributes replaced."""
current_tags_without_kinds = {
Expand Down
78 changes: 50 additions & 28 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from dagster._utils.warnings import ExperimentalWarning, disable_dagster_warnings

if TYPE_CHECKING:
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.graph_definition import GraphDefinition

ASSET_SUBSET_INPUT_PREFIX = "__subset_input__"
Expand Down Expand Up @@ -1171,11 +1172,34 @@ def get_op_def_for_asset_key(self, key: AssetKey) -> Optional[OpDefinition]:
output_name = self.get_output_name_for_asset_key(key)
return self.node_def.resolve_output_to_origin_op_def(output_name)

def coerce_to_checks_def(self) -> "AssetChecksDefinition":
from dagster._core.definitions.asset_checks import (
AssetChecksDefinition,
has_only_asset_checks,
)

if not has_only_asset_checks(self):
raise DagsterInvalidDefinitionError(
"Cannot coerce an AssetsDefinition to an AssetChecksDefinition if it contains "
"non-check assets."
)
if len(self.check_keys) == 0:
raise DagsterInvalidDefinitionError(
"Cannot coerce an AssetsDefinition to an AssetChecksDefinition if it contains no "
"checks."
)
return AssetChecksDefinition.create(
keys_by_input_name=self.keys_by_input_name,
node_def=self.op,
check_specs_by_output_name=self.check_specs_by_output_name,
resource_defs=self.resource_defs,
can_subset=self.can_subset,
)

def with_attributes(
self,
*,
output_asset_key_replacements: Mapping[AssetKey, AssetKey] = {},
input_asset_key_replacements: Mapping[AssetKey, AssetKey] = {},
asset_key_replacements: Mapping[AssetKey, AssetKey] = {},
group_names_by_key: Mapping[AssetKey, str] = {},
tags_by_key: Mapping[AssetKey, Mapping[str, str]] = {},
freshness_policy: Optional[
Expand Down Expand Up @@ -1220,16 +1244,13 @@ def update_replace_dict_and_conflicts(
default_value=DEFAULT_GROUP_NAME,
)

if key in output_asset_key_replacements:
replace_dict["key"] = output_asset_key_replacements[key]
if key in asset_key_replacements:
replace_dict["key"] = asset_key_replacements[key]

if input_asset_key_replacements or output_asset_key_replacements:
if asset_key_replacements:
new_deps = []
for dep in spec.deps:
replacement_key = input_asset_key_replacements.get(
dep.asset_key,
output_asset_key_replacements.get(dep.asset_key),
)
replacement_key = asset_key_replacements.get(dep.asset_key, dep.asset_key)
if replacement_key is not None:
new_deps.append(dep._replace(asset_key=replacement_key))
else:
Expand All @@ -1246,33 +1267,31 @@ def update_replace_dict_and_conflicts(
)

check_specs_by_output_name = {
output_name: check_spec._replace(
asset_key=output_asset_key_replacements.get(
check_spec.asset_key, check_spec.asset_key
output_name: check_spec.replace_key(
key=check_spec.key.replace_asset_key(
asset_key_replacements.get(check_spec.asset_key, check_spec.asset_key)
)
)
for output_name, check_spec in self.node_check_specs_by_output_name.items()
}

selected_asset_check_keys = {
check_key._replace(
asset_key=output_asset_key_replacements.get(
check_key.asset_key, check_key.asset_key
)
check_key.replace_asset_key(
asset_key_replacements.get(check_key.asset_key, check_key.asset_key)
)
for check_key in self.check_keys
}

replaced_attributes = dict(
keys_by_input_name={
input_name: input_asset_key_replacements.get(key, key)
input_name: asset_key_replacements.get(key, key)
for input_name, key in self.node_keys_by_input_name.items()
},
keys_by_output_name={
output_name: output_asset_key_replacements.get(key, key)
output_name: asset_key_replacements.get(key, key)
for output_name, key in self.node_keys_by_output_name.items()
},
selected_asset_keys={output_asset_key_replacements.get(key, key) for key in self.keys},
selected_asset_keys={asset_key_replacements.get(key, key) for key in self.keys},
backfill_policy=backfill_policy if backfill_policy else self.backfill_policy,
is_subset=self.is_subset,
check_specs_by_output_name=check_specs_by_output_name,
Expand Down Expand Up @@ -1859,15 +1878,18 @@ def replace_specs_on_asset(
from dagster._builtins import Nothing
from dagster._core.definitions.input import In

new_deps = set().union(*(spec.deps for spec in replaced_specs))
previous_deps = set().union(*(spec.deps for spec in assets_def.specs))
added_deps = new_deps - previous_deps
removed_deps = previous_deps - new_deps
remaining_original_deps = previous_deps - removed_deps
new_deps_by_key = {dep.asset_key: dep for spec in replaced_specs for dep in spec.deps}
previous_deps_by_key = {dep.asset_key: dep for spec in assets_def.specs for dep in spec.deps}
added_dep_keys = set(new_deps_by_key.keys()) - set(previous_deps_by_key.keys())
removed_dep_keys = set(previous_deps_by_key.keys()) - set(new_deps_by_key.keys())
remaining_original_deps_by_key = {
key: previous_deps_by_key[key]
for key in set(previous_deps_by_key.keys()) - removed_dep_keys
}
original_key_to_input_mapping = reverse_dict(assets_def.node_keys_by_input_name)

# If there are no changes to the dependency structure, we don't need to make any changes to the underlying node.
if not assets_def.is_executable or (not added_deps and not removed_deps):
if not assets_def.is_executable or (not added_dep_keys and not removed_dep_keys):
return assets_def.__class__.dagster_internal_init(
**{**assets_def.get_attributes_dict(), "specs": replaced_specs}
)
Expand All @@ -1881,15 +1903,15 @@ def replace_specs_on_asset(
"Can only add additional deps to an op-backed asset.",
)
# for each deleted dep, we need to make sure it is not an argument-based dep. Argument-based deps cannot be removed.
for dep in removed_deps:
for dep_key in removed_dep_keys:
dep = previous_deps_by_key[dep_key]
input_name = original_key_to_input_mapping[dep.asset_key]
input_def = assets_def.node_def.input_def_named(input_name)
check.invariant(
input_def.dagster_type.is_nothing,
f"Attempted to remove argument-backed dependency {dep.asset_key} (mapped to argument {input_name}) from the asset. Only non-argument dependencies can be changed or removed using map_asset_specs.",
)

remaining_original_deps_by_key = {dep.asset_key: dep for dep in remaining_original_deps}
remaining_ins = {
input_name: the_in
for input_name, the_in in assets_def.node_def.input_dict.items()
Expand All @@ -1899,7 +1921,7 @@ def replace_specs_on_asset(
remaining_ins,
{
stringify_asset_key_to_input_name(dep.asset_key): In(dagster_type=Nothing)
for dep in new_deps
for dep in new_deps_by_key.values()
},
)

Expand Down
Loading

1 comment on commit 802ef32

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-h4ror661m-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 802ef32.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.