diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py index 4f435b2a491ff..d693b73d05388 100644 --- a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py +++ b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_cereal.py @@ -1,10 +1,12 @@ -from dagster._core.definitions.load_assets_from_modules import assets_from_modules from dagster._core.definitions.materialize import materialize +from dagster._core.definitions.module_loaders.load_assets_from_modules import ( + load_assets_from_modules, +) from docs_snippets.guides.dagster.asset_tutorial import cereal from docs_snippets.intro_tutorial.test_util import patch_cereal_requests @patch_cereal_requests def test_cereal(): - assets, source_assets, _ = assets_from_modules([cereal]) - assert materialize([*assets, *source_assets]) + assets = load_assets_from_modules([cereal]) + assert materialize(assets) diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py index a68ed34ea861f..80f95ee6e920e 100644 --- a/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py +++ b/examples/docs_snippets/docs_snippets_tests/guides_tests/asset_tutorial_tests/test_serial_asset_graph.py @@ -1,10 +1,12 @@ -from dagster._core.definitions.load_assets_from_modules import assets_from_modules from dagster._core.definitions.materialize import materialize +from dagster._core.definitions.module_loaders.load_assets_from_modules import ( + load_assets_from_modules, +) from docs_snippets.guides.dagster.asset_tutorial import serial_asset_graph from docs_snippets.intro_tutorial.test_util import patch_cereal_requests 
@patch_cereal_requests def test_serial_asset_graph(): - assets, source_assets, _ = assets_from_modules([serial_asset_graph]) - assert materialize([*assets, *source_assets]) + assets = load_assets_from_modules([serial_asset_graph]) + assert materialize(assets) diff --git a/python_modules/dagster-test/dagster_test/toys/repo.py b/python_modules/dagster-test/dagster_test/toys/repo.py index 317d0f7339652..3f5b7177e57c1 100644 --- a/python_modules/dagster-test/dagster_test/toys/repo.py +++ b/python_modules/dagster-test/dagster_test/toys/repo.py @@ -1,6 +1,8 @@ import warnings +from typing import Sequence, cast from dagster import ExperimentalWarning +from dagster._core.definitions.assets import AssetsDefinition from dagster._time import get_current_timestamp # squelch experimental warnings since we often include experimental things in toys for development @@ -164,19 +166,21 @@ def column_schema_repository(): def table_metadata_repository(): from dagster_test.toys import table_metadata - return load_assets_from_modules([table_metadata]) + return cast(Sequence[AssetsDefinition], load_assets_from_modules([table_metadata])) @repository def long_asset_keys_repository(): from dagster_test.toys import long_asset_keys - return load_assets_from_modules([long_asset_keys]) + return cast(Sequence[AssetsDefinition], load_assets_from_modules([long_asset_keys])) -@repository # pyright: ignore[reportArgumentType] +@repository def big_honkin_assets_repository(): - return [load_assets_from_modules([big_honkin_asset_graph_module])] + return cast( + Sequence[AssetsDefinition], [load_assets_from_modules([big_honkin_asset_graph_module])] + ) @repository @@ -208,11 +212,11 @@ def assets_with_sensors_repository(): def conditional_assets_repository(): from dagster_test.toys import conditional_assets - return load_assets_from_modules([conditional_assets]) + return cast(Sequence[AssetsDefinition], load_assets_from_modules([conditional_assets])) @repository def data_versions_repository(): from 
dagster_test.toys import data_versions - return load_assets_from_modules([data_versions]) + return cast(Sequence[AssetsDefinition], load_assets_from_modules([data_versions])) diff --git a/python_modules/dagster/dagster/__init__.py b/python_modules/dagster/dagster/__init__.py index 8c5ee3805c460..441135c0c3217 100644 --- a/python_modules/dagster/dagster/__init__.py +++ b/python_modules/dagster/dagster/__init__.py @@ -250,18 +250,6 @@ InputMapping as InputMapping, ) from dagster._core.definitions.job_definition import JobDefinition as JobDefinition -from dagster._core.definitions.load_asset_checks_from_modules import ( - load_asset_checks_from_current_module as load_asset_checks_from_current_module, - load_asset_checks_from_modules as load_asset_checks_from_modules, - load_asset_checks_from_package_module as load_asset_checks_from_package_module, - load_asset_checks_from_package_name as load_asset_checks_from_package_name, -) -from dagster._core.definitions.load_assets_from_modules import ( - load_assets_from_current_module as load_assets_from_current_module, - load_assets_from_modules as load_assets_from_modules, - load_assets_from_package_module as load_assets_from_package_module, - load_assets_from_package_name as load_assets_from_package_name, -) from dagster._core.definitions.logger_definition import ( LoggerDefinition as LoggerDefinition, build_init_logger_context as build_init_logger_context, @@ -309,6 +297,18 @@ TableRecord as TableRecord, TableSchema as TableSchema, ) +from dagster._core.definitions.module_loaders.load_asset_checks_from_modules import ( + load_asset_checks_from_current_module as load_asset_checks_from_current_module, + load_asset_checks_from_modules as load_asset_checks_from_modules, + load_asset_checks_from_package_module as load_asset_checks_from_package_module, + load_asset_checks_from_package_name as load_asset_checks_from_package_name, +) +from dagster._core.definitions.module_loaders.load_assets_from_modules import ( + 
load_assets_from_current_module as load_assets_from_current_module, + load_assets_from_modules as load_assets_from_modules, + load_assets_from_package_module as load_assets_from_package_module, + load_assets_from_package_name as load_assets_from_package_name, +) from dagster._core.definitions.multi_asset_sensor_definition import ( MultiAssetSensorDefinition as MultiAssetSensorDefinition, MultiAssetSensorEvaluationContext as MultiAssetSensorEvaluationContext, diff --git a/python_modules/dagster/dagster/_core/code_pointer.py b/python_modules/dagster/dagster/_core/code_pointer.py index cda304cc0f418..915ecf29b7b60 100644 --- a/python_modules/dagster/dagster/_core/code_pointer.py +++ b/python_modules/dagster/dagster/_core/code_pointer.py @@ -184,14 +184,15 @@ def describe(self) -> str: def _load_target_from_module(module: ModuleType, fn_name: str, error_suffix: str) -> object: - from dagster._core.definitions.load_assets_from_modules import assets_from_modules + from dagster._core.definitions.module_loaders.load_assets_from_modules import ( + load_assets_from_modules, + ) from dagster._core.workspace.autodiscovery import LOAD_ALL_ASSETS if fn_name == LOAD_ALL_ASSETS: # LOAD_ALL_ASSETS is a special symbol that's returned when, instead of loading a particular # attribute, we should load all the assets in the module. 
- module_assets, module_source_assets, _ = assets_from_modules([module]) - return [*module_assets, *module_source_assets] + return load_assets_from_modules([module]) else: if not hasattr(module, fn_name): raise DagsterInvariantViolationError(f"{fn_name} not found {error_suffix}") diff --git a/python_modules/dagster/dagster/_core/definitions/__init__.py b/python_modules/dagster/dagster/_core/definitions/__init__.py index e8ad5b2d6d2a5..75fb66abfa513 100644 --- a/python_modules/dagster/dagster/_core/definitions/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/__init__.py @@ -148,16 +148,16 @@ ) from dagster._core.definitions.graph_definition import GraphDefinition as GraphDefinition from dagster._core.definitions.job_definition import JobDefinition as JobDefinition -from dagster._core.definitions.load_assets_from_modules import ( +from dagster._core.definitions.materialize import ( + materialize as materialize, + materialize_to_memory as materialize_to_memory, +) +from dagster._core.definitions.module_loaders.load_assets_from_modules import ( load_assets_from_current_module as load_assets_from_current_module, load_assets_from_modules as load_assets_from_modules, load_assets_from_package_module as load_assets_from_package_module, load_assets_from_package_name as load_assets_from_package_name, ) -from dagster._core.definitions.materialize import ( - materialize as materialize, - materialize_to_memory as materialize_to_memory, -) from dagster._core.definitions.op_definition import OpDefinition as OpDefinition from dagster._core.definitions.partition import ( DynamicPartitionsDefinition as DynamicPartitionsDefinition, diff --git a/python_modules/dagster/dagster/_core/definitions/asset_check_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_check_spec.py index b25243bda00f6..99ca04779d492 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_check_spec.py +++ 
b/python_modules/dagster/dagster/_core/definitions/asset_check_spec.py @@ -116,3 +116,6 @@ def get_python_identifier(self) -> str: @property def key(self) -> AssetCheckKey: return AssetCheckKey(self.asset_key, self.name) + + def replace_key(self, key: AssetCheckKey) -> "AssetCheckSpec": + return self._replace(asset_key=key.asset_key, name=key.name) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 57ac569366530..f4f64b68098d6 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -204,7 +204,7 @@ def normalize_assets( # AssetKey not subject to any further manipulation. resolved_deps = ResolvedAssetDependencies(assets_defs, []) - input_asset_key_replacements = [ + asset_key_replacements = [ { raw_key: normalized_key for input_name, raw_key in ad.keys_by_input_name.items() @@ -218,8 +218,8 @@ def normalize_assets( # Only update the assets defs if we're actually replacing input asset keys assets_defs = [ - ad.with_attributes(input_asset_key_replacements=reps) if reps else ad - for ad, reps in zip(assets_defs, input_asset_key_replacements) + ad.with_attributes(asset_key_replacements=reps) if reps else ad + for ad, reps in zip(assets_defs, asset_key_replacements) ] # Create unexecutable external assets definitions for any referenced keys for which no diff --git a/python_modules/dagster/dagster/_core/definitions/asset_key.py b/python_modules/dagster/dagster/_core/definitions/asset_key.py index 70f691cdd60c3..607f2f420432b 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_key.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_key.py @@ -256,6 +256,12 @@ def from_db_string(db_string: str) -> Optional["AssetCheckKey"]: def to_db_string(self) -> str: return seven.json.dumps({"asset_key": self.asset_key.to_string(), "check_name": self.name}) + def 
with_asset_key_prefix(self, prefix: CoercibleToAssetKeyPrefix) -> "AssetCheckKey": + return AssetCheckKey(self.asset_key.with_prefix(prefix), self.name) + + def replace_asset_key(self, asset_key: AssetKey) -> "AssetCheckKey": + return AssetCheckKey(asset_key, self.name) + EntityKey = Union[AssetKey, AssetCheckKey] T_EntityKey = TypeVar("T_EntityKey", AssetKey, AssetCheckKey, EntityKey) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 12ff89d279894..b2e52fe8bf5d9 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -313,6 +313,7 @@ def replace_attributes( tags: Optional[Mapping[str, str]] = ..., kinds: Optional[Set[str]] = ..., partitions_def: Optional[PartitionsDefinition] = ..., + freshness_policy: Optional[FreshnessPolicy] = ..., ) -> "AssetSpec": """Returns a new AssetSpec with the specified attributes replaced.""" current_tags_without_kinds = { diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 1eb903545f868..d2c7aec174c21 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -80,6 +80,7 @@ from dagster._utils.warnings import ExperimentalWarning, disable_dagster_warnings if TYPE_CHECKING: + from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.graph_definition import GraphDefinition ASSET_SUBSET_INPUT_PREFIX = "__subset_input__" @@ -1171,11 +1172,34 @@ def get_op_def_for_asset_key(self, key: AssetKey) -> Optional[OpDefinition]: output_name = self.get_output_name_for_asset_key(key) return self.node_def.resolve_output_to_origin_op_def(output_name) + def coerce_to_checks_def(self) -> "AssetChecksDefinition": + from dagster._core.definitions.asset_checks 
import ( + AssetChecksDefinition, + has_only_asset_checks, + ) + + if not has_only_asset_checks(self): + raise DagsterInvalidDefinitionError( + "Cannot coerce an AssetsDefinition to an AssetChecksDefinition if it contains " + "non-check assets." + ) + if len(self.check_keys) == 0: + raise DagsterInvalidDefinitionError( + "Cannot coerce an AssetsDefinition to an AssetChecksDefinition if it contains no " + "checks." + ) + return AssetChecksDefinition.create( + keys_by_input_name=self.keys_by_input_name, + node_def=self.op, + check_specs_by_output_name=self.check_specs_by_output_name, + resource_defs=self.resource_defs, + can_subset=self.can_subset, + ) + def with_attributes( self, *, - output_asset_key_replacements: Mapping[AssetKey, AssetKey] = {}, - input_asset_key_replacements: Mapping[AssetKey, AssetKey] = {}, + asset_key_replacements: Mapping[AssetKey, AssetKey] = {}, group_names_by_key: Mapping[AssetKey, str] = {}, tags_by_key: Mapping[AssetKey, Mapping[str, str]] = {}, freshness_policy: Optional[ @@ -1220,16 +1244,13 @@ def update_replace_dict_and_conflicts( default_value=DEFAULT_GROUP_NAME, ) - if key in output_asset_key_replacements: - replace_dict["key"] = output_asset_key_replacements[key] + if key in asset_key_replacements: + replace_dict["key"] = asset_key_replacements[key] - if input_asset_key_replacements or output_asset_key_replacements: + if asset_key_replacements: new_deps = [] for dep in spec.deps: - replacement_key = input_asset_key_replacements.get( - dep.asset_key, - output_asset_key_replacements.get(dep.asset_key), - ) + replacement_key = asset_key_replacements.get(dep.asset_key, dep.asset_key) if replacement_key is not None: new_deps.append(dep._replace(asset_key=replacement_key)) else: @@ -1246,33 +1267,31 @@ def update_replace_dict_and_conflicts( ) check_specs_by_output_name = { - output_name: check_spec._replace( - asset_key=output_asset_key_replacements.get( - check_spec.asset_key, check_spec.asset_key + output_name: 
check_spec.replace_key( + key=check_spec.key.replace_asset_key( + asset_key_replacements.get(check_spec.asset_key, check_spec.asset_key) ) ) for output_name, check_spec in self.node_check_specs_by_output_name.items() } selected_asset_check_keys = { - check_key._replace( - asset_key=output_asset_key_replacements.get( - check_key.asset_key, check_key.asset_key - ) + check_key.replace_asset_key( + asset_key_replacements.get(check_key.asset_key, check_key.asset_key) ) for check_key in self.check_keys } replaced_attributes = dict( keys_by_input_name={ - input_name: input_asset_key_replacements.get(key, key) + input_name: asset_key_replacements.get(key, key) for input_name, key in self.node_keys_by_input_name.items() }, keys_by_output_name={ - output_name: output_asset_key_replacements.get(key, key) + output_name: asset_key_replacements.get(key, key) for output_name, key in self.node_keys_by_output_name.items() }, - selected_asset_keys={output_asset_key_replacements.get(key, key) for key in self.keys}, + selected_asset_keys={asset_key_replacements.get(key, key) for key in self.keys}, backfill_policy=backfill_policy if backfill_policy else self.backfill_policy, is_subset=self.is_subset, check_specs_by_output_name=check_specs_by_output_name, @@ -1859,15 +1878,18 @@ def replace_specs_on_asset( from dagster._builtins import Nothing from dagster._core.definitions.input import In - new_deps = set().union(*(spec.deps for spec in replaced_specs)) - previous_deps = set().union(*(spec.deps for spec in assets_def.specs)) - added_deps = new_deps - previous_deps - removed_deps = previous_deps - new_deps - remaining_original_deps = previous_deps - removed_deps + new_deps_by_key = {dep.asset_key: dep for spec in replaced_specs for dep in spec.deps} + previous_deps_by_key = {dep.asset_key: dep for spec in assets_def.specs for dep in spec.deps} + added_dep_keys = set(new_deps_by_key.keys()) - set(previous_deps_by_key.keys()) + removed_dep_keys = set(previous_deps_by_key.keys()) - 
set(new_deps_by_key.keys()) + remaining_original_deps_by_key = { + key: previous_deps_by_key[key] + for key in set(previous_deps_by_key.keys()) - removed_dep_keys + } original_key_to_input_mapping = reverse_dict(assets_def.node_keys_by_input_name) # If there are no changes to the dependency structure, we don't need to make any changes to the underlying node. - if not assets_def.is_executable or (not added_deps and not removed_deps): + if not assets_def.is_executable or (not added_dep_keys and not removed_dep_keys): return assets_def.__class__.dagster_internal_init( **{**assets_def.get_attributes_dict(), "specs": replaced_specs} ) @@ -1881,7 +1903,8 @@ def replace_specs_on_asset( "Can only add additional deps to an op-backed asset.", ) # for each deleted dep, we need to make sure it is not an argument-based dep. Argument-based deps cannot be removed. - for dep in removed_deps: + for dep_key in removed_dep_keys: + dep = previous_deps_by_key[dep_key] input_name = original_key_to_input_mapping[dep.asset_key] input_def = assets_def.node_def.input_def_named(input_name) check.invariant( @@ -1889,7 +1912,6 @@ def replace_specs_on_asset( f"Attempted to remove argument-backed dependency {dep.asset_key} (mapped to argument {input_name}) from the asset. 
Only non-argument dependencies can be changed or removed using map_asset_specs.", ) - remaining_original_deps_by_key = {dep.asset_key: dep for dep in remaining_original_deps} remaining_ins = { input_name: the_in for input_name, the_in in assets_def.node_def.input_dict.items() @@ -1899,7 +1921,7 @@ def replace_specs_on_asset( remaining_ins, { stringify_asset_key_to_input_name(dep.asset_key): In(dagster_type=Nothing) - for dep in new_deps + for dep in new_deps_by_key.values() }, ) diff --git a/python_modules/dagster/dagster/_core/definitions/cacheable_assets.py b/python_modules/dagster/dagster/_core/definitions/cacheable_assets.py index 5445636a14ed6..70ea705998d90 100644 --- a/python_modules/dagster/dagster/_core/definitions/cacheable_assets.py +++ b/python_modules/dagster/dagster/_core/definitions/cacheable_assets.py @@ -153,8 +153,7 @@ def with_resources( def with_attributes( self, - output_asset_key_replacements: Optional[Mapping[AssetKey, AssetKey]] = None, - input_asset_key_replacements: Optional[Mapping[AssetKey, AssetKey]] = None, + asset_key_replacements: Optional[Mapping[AssetKey, AssetKey]] = None, group_names_by_key: Optional[Mapping[AssetKey, str]] = None, freshness_policy: Optional[ Union[FreshnessPolicy, Mapping[AssetKey, FreshnessPolicy]] @@ -162,8 +161,7 @@ def with_attributes( ) -> "CacheableAssetsDefinition": return PrefixOrGroupWrappedCacheableAssetsDefinition( self, - output_asset_key_replacements=output_asset_key_replacements, - input_asset_key_replacements=input_asset_key_replacements, + asset_key_replacements=asset_key_replacements, group_names_by_key=group_names_by_key, freshness_policy=freshness_policy, ) @@ -247,8 +245,7 @@ class PrefixOrGroupWrappedCacheableAssetsDefinition(WrappedCacheableAssetsDefini def __init__( self, wrapped: CacheableAssetsDefinition, - output_asset_key_replacements: Optional[Mapping[AssetKey, AssetKey]] = None, - input_asset_key_replacements: Optional[Mapping[AssetKey, AssetKey]] = None, + asset_key_replacements: 
Optional[Mapping[AssetKey, AssetKey]] = None, group_names_by_key: Optional[Mapping[AssetKey, str]] = None, group_name_for_all_assets: Optional[str] = None, prefix_for_all_assets: Optional[List[str]] = None, @@ -260,8 +257,7 @@ def __init__( ] = None, backfill_policy: Optional[BackfillPolicy] = None, ): - self._output_asset_key_replacements = output_asset_key_replacements or {} - self._input_asset_key_replacements = input_asset_key_replacements or {} + self._asset_key_replacements = asset_key_replacements or {} self._group_names_by_key = group_names_by_key or {} self._group_name_for_all_assets = group_name_for_all_assets self._prefix_for_all_assets = prefix_for_all_assets @@ -274,12 +270,8 @@ def __init__( "Cannot set both group_name_for_all_assets and group_names_by_key", ) check.invariant( - not ( - prefix_for_all_assets - and (output_asset_key_replacements or input_asset_key_replacements) - ), - "Cannot set both prefix_for_all_assets and output_asset_key_replacements or" - " input_asset_key_replacements", + not (prefix_for_all_assets and (asset_key_replacements)), + "Cannot set both prefix_for_all_assets and asset_key_replacements", ) super().__init__( @@ -290,22 +282,10 @@ def __init__( def _get_hash(self) -> str: """Generate a stable hash of the various prefix/group mappings.""" contents = hashlib.sha1() - if self._output_asset_key_replacements: + if self._asset_key_replacements: contents.update( _map_to_hashable( - { - tuple(k.path): tuple(v.path) - for k, v in self._output_asset_key_replacements.items() - } - ) - ) - if self._input_asset_key_replacements: - contents.update( - _map_to_hashable( - { - tuple(k.path): tuple(v.path) - for k, v in self._input_asset_key_replacements.items() - } + {tuple(k.path): tuple(v.path) for k, v in self._asset_key_replacements.items()} ) ) if self._group_names_by_key: @@ -330,33 +310,13 @@ def transformed_assets_def(self, assets_def: AssetsDefinition) -> AssetsDefiniti if self._group_name_for_all_assets else 
self._group_names_by_key ) - output_asset_key_replacements = ( - { - k: AssetKey( - path=( - self._prefix_for_all_assets + list(k.path) - if self._prefix_for_all_assets - else k.path - ) - ) - for k in assets_def.keys - } - if self._prefix_for_all_assets - else self._output_asset_key_replacements - ) - input_asset_key_replacements = ( + asset_key_replacements = ( { - k: AssetKey( - path=( - self._prefix_for_all_assets + list(k.path) - if self._prefix_for_all_assets - else k.path - ) - ) - for k in assets_def.dependency_keys + k: k.with_prefix(self._prefix_for_all_assets) if self._prefix_for_all_assets else k + for k in assets_def.keys | set(assets_def.dependency_keys) } if self._prefix_for_all_assets - else self._input_asset_key_replacements + else self._asset_key_replacements ) if isinstance(self._auto_materialize_policy, dict): automation_condition = { @@ -367,8 +327,7 @@ def transformed_assets_def(self, assets_def: AssetsDefinition) -> AssetsDefiniti else: automation_condition = None return assets_def.with_attributes( - output_asset_key_replacements=output_asset_key_replacements, - input_asset_key_replacements=input_asset_key_replacements, + asset_key_replacements=asset_key_replacements, group_names_by_key=group_names_by_key, freshness_policy=self._freshness_policy, automation_condition=automation_condition, diff --git a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py b/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py deleted file mode 100644 index 284d76795e505..0000000000000 --- a/python_modules/dagster/dagster/_core/definitions/load_assets_from_modules.py +++ /dev/null @@ -1,511 +0,0 @@ -import inspect -import pkgutil -from importlib import import_module -from types import ModuleType -from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Set, Tuple, Type, Union, cast - -import dagster._check as check -from dagster._core.definitions.asset_key import ( - AssetKey, - 
CoercibleToAssetKeyPrefix, - check_opt_coercible_to_asset_key_prefix_param, -) -from dagster._core.definitions.assets import AssetsDefinition -from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy -from dagster._core.definitions.backfill_policy import BackfillPolicy -from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition -from dagster._core.definitions.declarative_automation.automation_condition import ( - AutomationCondition, -) -from dagster._core.definitions.freshness_policy import FreshnessPolicy -from dagster._core.definitions.source_asset import SourceAsset -from dagster._core.definitions.utils import resolve_automation_condition -from dagster._core.errors import DagsterInvalidDefinitionError - - -def find_objects_in_module_of_types( - module: ModuleType, - types: Union[Type, Tuple[Type, ...]], -) -> Iterator: - """Yields instances or subclasses of the given type(s).""" - for attr in dir(module): - value = getattr(module, attr) - if isinstance(value, types): - yield value - elif isinstance(value, list) and all(isinstance(el, types) for el in value): - yield from value - - -def find_subclasses_in_module( - module: ModuleType, - types: Union[Type, Tuple[Type, ...]], -) -> Iterator: - """Yields instances or subclasses of the given type(s).""" - for attr in dir(module): - value = getattr(module, attr) - if isinstance(value, type) and issubclass(value, types): - yield value - - -def assets_from_modules( - modules: Iterable[ModuleType], extra_source_assets: Optional[Sequence[SourceAsset]] = None -) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: - """Constructs three lists, a list of assets, a list of source assets, and a list of cacheable - assets from the given modules. - - Args: - modules (Iterable[ModuleType]): The Python modules to look for assets inside. 
- extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the - group in addition to the source assets found in the modules. - - Returns: - Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]]: - A tuple containing a list of assets, a list of source assets, and a list of - cacheable assets defined in the given modules. - """ - asset_ids: Set[int] = set() - asset_keys: Dict[AssetKey, ModuleType] = dict() - source_assets: List[SourceAsset] = list( - check.opt_sequence_param(extra_source_assets, "extra_source_assets", of_type=SourceAsset) - ) - cacheable_assets: List[CacheableAssetsDefinition] = [] - assets: Dict[AssetKey, AssetsDefinition] = {} - for module in modules: - for asset in find_objects_in_module_of_types( - module, (AssetsDefinition, SourceAsset, CacheableAssetsDefinition) - ): - asset = cast(Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition], asset) - if id(asset) not in asset_ids: - asset_ids.add(id(asset)) - if isinstance(asset, CacheableAssetsDefinition): - cacheable_assets.append(asset) - else: - keys = asset.keys if isinstance(asset, AssetsDefinition) else [asset.key] - for key in keys: - if key in asset_keys: - modules_str = ", ".join( - set([asset_keys[key].__name__, module.__name__]) - ) - error_str = ( - f"Asset key {key} is defined multiple times. Definitions found in" - f" modules: {modules_str}. " - ) - - if key in assets and isinstance(asset, AssetsDefinition): - if assets[key].node_def == asset.node_def: - error_str += ( - "One possible cause of this bug is a call to with_resources" - " outside of a repository definition, causing a duplicate" - " asset definition." 
- ) - - raise DagsterInvalidDefinitionError(error_str) - else: - asset_keys[key] = module - if isinstance(asset, AssetsDefinition): - assets[key] = asset - if isinstance(asset, SourceAsset): - source_assets.append(asset) - return list(set(assets.values())), source_assets, cacheable_assets - - -def load_assets_from_modules( - modules: Iterable[ModuleType], - group_name: Optional[str] = None, - key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, - *, - freshness_policy: Optional[FreshnessPolicy] = None, - auto_materialize_policy: Optional[AutoMaterializePolicy] = None, - automation_condition: Optional[AutomationCondition] = None, - backfill_policy: Optional[BackfillPolicy] = None, - source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: - """Constructs a list of assets and source assets from the given modules. - - Args: - modules (Iterable[ModuleType]): The Python modules to look for assets inside. - group_name (Optional[str]): - Group name to apply to the loaded assets. The returned assets will be copies of the - loaded objects, with the group name added. - key_prefix (Optional[Union[str, Sequence[str]]]): - Prefix to prepend to the keys of the loaded assets. The returned assets will be copies - of the loaded objects, with the prefix prepended. - freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded - assets. - automation_condition (Optional[AutomationCondition]): AutomationCondition to apply - to all the loaded assets. - backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. - source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned - assets will be copies of the loaded objects, with the prefix prepended. - - Returns: - Sequence[Union[AssetsDefinition, SourceAsset]]: - A list containing assets and source assets defined in the given modules. 
- """ - group_name = check.opt_str_param(group_name, "group_name") - key_prefix = check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix") - freshness_policy = check.opt_inst_param(freshness_policy, "freshness_policy", FreshnessPolicy) - backfill_policy = check.opt_inst_param(backfill_policy, "backfill_policy", BackfillPolicy) - - ( - assets, - source_assets, - cacheable_assets, - ) = assets_from_modules(modules) - - return assets_with_attributes( - assets, - source_assets, - cacheable_assets, - key_prefix=key_prefix, - group_name=group_name, - freshness_policy=freshness_policy, - automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy - ), - backfill_policy=backfill_policy, - source_key_prefix=source_key_prefix, - ) - - -def load_assets_from_current_module( - group_name: Optional[str] = None, - key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, - *, - freshness_policy: Optional[FreshnessPolicy] = None, - auto_materialize_policy: Optional[AutoMaterializePolicy] = None, - automation_condition: Optional[AutomationCondition] = None, - backfill_policy: Optional[BackfillPolicy] = None, - source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: - """Constructs a list of assets, source assets, and cacheable assets from the module where - this function is called. - - Args: - group_name (Optional[str]): - Group name to apply to the loaded assets. The returned assets will be copies of the - loaded objects, with the group name added. - key_prefix (Optional[Union[str, Sequence[str]]]): - Prefix to prepend to the keys of the loaded assets. The returned assets will be copies - of the loaded objects, with the prefix prepended. - freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded - assets. 
- automation_condition (Optional[AutomationCondition]): AutomationCondition to apply - to all the loaded assets. - backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. - source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned - assets will be copies of the loaded objects, with the prefix prepended. - - Returns: - Sequence[Union[AssetsDefinition, SourceAsset, CachableAssetsDefinition]]: - A list containing assets, source assets, and cacheable assets defined in the module. - """ - caller = inspect.stack()[1] - module = inspect.getmodule(caller[0]) - if module is None: - check.failed("Could not find a module for the caller") - - return load_assets_from_modules( - [module], - group_name=group_name, - key_prefix=key_prefix, - freshness_policy=freshness_policy, - automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy - ), - backfill_policy=backfill_policy, - source_key_prefix=source_key_prefix, - ) - - -def assets_from_package_module( - package_module: ModuleType, - extra_source_assets: Optional[Sequence[SourceAsset]] = None, -) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: - """Constructs three lists, a list of assets, a list of source assets, and a list of cacheable assets - from the given package module. - - Args: - package_module (ModuleType): The package module to looks for assets inside. - extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the - group in addition to the source assets found in the modules. - - Returns: - Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: - A tuple containing a list of assets, a list of source assets, and a list of cacheable assets - defined in the given modules. 
- """ - return assets_from_modules( - find_modules_in_package(package_module), extra_source_assets=extra_source_assets - ) - - -def load_assets_from_package_module( - package_module: ModuleType, - group_name: Optional[str] = None, - key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, - *, - freshness_policy: Optional[FreshnessPolicy] = None, - auto_materialize_policy: Optional[AutoMaterializePolicy] = None, - automation_condition: Optional[AutomationCondition] = None, - backfill_policy: Optional[BackfillPolicy] = None, - source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: - """Constructs a list of assets and source assets that includes all asset - definitions, source assets, and cacheable assets in all sub-modules of the given package module. - - A package module is the result of importing a package. - - Args: - package_module (ModuleType): The package module to looks for assets inside. - group_name (Optional[str]): - Group name to apply to the loaded assets. The returned assets will be copies of the - loaded objects, with the group name added. - key_prefix (Optional[Union[str, Sequence[str]]]): - Prefix to prepend to the keys of the loaded assets. The returned assets will be copies - of the loaded objects, with the prefix prepended. - freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded - assets. - automation_condition (Optional[AutomationCondition]): AutomationCondition to apply - to all the loaded assets. - backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. - source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned - assets will be copies of the loaded objects, with the prefix prepended. 
- - Returns: - Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: - A list containing assets, source assets, and cacheable assets defined in the module. - """ - group_name = check.opt_str_param(group_name, "group_name") - key_prefix = check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix") - freshness_policy = check.opt_inst_param(freshness_policy, "freshness_policy", FreshnessPolicy) - backfill_policy = check.opt_inst_param(backfill_policy, "backfill_policy", BackfillPolicy) - - ( - assets, - source_assets, - cacheable_assets, - ) = assets_from_package_module(package_module) - return assets_with_attributes( - assets, - source_assets, - cacheable_assets, - key_prefix=key_prefix, - group_name=group_name, - freshness_policy=freshness_policy, - automation_condition=resolve_automation_condition( - automation_condition, auto_materialize_policy - ), - backfill_policy=backfill_policy, - source_key_prefix=source_key_prefix, - ) - - -def load_assets_from_package_name( - package_name: str, - group_name: Optional[str] = None, - key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, - *, - freshness_policy: Optional[FreshnessPolicy] = None, - auto_materialize_policy: Optional[AutoMaterializePolicy] = None, - backfill_policy: Optional[BackfillPolicy] = None, - source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: - """Constructs a list of assets, source assets, and cacheable assets that includes all asset - definitions and source assets in all sub-modules of the given package. - - Args: - package_name (str): The name of a Python package to look for assets inside. - group_name (Optional[str]): - Group name to apply to the loaded assets. The returned assets will be copies of the - loaded objects, with the group name added. - key_prefix (Optional[Union[str, Sequence[str]]]): - Prefix to prepend to the keys of the loaded assets. 
The returned assets will be copies - of the loaded objects, with the prefix prepended. - freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded - assets. - auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply - to all the loaded assets. - backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. - source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned - assets will be copies of the loaded objects, with the prefix prepended. - - Returns: - Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: - A list containing assets, source assets, and cacheable assets defined in the module. - """ - package_module = import_module(package_name) - return load_assets_from_package_module( - package_module, - group_name=group_name, - key_prefix=key_prefix, - freshness_policy=freshness_policy, - auto_materialize_policy=auto_materialize_policy, - backfill_policy=backfill_policy, - source_key_prefix=source_key_prefix, - ) - - -def find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]: - yield package_module - if package_module.__file__: - for _, modname, is_pkg in pkgutil.walk_packages( - package_module.__path__, prefix=package_module.__name__ + "." - ): - submodule = import_module(modname) - if is_pkg: - yield from find_modules_in_package(submodule) - else: - yield submodule - else: - raise ValueError( - f"Tried to find modules in package {package_module}, but its __file__ is None" - ) - - -def prefix_assets( - assets_defs: Sequence[AssetsDefinition], - key_prefix: CoercibleToAssetKeyPrefix, - source_assets: Sequence[SourceAsset], - source_key_prefix: Optional[CoercibleToAssetKeyPrefix], -) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset]]: - """Given a list of assets, prefix the input and output asset keys and check specs with key_prefix. - The prefix is not added to source assets. 
- - Input asset keys that reference other assets within assets_defs are "brought along" - - i.e. prefixed as well. - - Example with a single asset: - - .. code-block:: python - - @asset - def asset1(): - ... - - result = prefixed_asset_key_replacements([asset_1], "my_prefix") - assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"]) - - Example with dependencies within the list of assets: - - .. code-block:: python - - @asset - def asset1(): - ... - - @asset - def asset2(asset1): - ... - - result = prefixed_asset_key_replacements([asset1, asset2], "my_prefix") - assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"]) - assert result.assets[1].asset_key == AssetKey(["my_prefix", "asset2"]) - assert result.assets[1].dependency_keys == {AssetKey(["my_prefix", "asset1"])} - - """ - asset_keys = {asset_key for assets_def in assets_defs for asset_key in assets_def.keys} - check_target_keys = { - key.asset_key for assets_def in assets_defs for key in assets_def.check_keys - } - source_asset_keys = {source_asset.key for source_asset in source_assets} - - if isinstance(key_prefix, str): - key_prefix = [key_prefix] - key_prefix = check.is_list(key_prefix, of_type=str) - - if isinstance(source_key_prefix, str): - source_key_prefix = [source_key_prefix] - - result_assets: List[AssetsDefinition] = [] - for assets_def in assets_defs: - output_asset_key_replacements = { - asset_key: AssetKey([*key_prefix, *asset_key.path]) - for asset_key in ( - assets_def.keys | {check_key.asset_key for check_key in assets_def.check_keys} - ) - } - input_asset_key_replacements = {} - for dep_asset_key in assets_def.keys_by_input_name.values(): - if dep_asset_key in asset_keys or dep_asset_key in check_target_keys: - input_asset_key_replacements[dep_asset_key] = AssetKey( - [*key_prefix, *dep_asset_key.path] - ) - elif source_key_prefix and dep_asset_key in source_asset_keys: - input_asset_key_replacements[dep_asset_key] = AssetKey( - [*source_key_prefix, 
*dep_asset_key.path] - ) - - result_assets.append( - assets_def.with_attributes( - output_asset_key_replacements=output_asset_key_replacements, - input_asset_key_replacements=input_asset_key_replacements, - ) - ) - - if source_key_prefix: - result_source_assets = [ - source_asset.with_attributes(key=AssetKey([*source_key_prefix, *source_asset.key.path])) - for source_asset in source_assets - ] - else: - result_source_assets = source_assets - - return result_assets, result_source_assets - - -def assets_with_attributes( - assets_defs: Sequence[AssetsDefinition], - source_assets: Sequence[SourceAsset], - cacheable_assets: Sequence[CacheableAssetsDefinition], - key_prefix: Optional[Sequence[str]], - group_name: Optional[str], - freshness_policy: Optional[FreshnessPolicy], - automation_condition: Optional[AutomationCondition], - backfill_policy: Optional[BackfillPolicy], - source_key_prefix: Optional[Sequence[str]], -) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: - # There is a tricky edge case here where if a non-cacheable asset depends on a cacheable asset, - # and the assets are prefixed, the non-cacheable asset's dependency will not be prefixed since - # at prefix-time it is not known that its dependency is one of the cacheable assets. 
- # https://github.com/dagster-io/dagster/pull/10389#pullrequestreview-1170913271 - if key_prefix: - assets_defs, source_assets = prefix_assets( - assets_defs, key_prefix, source_assets, source_key_prefix - ) - cacheable_assets = [ - cached_asset.with_prefix_for_all(key_prefix) for cached_asset in cacheable_assets - ] - - if group_name or freshness_policy or automation_condition or backfill_policy: - assets_defs = [ - asset.with_attributes( - group_names_by_key=( - {asset_key: group_name for asset_key in asset.keys} - if group_name is not None - else {} - ), - freshness_policy=freshness_policy, - automation_condition=automation_condition, - backfill_policy=backfill_policy, - ) - for asset in assets_defs - ] - if group_name: - source_assets = [ - source_asset.with_attributes(group_name=group_name) - for source_asset in source_assets - ] - cacheable_assets = [ - cached_asset.with_attributes_for_all( - group_name, - freshness_policy=freshness_policy, - auto_materialize_policy=automation_condition.as_auto_materialize_policy() - if automation_condition - else None, - backfill_policy=backfill_policy, - ) - for cached_asset in cacheable_assets - ] - - return [*assets_defs, *source_assets, *cacheable_assets] diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py b/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py index 0ada758d6d752..69e7e3c6d27f8 100644 --- a/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py +++ b/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py @@ -16,7 +16,7 @@ from dagster._serdes import whitelist_for_serdes if TYPE_CHECKING: - from dagster._core.definitions.assets import AssetsDefinition, SourceAsset + from dagster._core.definitions.assets import AssetsDefinition, AssetSpec, SourceAsset from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition DEFAULT_SOURCE_FILE_KEY = "asset_definition" @@ -86,11 +86,11 @@ def namespace(cls) -> 
str: def _with_code_source_single_definition( - assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"], -) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]: + assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"], +) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"]: from dagster._core.definitions.assets import AssetsDefinition - # SourceAsset doesn't have an op definition to point to - cacheable assets + # SourceAsset and AssetSpec don't have an op definition to point to - cacheable assets # will be supported eventually but are a bit trickier if not isinstance(assets_def, AssetsDefinition): return assets_def @@ -242,8 +242,8 @@ def convert_local_path_to_git_path( def _convert_local_path_to_git_path_single_definition( base_git_url: str, file_path_mapping: FilePathMapping, - assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"], -) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]: + assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"], +) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"]: from dagster._core.definitions.assets import AssetsDefinition # SourceAsset doesn't have an op definition to point to - cacheable assets @@ -293,11 +293,13 @@ def _build_gitlab_url(url: str, branch: str) -> str: @experimental def link_code_references_to_git( - assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]], + assets_defs: Sequence[ + Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"] + ], git_url: str, git_branch: str, file_path_mapping: FilePathMapping, -) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]: +) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"]]: """Wrapper 
function which converts local file path code references to source control URLs based on the provided source control URL and branch. @@ -353,8 +355,10 @@ def link_code_references_to_git( @experimental def with_source_code_references( - assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]], -) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]: + assets_defs: Sequence[ + Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"] + ], +) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition", "AssetSpec"]]: """Wrapper function which attaches local code reference metadata to the provided asset definitions. This points to the filepath and line number where the asset body is defined. diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/asset_subpackage/__init__.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/__init__.py similarity index 100% rename from python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/asset_subpackage/__init__.py rename to python_modules/dagster/dagster/_core/definitions/module_loaders/__init__.py diff --git a/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_asset_checks_from_modules.py similarity index 68% rename from python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py rename to python_modules/dagster/dagster/_core/definitions/module_loaders/load_asset_checks_from_modules.py index 5a5d3ec99af40..e56b63ebf3d72 100644 --- a/python_modules/dagster/dagster/_core/definitions/load_asset_checks_from_modules.py +++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_asset_checks_from_modules.py @@ -1,51 +1,16 @@ import inspect from importlib import import_module from types import ModuleType -from typing import Iterable, 
Optional, Sequence, Set, cast +from typing import Iterable, Optional, Sequence import dagster._check as check -from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks +from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.asset_key import ( CoercibleToAssetKeyPrefix, check_opt_coercible_to_asset_key_prefix_param, ) -from dagster._core.definitions.assets import AssetsDefinition -from dagster._core.definitions.load_assets_from_modules import ( - find_modules_in_package, - find_objects_in_module_of_types, - prefix_assets, -) - - -def _checks_from_modules(modules: Iterable[ModuleType]) -> Sequence[AssetChecksDefinition]: - checks = [] - ids: Set[int] = set() - for module in modules: - for c in find_objects_in_module_of_types(module, AssetsDefinition): - if has_only_asset_checks(c) and id(c) not in ids: - checks.append(cast(AssetChecksDefinition, c)) - ids.add(id(c)) - return checks - - -def _checks_with_attributes( - checks_defs: Sequence[AssetChecksDefinition], - asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, -) -> Sequence[AssetChecksDefinition]: - if asset_key_prefix: - modified_checks, _ = prefix_assets(checks_defs, asset_key_prefix, [], None) - return [ - AssetChecksDefinition.create( - keys_by_input_name=c.keys_by_input_name, - node_def=c.op, - check_specs_by_output_name=c.check_specs_by_output_name, - resource_defs=c.resource_defs, - can_subset=c.can_subset, - ) - for c in modified_checks - ] - else: - return checks_defs +from dagster._core.definitions.module_loaders.object_list import ModuleScopedDagsterDefs +from dagster._core.definitions.module_loaders.utils import find_modules_in_package def load_asset_checks_from_modules( @@ -68,7 +33,19 @@ def load_asset_checks_from_modules( asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param( asset_key_prefix, "asset_key_prefix" ) - return _checks_with_attributes(_checks_from_modules(modules), 
asset_key_prefix=asset_key_prefix) + return ( + ModuleScopedDagsterDefs.from_modules(modules) + .get_object_list() + .with_attributes( + key_prefix=asset_key_prefix, + source_key_prefix=None, + group_name=None, + freshness_policy=None, + automation_condition=None, + backfill_policy=None, + ) + .checks_defs + ) def load_asset_checks_from_current_module( @@ -95,9 +72,7 @@ def load_asset_checks_from_current_module( asset_key_prefix, "asset_key_prefix" ) - return _checks_with_attributes( - _checks_from_modules([module]), asset_key_prefix=asset_key_prefix - ) + return load_asset_checks_from_modules([module], asset_key_prefix=asset_key_prefix) def load_asset_checks_from_package_module( @@ -120,9 +95,8 @@ def load_asset_checks_from_package_module( asset_key_prefix, "asset_key_prefix" ) - return _checks_with_attributes( - _checks_from_modules(find_modules_in_package(package_module)), - asset_key_prefix=asset_key_prefix, + return load_asset_checks_from_modules( + find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix ) @@ -147,7 +121,6 @@ def load_asset_checks_from_package_name( ) package_module = import_module(package_name) - return _checks_with_attributes( - _checks_from_modules(find_modules_in_package(package_module)), - asset_key_prefix=asset_key_prefix, + return load_asset_checks_from_modules( + find_modules_in_package(package_module), asset_key_prefix=asset_key_prefix ) diff --git a/python_modules/dagster/dagster/_core/definitions/module_loaders/load_assets_from_modules.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_assets_from_modules.py new file mode 100644 index 0000000000000..3e47ffb79af61 --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_assets_from_modules.py @@ -0,0 +1,266 @@ +import inspect +from importlib import import_module +from types import ModuleType +from typing import Iterable, Iterator, Optional, Sequence, Tuple, Type, Union, cast + +import dagster._check as check 
+from dagster._core.definitions.asset_checks import has_only_asset_checks +from dagster._core.definitions.asset_key import ( + CoercibleToAssetKeyPrefix, + check_opt_coercible_to_asset_key_prefix_param, +) +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.assets import AssetsDefinition +from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy +from dagster._core.definitions.backfill_policy import BackfillPolicy +from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition +from dagster._core.definitions.declarative_automation.automation_condition import ( + AutomationCondition, +) +from dagster._core.definitions.freshness_policy import FreshnessPolicy +from dagster._core.definitions.module_loaders.object_list import ModuleScopedDagsterDefs +from dagster._core.definitions.module_loaders.utils import find_modules_in_package +from dagster._core.definitions.source_asset import SourceAsset +from dagster._core.definitions.utils import resolve_automation_condition + + +def find_objects_in_module_of_types( + module: ModuleType, + types: Union[Type, Tuple[Type, ...]], +) -> Iterator: + """Yields instances or subclasses of the given type(s).""" + for attr in dir(module): + value = getattr(module, attr) + if isinstance(value, types): + yield value + elif isinstance(value, list) and all(isinstance(el, types) for el in value): + yield from value + + +def find_subclasses_in_module( + module: ModuleType, + types: Union[Type, Tuple[Type, ...]], +) -> Iterator: + """Yields instances or subclasses of the given type(s).""" + for attr in dir(module): + value = getattr(module, attr) + if isinstance(value, type) and issubclass(value, types): + yield value + + +def load_assets_from_modules( + modules: Iterable[ModuleType], + group_name: Optional[str] = None, + key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, + *, + freshness_policy: Optional[FreshnessPolicy] = None, + auto_materialize_policy: 
Optional[AutoMaterializePolicy] = None, + automation_condition: Optional[AutomationCondition] = None, + backfill_policy: Optional[BackfillPolicy] = None, + source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition, AssetSpec]]: + """Constructs a list of assets and source assets from the given modules. + + Args: + modules (Iterable[ModuleType]): The Python modules to look for assets inside. + group_name (Optional[str]): + Group name to apply to the loaded assets. The returned assets will be copies of the + loaded objects, with the group name added. + key_prefix (Optional[Union[str, Sequence[str]]]): + Prefix to prepend to the keys of the loaded assets. The returned assets will be copies + of the loaded objects, with the prefix prepended. + freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded + assets. + automation_condition (Optional[AutomationCondition]): AutomationCondition to apply + to all the loaded assets. + backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. + source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned + assets will be copies of the loaded objects, with the prefix prepended. + + Returns: + Sequence[Union[AssetsDefinition, SourceAsset]]: + A list containing assets and source assets defined in the given modules. + """ + + def _asset_filter(dagster_object) -> bool: + if isinstance(dagster_object, AssetsDefinition): + # We don't load asset checks with asset module loaders. 
+ return not has_only_asset_checks(dagster_object) + if isinstance(dagster_object, AssetSpec): + return include_specs + return isinstance( + dagster_object, (AssetsDefinition, SourceAsset, CacheableAssetsDefinition, AssetSpec) + ) + + return cast( + Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition, AssetSpec]], + ModuleScopedDagsterDefs.from_modules(modules) + .get_object_list() + .with_attributes( + key_prefix=check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix"), + source_key_prefix=check_opt_coercible_to_asset_key_prefix_param( + source_key_prefix, "source_key_prefix" + ), + group_name=check.opt_str_param(group_name, "group_name"), + freshness_policy=check.opt_inst_param( + freshness_policy, "freshness_policy", FreshnessPolicy + ), + automation_condition=resolve_automation_condition( + automation_condition, auto_materialize_policy + ), + backfill_policy=check.opt_inst_param( + backfill_policy, "backfill_policy", BackfillPolicy + ), + ) + .get_objects(_asset_filter), + ) + + +def load_assets_from_current_module( + group_name: Optional[str] = None, + key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, + *, + freshness_policy: Optional[FreshnessPolicy] = None, + auto_materialize_policy: Optional[AutoMaterializePolicy] = None, + automation_condition: Optional[AutomationCondition] = None, + backfill_policy: Optional[BackfillPolicy] = None, + source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition, AssetSpec]]: + """Constructs a list of assets, source assets, and cacheable assets from the module where + this function is called. + + Args: + group_name (Optional[str]): + Group name to apply to the loaded assets. The returned assets will be copies of the + loaded objects, with the group name added. + key_prefix (Optional[Union[str, Sequence[str]]]): + Prefix to prepend to the keys of the loaded assets. 
The returned assets will be copies + of the loaded objects, with the prefix prepended. + freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded + assets. + automation_condition (Optional[AutomationCondition]): AutomationCondition to apply + to all the loaded assets. + backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. + source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned + assets will be copies of the loaded objects, with the prefix prepended. + + Returns: + Sequence[Union[AssetsDefinition, SourceAsset, CachableAssetsDefinition]]: + A list containing assets, source assets, and cacheable assets defined in the module. + """ + caller = inspect.stack()[1] + module = inspect.getmodule(caller[0]) + if module is None: + check.failed("Could not find a module for the caller") + + return load_assets_from_modules( + [module], + group_name=group_name, + key_prefix=key_prefix, + freshness_policy=freshness_policy, + automation_condition=resolve_automation_condition( + automation_condition, auto_materialize_policy + ), + backfill_policy=backfill_policy, + source_key_prefix=source_key_prefix, + include_specs=include_specs, + ) + + +def load_assets_from_package_module( + package_module: ModuleType, + group_name: Optional[str] = None, + key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, + *, + freshness_policy: Optional[FreshnessPolicy] = None, + auto_materialize_policy: Optional[AutoMaterializePolicy] = None, + automation_condition: Optional[AutomationCondition] = None, + backfill_policy: Optional[BackfillPolicy] = None, + source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition, AssetSpec]]: + """Constructs a list of assets and source assets that includes all asset + definitions, source assets, and cacheable assets in all sub-modules of the 
given package module. + + A package module is the result of importing a package. + + Args: + package_module (ModuleType): The package module to looks for assets inside. + group_name (Optional[str]): + Group name to apply to the loaded assets. The returned assets will be copies of the + loaded objects, with the group name added. + key_prefix (Optional[Union[str, Sequence[str]]]): + Prefix to prepend to the keys of the loaded assets. The returned assets will be copies + of the loaded objects, with the prefix prepended. + freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded + assets. + automation_condition (Optional[AutomationCondition]): AutomationCondition to apply + to all the loaded assets. + backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. + source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned + assets will be copies of the loaded objects, with the prefix prepended. + + Returns: + Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: + A list containing assets, source assets, and cacheable assets defined in the module. 
+ """ + return load_assets_from_modules( + [*find_modules_in_package(package_module)], + group_name, + key_prefix, + freshness_policy=freshness_policy, + auto_materialize_policy=auto_materialize_policy, + automation_condition=automation_condition, + backfill_policy=backfill_policy, + source_key_prefix=source_key_prefix, + include_specs=include_specs, + ) + + +def load_assets_from_package_name( + package_name: str, + group_name: Optional[str] = None, + key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, + *, + freshness_policy: Optional[FreshnessPolicy] = None, + auto_materialize_policy: Optional[AutoMaterializePolicy] = None, + backfill_policy: Optional[BackfillPolicy] = None, + source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, + include_specs: bool = False, +) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition, AssetSpec]]: + """Constructs a list of assets, source assets, and cacheable assets that includes all asset + definitions and source assets in all sub-modules of the given package. + + Args: + package_name (str): The name of a Python package to look for assets inside. + group_name (Optional[str]): + Group name to apply to the loaded assets. The returned assets will be copies of the + loaded objects, with the group name added. + key_prefix (Optional[Union[str, Sequence[str]]]): + Prefix to prepend to the keys of the loaded assets. The returned assets will be copies + of the loaded objects, with the prefix prepended. + freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded + assets. + auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply + to all the loaded assets. + backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. + source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned + assets will be copies of the loaded objects, with the prefix prepended. 
+ + Returns: + Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: + A list containing assets, source assets, and cacheable assets defined in the module. + """ + package_module = import_module(package_name) + return load_assets_from_package_module( + package_module, + group_name=group_name, + key_prefix=key_prefix, + freshness_policy=freshness_policy, + auto_materialize_policy=auto_materialize_policy, + backfill_policy=backfill_policy, + source_key_prefix=source_key_prefix, + include_specs=include_specs, + ) diff --git a/python_modules/dagster/dagster/_core/definitions/module_loaders/load_defs_from_module.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_defs_from_module.py new file mode 100644 index 0000000000000..2d39e67d5acf0 --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_defs_from_module.py @@ -0,0 +1,22 @@ +from types import ModuleType +from typing import Any, Mapping, Optional, Union + +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.definitions.executor_definition import ExecutorDefinition +from dagster._core.definitions.logger_definition import LoggerDefinition +from dagster._core.definitions.module_loaders.object_list import ModuleScopedDagsterDefs +from dagster._core.executor.base import Executor + + +def load_definitions_from_module( + module: ModuleType, + resources: Optional[Mapping[str, Any]] = None, + loggers: Optional[Mapping[str, LoggerDefinition]] = None, + executor: Optional[Union[Executor, ExecutorDefinition]] = None, +) -> Definitions: + return Definitions( + **ModuleScopedDagsterDefs.from_modules([module]).get_object_list().to_definitions_args(), + resources=resources, + loggers=loggers, + executor=executor, + ) diff --git a/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py new file mode 100644 index 
0000000000000..e55057f6ea0f0 --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py @@ -0,0 +1,406 @@ +from collections import defaultdict +from functools import cached_property +from types import ModuleType +from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Sequence, Union, cast, get_args + +from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks +from dagster._core.definitions.asset_key import AssetKey, CoercibleToAssetKeyPrefix +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.assets import AssetsDefinition +from dagster._core.definitions.backfill_policy import BackfillPolicy +from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition +from dagster._core.definitions.declarative_automation.automation_condition import ( + AutomationCondition, +) +from dagster._core.definitions.freshness_policy import FreshnessPolicy +from dagster._core.definitions.job_definition import JobDefinition +from dagster._core.definitions.module_loaders.utils import ( + find_objects_in_module_of_types, + key_iterator, + replace_keys_in_asset, +) +from dagster._core.definitions.partitioned_schedule import ( + UnresolvedPartitionedAssetScheduleDefinition, +) +from dagster._core.definitions.schedule_definition import ScheduleDefinition +from dagster._core.definitions.sensor_definition import SensorDefinition +from dagster._core.definitions.source_asset import SourceAsset +from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition +from dagster._core.definitions.utils import DEFAULT_GROUP_NAME +from dagster._core.errors import DagsterInvalidDefinitionError + +LoadableDagsterDef = Union[ + AssetsDefinition, + SourceAsset, + CacheableAssetsDefinition, + AssetSpec, + SensorDefinition, + ScheduleDefinition, + JobDefinition, + UnresolvedAssetJobDefinition, + 
class ModuleScopedDagsterDefs:
    """Loadable Dagster objects grouped by the name of the module they were found in.

    Collision detection runs at construction time, so an instance that was built
    successfully contains no conflicting asset keys and no duplicate schedule,
    sensor, or job names.
    """

    def __init__(
        self,
        objects_per_module: Mapping[str, Sequence[LoadableDagsterDef]],
    ):
        """Store the per-module objects and validate them.

        Args:
            objects_per_module: Mapping of module name to the loadable objects found in it.

        Raises:
            DagsterInvalidDefinitionError: If collision detection finds a conflict.
        """
        self.objects_per_module = objects_per_module
        self._do_collision_detection()

    @classmethod
    def from_modules(cls, modules: Iterable[ModuleType]) -> "ModuleScopedDagsterDefs":
        """Scan each module for instances of the ``LoadableDagsterDef`` member types."""
        return cls(
            {
                module.__name__: list(
                    find_objects_in_module_of_types(
                        module,
                        get_args(LoadableDagsterDef),
                    )
                )
                for module in modules
            },
        )

    @cached_property
    def flat_object_list(self) -> Sequence[LoadableDagsterDef]:
        """All objects from all modules, in module order, duplicates included."""
        return [
            asset_object for objects in self.objects_per_module.values() for asset_object in objects
        ]

    @cached_property
    def objects_by_id(self) -> Dict[int, LoadableDagsterDef]:
        """Objects keyed by ``id()``; the same object found in several modules appears once."""
        return {id(asset_object): asset_object for asset_object in self.flat_object_list}

    @cached_property
    def deduped_objects(self) -> Sequence[LoadableDagsterDef]:
        """The distinct (by identity) objects across all modules."""
        return list(self.objects_by_id.values())

    @cached_property
    def assets_defs(self) -> Sequence[AssetsDefinition]:
        """The distinct ``AssetsDefinition`` objects."""
        return [asset for asset in self.deduped_objects if isinstance(asset, AssetsDefinition)]

    @cached_property
    def source_assets(self) -> Sequence[SourceAsset]:
        """The distinct ``SourceAsset`` objects."""
        return [asset for asset in self.deduped_objects if isinstance(asset, SourceAsset)]

    @cached_property
    def schedule_defs(
        self,
    ) -> Sequence[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]:
        """The distinct schedule objects (resolved and unresolved-partitioned)."""
        return [
            schedule
            for schedule in self.deduped_objects
            if isinstance(
                schedule, (ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition)
            )
        ]

    @cached_property
    def job_objects(self) -> Sequence[Union[JobDefinition, UnresolvedAssetJobDefinition]]:
        """The distinct job objects (resolved and unresolved asset jobs)."""
        return [
            job
            for job in self.deduped_objects
            if isinstance(job, (JobDefinition, UnresolvedAssetJobDefinition))
        ]

    @cached_property
    def sensor_defs(self) -> Sequence[SensorDefinition]:
        """The distinct ``SensorDefinition`` objects."""
        return [sensor for sensor in self.deduped_objects if isinstance(sensor, SensorDefinition)]

    @cached_property
    def module_name_by_id(self) -> Dict[int, str]:
        """Module name keyed by object ``id()``.

        When the same object is found in several modules, the last module wins.
        """
        return {
            id(asset_object): module_name
            for module_name, objects in self.objects_per_module.items()
            for asset_object in objects
        }

    @cached_property
    def asset_objects_by_key(
        self,
    ) -> Mapping[AssetKey, Sequence[Union[SourceAsset, AssetSpec, AssetsDefinition]]]:
        """Asset-like objects grouped by each asset key they define.

        Built from the non-deduplicated list, so one object found in two modules
        appears twice under its keys.
        """
        objects_by_key = defaultdict(list)
        for asset_object in self.flat_object_list:
            if not isinstance(asset_object, (SourceAsset, AssetSpec, AssetsDefinition)):
                continue
            for key in key_iterator(asset_object):
                objects_by_key[key].append(asset_object)
        return objects_by_key

    def _module_names_str(self, objects: Iterable[Any]) -> str:
        """Return the distinct module names of ``objects`` as a comma-separated string.

        The names are sorted so the resulting error message is identical from run
        to run; iterating a set of strings directly has no stable order.
        """
        return ", ".join(sorted({self.module_name_by_id[id(obj)] for obj in objects}))

    def _raise_on_duplicate_names(self, kind: str, defs: Iterable[Any]) -> None:
        """Raise if two of ``defs`` share a ``name``.

        Args:
            kind: Label used at the start of the error message ("Schedule", "Sensor", "Job").
            defs: Already-deduplicated objects exposing a ``name`` attribute.

        Raises:
            DagsterInvalidDefinitionError: For the first name that has more than one object.
        """
        defs_by_name = defaultdict(list)
        for definition in defs:
            defs_by_name[definition.name].append(definition)
        for name, named_defs in defs_by_name.items():
            if len(named_defs) > 1:
                modules_str = self._module_names_str(named_defs)
                raise DagsterInvalidDefinitionError(
                    f"{kind} name {name} is defined multiple times. Definitions found in modules: {modules_str}."
                )

    def _do_collision_detection(self) -> None:
        """Validate that asset keys and schedule/sensor/job names are unambiguous.

        Raises:
            DagsterInvalidDefinitionError: If an asset key is defined by more than one
                distinct object, or a schedule, sensor, or job name is used by more
                than one object.
        """
        # Collision detection on module-scoped asset objects. This does not include CacheableAssetsDefinitions, which don't have their keys defined until runtime.
        for key, asset_objects in self.asset_objects_by_key.items():
            # The same object imported into several modules is not a collision; only
            # distinct objects in memory that claim the same key are.
            if len({id(asset_object) for asset_object in asset_objects}) > 1:
                asset_objects_str = self._module_names_str(asset_objects)
                raise DagsterInvalidDefinitionError(
                    f"Asset key {key.to_user_string()} is defined multiple times. Definitions found in modules: {asset_objects_str}."
                )

        # The inputs below are already deduplicated by identity, so any two entries
        # sharing a name are distinct objects.
        self._raise_on_duplicate_names("Schedule", self.schedule_defs)
        self._raise_on_duplicate_names("Sensor", self.sensor_defs)
        self._raise_on_duplicate_names("Job", self.job_objects)

    def get_object_list(self) -> "DagsterObjectsList":
        """Return the deduplicated objects wrapped in a ``DagsterObjectsList``."""
        return DagsterObjectsList(self.deduped_objects)
class DagsterObjectsList:
    """A flat list of loaded Dagster objects with typed views and bulk transformations.

    The typed views are ``cached_property`` values computed from ``loaded_defs``.
    The transformation methods return a new ``DagsterObjectsList`` rather than
    modifying this one.
    """

    def __init__(
        self,
        loaded_objects: Sequence[LoadableDagsterDef],
    ):
        """Wrap ``loaded_objects``; the sequence is stored as ``loaded_defs`` without copying."""
        self.loaded_defs = loaded_objects

    @cached_property
    def assets_defs_and_specs(self) -> Sequence[Union[AssetsDefinition, AssetSpec]]:
        """AssetsDefinitions that have at least one key, plus all AssetSpecs."""
        return [
            asset
            for asset in self.loaded_defs
            if (isinstance(asset, AssetsDefinition) and asset.keys) or isinstance(asset, AssetSpec)
        ]

    @cached_property
    def assets_defs(self) -> Sequence[AssetsDefinition]:
        """Every AssetsDefinition, including those without keys."""
        return [asset for asset in self.loaded_defs if isinstance(asset, AssetsDefinition)]

    @cached_property
    def checks_defs(self) -> Sequence[AssetChecksDefinition]:
        """AssetsDefinitions for which ``has_only_asset_checks`` is true, typed as checks."""
        return [
            cast(AssetChecksDefinition, asset)
            for asset in self.loaded_defs
            if isinstance(asset, AssetsDefinition) and has_only_asset_checks(asset)
        ]

    @cached_property
    def assets_defs_specs_and_checks_defs(
        self,
    ) -> Sequence[Union[AssetsDefinition, AssetSpec, AssetChecksDefinition]]:
        """``assets_defs_and_specs`` followed by ``checks_defs``."""
        return [*self.assets_defs_and_specs, *self.checks_defs]

    @cached_property
    def source_assets(self) -> Sequence[SourceAsset]:
        """Every SourceAsset in the list."""
        return [
            dagster_def for dagster_def in self.loaded_defs if isinstance(dagster_def, SourceAsset)
        ]

    @cached_property
    def cacheable_assets(self) -> Sequence[CacheableAssetsDefinition]:
        """Every CacheableAssetsDefinition in the list."""
        return [
            dagster_def
            for dagster_def in self.loaded_defs
            if isinstance(dagster_def, CacheableAssetsDefinition)
        ]

    @cached_property
    def sensors(self) -> Sequence[SensorDefinition]:
        """Every SensorDefinition in the list."""
        return [
            dagster_def
            for dagster_def in self.loaded_defs
            if isinstance(dagster_def, SensorDefinition)
        ]

    @cached_property
    def schedules(
        self,
    ) -> Sequence[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]:
        """Every schedule object (resolved and unresolved-partitioned) in the list."""
        return [
            dagster_def
            for dagster_def in self.loaded_defs
            if isinstance(
                dagster_def, (ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition)
            )
        ]

    @cached_property
    def jobs(self) -> Sequence[Union[JobDefinition, UnresolvedAssetJobDefinition]]:
        """Every job object (resolved and unresolved asset jobs) in the list."""
        return [
            dagster_def
            for dagster_def in self.loaded_defs
            if isinstance(dagster_def, (JobDefinition, UnresolvedAssetJobDefinition))
        ]

    @cached_property
    def assets(
        self,
    ) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition, AssetSpec]]:
        """Keyed assets and specs, then source assets, then cacheable assets.

        Checks-only definitions are not included here; see ``checks_defs``.
        """
        return [
            *self.assets_defs_and_specs,
            *self.source_assets,
            *self.cacheable_assets,
        ]

    def get_objects(
        self, filter_fn: Callable[[LoadableDagsterDef], bool]
    ) -> Sequence[LoadableDagsterDef]:
        """Return the loaded objects for which ``filter_fn`` returns a truthy value."""
        return [dagster_def for dagster_def in self.loaded_defs if filter_fn(dagster_def)]

    def assets_with_loadable_prefix(
        self, key_prefix: CoercibleToAssetKeyPrefix
    ) -> "DagsterObjectsList":
        """Return a new list in which loadable asset keys are prefixed with ``key_prefix``.

        AssetsDefinitions and AssetSpecs have their keys replaced, cacheable assets are
        prefixed via ``with_prefix_for_all``, and every other object (including
        SourceAssets) is passed through unchanged.
        """
        # There is a tricky edge case here where if a non-cacheable asset depends on a cacheable asset,
        # and the assets are prefixed, the non-cacheable asset's dependency will not be prefixed since
        # at prefix-time it is not known that its dependency is one of the cacheable assets.
        # https://github.com/dagster-io/dagster/pull/10389#pullrequestreview-1170913271
        result_list = []
        # Keys defined by assets/specs plus the asset keys targeted by checks.
        all_asset_keys = {
            key
            for asset_object in self.assets_defs_specs_and_checks_defs
            for key in key_iterator(asset_object, included_targeted_keys=True)
        }
        key_replacements = {key: key.with_prefix(key_prefix) for key in all_asset_keys}
        for dagster_def in self.loaded_defs:
            if isinstance(dagster_def, CacheableAssetsDefinition):
                result_list.append(dagster_def.with_prefix_for_all(key_prefix))
            elif isinstance(dagster_def, (AssetsDefinition, AssetSpec)):
                result_list.append(replace_keys_in_asset(dagster_def, key_replacements))
            else:
                # We don't replace the key for SourceAssets, or of course for non-asset objects.
                result_list.append(dagster_def)

        return DagsterObjectsList(result_list)

    def assets_with_source_prefix(
        self, key_prefix: CoercibleToAssetKeyPrefix
    ) -> "DagsterObjectsList":
        """Return a new list in which SourceAsset keys are prefixed with ``key_prefix``.

        The replacement map only contains SourceAsset keys. It is applied to
        SourceAssets, AssetSpecs and AssetsDefinitions alike; other objects are
        passed through unchanged.
        """
        result_list = []
        key_replacements = {
            source_asset.key: source_asset.key.with_prefix(key_prefix)
            for source_asset in self.source_assets
        }
        for dagster_def in self.loaded_defs:
            if isinstance(
                dagster_def,
                (AssetSpec, SourceAsset, AssetsDefinition),
            ):
                result_list.append(replace_keys_in_asset(dagster_def, key_replacements))
            else:
                result_list.append(dagster_def)
        return DagsterObjectsList(result_list)

    def with_attributes(
        self,
        key_prefix: Optional[CoercibleToAssetKeyPrefix],
        source_key_prefix: Optional[CoercibleToAssetKeyPrefix],
        group_name: Optional[str],
        freshness_policy: Optional[FreshnessPolicy],
        automation_condition: Optional[AutomationCondition],
        backfill_policy: Optional[BackfillPolicy],
    ) -> "DagsterObjectsList":
        """Return a new list with prefixes and shared attributes applied to the asset objects.

        Args:
            key_prefix: If truthy, applied via ``assets_with_loadable_prefix``.
            source_key_prefix: If truthy, applied via ``assets_with_source_prefix``.
            group_name: Group name applied to AssetsDefinition specs, AssetSpecs,
                SourceAssets and cacheable assets.
            freshness_policy: Passed to AssetsDefinitions and cacheable assets.
            automation_condition: Applied to AssetsDefinition specs and AssetSpecs;
                converted with ``as_auto_materialize_policy()`` for cacheable assets.
            backfill_policy: Passed to AssetsDefinitions and cacheable assets.

        Returns:
            DagsterObjectsList: The transformed objects; non-asset objects are unchanged.

        Raises:
            DagsterInvalidDefinitionError: From the spec mapper when ``group_name``
                conflicts with a group name already set on a spec.
        """
        # Prefixing happens first so the attribute pass below sees the final keys.
        dagster_def_list = self.assets_with_loadable_prefix(key_prefix) if key_prefix else self
        dagster_def_list = (
            dagster_def_list.assets_with_source_prefix(source_key_prefix)
            if source_key_prefix
            else dagster_def_list
        )
        return_list = []
        for dagster_def in dagster_def_list.loaded_defs:
            if isinstance(dagster_def, AssetsDefinition):
                new_asset = dagster_def.map_asset_specs(
                    _spec_mapper_disallow_group_override(group_name, automation_condition)
                ).with_attributes(
                    backfill_policy=backfill_policy, freshness_policy=freshness_policy
                )
                # Checks-only definitions are coerced back to a checks definition after mapping.
                return_list.append(
                    new_asset.coerce_to_checks_def()
                    if has_only_asset_checks(new_asset)
                    else new_asset
                )
            elif isinstance(dagster_def, SourceAsset):
                # A falsy group_name keeps the source asset's existing group.
                return_list.append(
                    dagster_def.with_attributes(
                        group_name=group_name if group_name else dagster_def.group_name
                    )
                )
            elif isinstance(dagster_def, AssetSpec):
                return_list.append(
                    _spec_mapper_disallow_group_override(group_name, automation_condition)(
                        dagster_def
                    )
                )
            elif isinstance(dagster_def, CacheableAssetsDefinition):
                return_list.append(
                    dagster_def.with_attributes_for_all(
                        group_name,
                        freshness_policy=freshness_policy,
                        auto_materialize_policy=automation_condition.as_auto_materialize_policy()
                        if automation_condition
                        else None,
                        backfill_policy=backfill_policy,
                    )
                )
            else:
                return_list.append(dagster_def)
        return DagsterObjectsList(return_list)

    def to_definitions_args(self) -> Mapping[str, Any]:
        """Return the typed views as a mapping with keys assets, asset_checks, sensors, schedules, jobs."""
        return {
            "assets": self.assets,
            "asset_checks": self.checks_defs,
            "sensors": self.sensors,
            "schedules": self.schedules,
            "jobs": self.jobs,
        }
def _spec_mapper_disallow_group_override(
    group_name: Optional[str], automation_condition: Optional[AutomationCondition]
) -> Callable[[AssetSpec], AssetSpec]:
    """Build a spec-mapping function that applies a group name and automation condition.

    Args:
        group_name: Group name to apply to each spec, if truthy.
        automation_condition: Automation condition to apply to each spec, if truthy.

    Returns:
        A callable that takes an ``AssetSpec`` and returns the result of
        ``spec.replace_attributes(...)``. The callable raises
        ``DagsterInvalidDefinitionError`` when ``group_name`` is given and the spec
        already has a different group name other than ``DEFAULT_GROUP_NAME``.
    """

    def _inner(spec: AssetSpec) -> AssetSpec:
        # An explicit, non-default group on the spec may not be overridden.
        spec_has_own_group = (
            spec.group_name is not None and spec.group_name != DEFAULT_GROUP_NAME
        )
        if group_name is not None and spec_has_own_group and group_name != spec.group_name:
            raise DagsterInvalidDefinitionError(
                f"Asset spec {spec.key.to_user_string()} has group name {spec.group_name}, which conflicts with the group name {group_name} provided in load_assets_from_modules."
            )

        # ``...`` is passed for attributes that have no override; presumably
        # replace_attributes treats it as "leave unchanged" -- confirm against AssetSpec.
        new_group_name = group_name or ...
        new_automation_condition = automation_condition if automation_condition else ...
        return spec.replace_attributes(
            group_name=new_group_name,
            automation_condition=new_automation_condition,
        )

    return _inner
def find_objects_in_module_of_types(
    module: ModuleType,
    types: Union[Type, Tuple[Type, ...]],
) -> Iterator:
    """Yield the attributes of ``module`` that are instances of ``types``.

    A module attribute that is a ``list`` whose elements are all instances of
    ``types`` is flattened: its elements are yielded individually. Attributes are
    visited in ``dir(module)`` order.

    Args:
        module: The module whose attributes are inspected.
        types: A type or tuple of types, as accepted by ``isinstance``.
    """
    for attr_name in dir(module):
        candidate = getattr(module, attr_name)
        if isinstance(candidate, types):
            yield candidate
            continue
        if isinstance(candidate, list) and all(isinstance(item, types) for item in candidate):
            yield from candidate


def find_subclasses_in_module(
    module: ModuleType,
    types: Union[Type, Tuple[Type, ...]],
) -> Iterator:
    """Yield the classes in ``module`` that are subclasses of ``types``.

    Only attributes that are themselves classes are considered; instances are
    never yielded. Attributes are visited in ``dir(module)`` order.

    Args:
        module: The module whose attributes are inspected.
        types: A class or tuple of classes, as accepted by ``issubclass``.
    """
    for attr_name in dir(module):
        candidate = getattr(module, attr_name)
        if not isinstance(candidate, type):
            continue
        if issubclass(candidate, types):
            yield candidate


# The annotations below are string forward references, so they are not evaluated
# when the function is defined.
def key_iterator(
    asset: "Union[AssetsDefinition, SourceAsset, AssetSpec]", included_targeted_keys: bool = False
) -> "Iterator[AssetKey]":
    """Return an iterator over the asset keys associated with ``asset``.

    Args:
        asset: An AssetsDefinition, SourceAsset or AssetSpec.
        included_targeted_keys: For an AssetsDefinition, also include the asset key
            of each entry in ``asset.check_keys``.

    Returns:
        For an AssetsDefinition, its ``keys`` followed by the targeted check keys when
        requested; for any other object, a single-element iterator over ``asset.key``.
    """
    if not isinstance(asset, AssetsDefinition):
        return iter([asset.key])

    collected = [*asset.keys]
    if included_targeted_keys:
        collected.extend(check_key.asset_key for check_key in asset.check_keys)
    return iter(collected)
def find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]:
    """Yield ``package_module`` followed by every module and subpackage beneath it, once each.

    Submodules are imported as they are discovered, in the order reported by
    ``pkgutil.walk_packages``.

    Args:
        package_module: An imported package; its ``__path__`` is walked.

    Raises:
        ValueError: If ``package_module.__file__`` is falsy. The package itself has
            already been yielded by the time this is raised.
    """
    yield package_module
    if not package_module.__file__:
        raise ValueError(
            f"Tried to find modules in package {package_module}, but its __file__ is None"
        )

    # walk_packages already descends into subpackages, so recursing here as well
    # would yield each nested module once per enclosing package.
    for module_info in pkgutil.walk_packages(
        package_module.__path__, prefix=package_module.__name__ + "."
    ):
        yield import_module(module_info.name)


# The annotations below are string forward references, so they are not evaluated
# when the function is defined.
def replace_keys_in_asset(
    asset: "Union[AssetsDefinition, AssetSpec, SourceAsset]",
    key_replacements: "Mapping[AssetKey, AssetKey]",
) -> "Union[AssetsDefinition, AssetSpec, SourceAsset]":
    """Return a copy of ``asset`` with its keys remapped through ``key_replacements``.

    Args:
        asset: The SourceAsset, AssetSpec or AssetsDefinition to remap.
        key_replacements: Mapping from existing key to replacement key. For
            SourceAssets and AssetSpecs a key missing from the mapping is kept as is.

    Returns:
        The object produced by ``with_attributes`` (SourceAsset, AssetsDefinition) or
        ``replace_attributes`` (AssetSpec).
    """
    if isinstance(asset, SourceAsset):
        return asset.with_attributes(key=key_replacements.get(asset.key, asset.key))
    if isinstance(asset, AssetSpec):
        return asset.replace_attributes(
            key=key_replacements.get(asset.key, asset.key),
        )
    # AssetsDefinition: the whole mapping is handed over.
    return asset.with_attributes(asset_key_replacements=key_replacements)
assets_from_modules([module]) - if len(module_assets) > 0 or len(module_source_assets) > 0: - return [LoadableTarget(LOAD_ALL_ASSETS, [*module_assets, *module_source_assets])] + assets = load_assets_from_modules([module]) + if len(assets) > 0: + return [LoadableTarget(LOAD_ALL_ASSETS, assets)] raise DagsterInvariantViolationError( "No Definitions, RepositoryDefinition, Job, Pipeline, Graph, or AssetsDefinition found in " diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py index 1d7441635953b..27bb96bdd9b7f 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py @@ -20,30 +20,30 @@ GIT_ROOT_PATH = os.path.normpath(os.path.join(DAGSTER_PACKAGE_PATH, "../../")) # path of the current file relative to the `dagster` package root -PATH_IN_PACKAGE = "/dagster_tests/asset_defs_tests/" +PATH_IN_PACKAGE = "/dagster_tests/definitions_tests/module_loader_tests/" # {path to module}:{path to file relative to module root}:{line number} EXPECTED_ORIGINS = { - "james_brown": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:12", + "james_brown": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:13", "chuck_berry": ( - DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:18" + DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:19" ), - "little_richard": (DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:4"), - "fats_domino": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:16", + "little_richard": (DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:5"), + "fats_domino": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:17", "miles_davis": ( 
DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/asset_subpackage/another_module_with_assets.py:6" ), "graph_backed_asset": ( - DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:41" + DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:42" ), } def test_asset_code_origins() -> None: - from dagster_tests.asset_defs_tests import asset_package - from dagster_tests.asset_defs_tests.asset_package import module_with_assets + from dagster_tests.definitions_tests.module_loader_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets collection = load_assets_from_modules([asset_package, module_with_assets]) @@ -105,8 +105,8 @@ def test_asset_code_origins() -> None: def test_asset_code_origins_source_control() -> None: - from dagster_tests.asset_defs_tests import asset_package - from dagster_tests.asset_defs_tests.asset_package import module_with_assets + from dagster_tests.definitions_tests.module_loader_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets collection = load_assets_from_modules([asset_package, module_with_assets]) @@ -161,8 +161,8 @@ def test_asset_code_origins_source_control() -> None: def test_asset_code_origins_source_control_custom_mapping() -> None: # test custom source_control_file_path_mapping fn - from dagster_tests.asset_defs_tests import asset_package - from dagster_tests.asset_defs_tests.asset_package import module_with_assets + from dagster_tests.definitions_tests.module_loader_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets collection = load_assets_from_modules([asset_package, module_with_assets]) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_job.py index 
07ff2d7bc2f65..99d8733f7e7f4 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_job.py @@ -43,7 +43,6 @@ from dagster._core.definitions.decorators.asset_check_decorator import asset_check from dagster._core.definitions.dependency import NodeHandle, NodeInvocation from dagster._core.definitions.executor_definition import in_process_executor -from dagster._core.definitions.load_assets_from_modules import prefix_assets from dagster._core.errors import DagsterInvalidSubsetError from dagster._core.execution.api import execute_run_iterator from dagster._core.snap import DependencyStructureIndex @@ -2362,7 +2361,15 @@ def test_asset_group_build_subset_job(job_selection, expected_assets, use_multi, all_assets = _get_assets_defs(use_multi=use_multi, allow_subset=use_multi) # apply prefixes for prefix in reversed(prefixes or []): - all_assets, _ = prefix_assets(all_assets, prefix, [], None) + all_assets = [ + assets_def.with_attributes( + asset_key_replacements={ + k: k.with_prefix(prefix) + for k in set(assets_def.keys_by_input_name.values()) | set(assets_def.keys) + }, + ) + for assets_def in all_assets + ] defs = Definitions( # for these, if we have multi assets, we'll always allow them to be subset diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index 5fbc36e955cbb..db853ff7499ea 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -66,11 +66,9 @@ def asset1(input1, input2): assert input2 replaced = asset1.with_attributes( - output_asset_key_replacements={ - AssetKey(["asset1"]): AssetKey(["prefix1", "asset1_changed"]) - }, - input_asset_key_replacements={ - AssetKey(["something_else", "input2"]): AssetKey(["apple", "banana"]) + asset_key_replacements={ + 
AssetKey(["asset1"]): AssetKey(["prefix1", "asset1_changed"]), + AssetKey(["something_else", "input2"]): AssetKey(["apple", "banana"]), }, ) @@ -166,9 +164,7 @@ def test_retain_group(): def bar(): pass - replaced = bar.with_attributes( - output_asset_key_replacements={AssetKey(["bar"]): AssetKey(["baz"])} - ) + replaced = bar.with_attributes(asset_key_replacements={AssetKey(["bar"]): AssetKey(["baz"])}) assert replaced.specs_by_key[AssetKey("baz")].group_name == "foo" @@ -179,9 +175,7 @@ def test_retain_freshness_policy(): def bar(): pass - replaced = bar.with_attributes( - output_asset_key_replacements={AssetKey(["bar"]): AssetKey(["baz"])} - ) + replaced = bar.with_attributes(asset_key_replacements={AssetKey(["bar"]): AssetKey(["baz"])}) assert ( replaced.specs_by_key[AssetKey(["baz"])].freshness_policy == bar.specs_by_key[AssetKey(["bar"])].freshness_policy @@ -216,7 +210,7 @@ def my_graph(): ) replaced = my_graph_asset.with_attributes( - output_asset_key_replacements={ + asset_key_replacements={ AssetKey("a"): AssetKey("aa"), AssetKey("b"): AssetKey("bb"), AssetKey("c"): AssetKey("cc"), @@ -245,7 +239,7 @@ def bar(): original = AssetsDefinition.from_graph(bar, metadata_by_output_name={"result": md}) replaced = original.with_attributes( - output_asset_key_replacements={AssetKey(["bar"]): AssetKey(["baz"])} + asset_key_replacements={AssetKey(["bar"]): AssetKey(["baz"])} ) assert ( replaced.specs_by_key[AssetKey(["baz"])].metadata @@ -281,7 +275,7 @@ def bar_(input_last): assert isinstance(bar_.get_partition_mapping(AssetKey(["input_last"])), LastPartitionMapping) replaced = bar_.with_attributes( - input_asset_key_replacements={ + asset_key_replacements={ AssetKey(["input_last"]): AssetKey(["input_last2"]), } ) @@ -305,8 +299,10 @@ def abc_(context, in1, in2, in3): pass replaced_1 = abc_.with_attributes( - output_asset_key_replacements={AssetKey(["a"]): AssetKey(["foo", "foo_a"])}, - input_asset_key_replacements={AssetKey(["in1"]): AssetKey(["foo", "bar_in1"])}, + 
asset_key_replacements={ + AssetKey(["a"]): AssetKey(["foo", "foo_a"]), + AssetKey(["in1"]): AssetKey(["foo", "bar_in1"]), + }, ) assert replaced_1.keys == {AssetKey(["foo", "foo_a"]), AssetKey("b"), AssetKey("c")} @@ -328,11 +324,9 @@ def abc_(context, in1, in2, in3): assert subbed_1.keys == {AssetKey(["foo", "foo_a"]), AssetKey("b")} replaced_2 = subbed_1.with_attributes( - output_asset_key_replacements={ + asset_key_replacements={ AssetKey(["foo", "foo_a"]): AssetKey(["again", "foo", "foo_a"]), AssetKey(["b"]): AssetKey(["something", "bar_b"]), - }, - input_asset_key_replacements={ AssetKey(["foo", "bar_in1"]): AssetKey(["again", "foo", "bar_in1"]), AssetKey(["in2"]): AssetKey(["foo", "in2"]), AssetKey(["in3"]): AssetKey(["foo", "in3"]), @@ -2220,7 +2214,7 @@ def my_multi_asset(): assert my_multi_asset.specs_by_key[AssetKey("out2")].owners == ["user@dagsterlabs.com"] prefixed_asset = my_multi_asset.with_attributes( - output_asset_key_replacements={AssetKey(["out1"]): AssetKey(["prefix", "out1"])} + asset_key_replacements={AssetKey(["out1"]): AssetKey(["prefix", "out1"])} ) assert prefixed_asset.specs_by_key[AssetKey(["prefix", "out1"])].owners == [ "user@dagsterlabs.com" diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py index 563d5d46cf59c..c8d475662a658 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py @@ -25,7 +25,6 @@ from dagster._core.definitions import asset, multi_asset from dagster._core.definitions.decorators.hook_decorator import failure_hook, success_hook from dagster._core.definitions.definitions_class import Definitions -from dagster._core.definitions.load_assets_from_modules import prefix_assets from dagster._core.definitions.partition import ( StaticPartitionsDefinition, 
static_partitioned_config, @@ -354,7 +353,15 @@ def test_define_selection_job(job_selection, expected_assets, use_multi, prefixe prefixed_assets = _get_assets_defs(use_multi=use_multi, allow_subset=use_multi) # apply prefixes for prefix in reversed(prefixes or []): - prefixed_assets, _ = prefix_assets(prefixed_assets, prefix, [], None) + prefixed_assets = [ + assets_def.with_attributes( + asset_key_replacements={ + key: key.with_prefix(prefix) + for key in set(assets_def.keys_by_input_name.values()) | set(assets_def.keys) + }, + ) + for assets_def in prefixed_assets + ] final_assets = with_resources( prefixed_assets, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/checks_module/checks_submodule_2/__init__.py b/python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/checks_module/checks_submodule_2/__init__.py deleted file mode 100644 index 6fe8c89bb150c..0000000000000 --- a/python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/checks_module/checks_submodule_2/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from dagster_tests.definitions_tests.asset_check_tests.checks_module.checks_submodule import ( - submodule_check as submodule_check, -) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/__init__.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/__init__.py similarity index 100% rename from python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/__init__.py rename to python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/__init__.py diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/__init__.py similarity index 88% rename from python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py rename to 
python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/__init__.py index 9eaf0d44dad77..44830afc5c1fa 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/__init__.py @@ -1,4 +1,5 @@ from dagster import AssetKey, SourceAsset, asset +from dagster._core.definitions.asset_spec import AssetSpec @asset @@ -29,4 +30,7 @@ def make_list_of_source_assets(): return [buddy_holly, jerry_lee_lewis] +top_level_spec = AssetSpec("top_level_spec") + + list_of_assets_and_source_assets = [*make_list_of_assets(), *make_list_of_source_assets()] diff --git a/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/asset_subpackage/__init__.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/asset_subpackage/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/asset_subpackage/another_module_with_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/asset_subpackage/another_module_with_assets.py similarity index 100% rename from python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/asset_subpackage/another_module_with_assets.py rename to python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/asset_subpackage/another_module_with_assets.py diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/asset_subpackage/asset_subsubpackage/__init__.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/asset_subpackage/asset_subsubpackage/__init__.py similarity index 100% rename from python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/asset_subpackage/asset_subsubpackage/__init__.py rename to 
python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/asset_subpackage/asset_subsubpackage/__init__.py diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/module_with_assets.py similarity index 79% rename from python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py rename to python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/module_with_assets.py index 0cea67687a7f0..ace7bfbf9c8af 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/module_with_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package/module_with_assets.py @@ -1,4 +1,5 @@ from dagster import AssetKey, SourceAsset, asset, graph_asset, op +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata import ( CodeReferencesMetadataSet, CodeReferencesMetadataValue, @@ -6,7 +7,7 @@ ) # importing this makes it show up twice when we collect everything -from dagster_tests.asset_defs_tests.asset_package.asset_subpackage.another_module_with_assets import ( +from dagster_tests.definitions_tests.module_loader_tests.asset_package.asset_subpackage.another_module_with_assets import ( miles_davis, ) @@ -41,3 +42,6 @@ def multiply_by_two(input_num): @graph_asset def graph_backed_asset(): return multiply_by_two(one()) + + +my_spec = AssetSpec("my_asset_spec") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package_with_cacheable/__init__.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package_with_cacheable/__init__.py similarity index 100% rename from python_modules/dagster/dagster_tests/asset_defs_tests/asset_package_with_cacheable/__init__.py rename to 
python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/asset_package_with_cacheable/__init__.py diff --git a/python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/checks_module/__init__.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/checks_module/__init__.py similarity index 100% rename from python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/checks_module/__init__.py rename to python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/checks_module/__init__.py diff --git a/python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/checks_module/checks_submodule/__init__.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/checks_module/checks_submodule/__init__.py similarity index 100% rename from python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/checks_module/checks_submodule/__init__.py rename to python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/checks_module/checks_submodule/__init__.py diff --git a/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/checks_module/checks_submodule_2/__init__.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/checks_module/checks_submodule_2/__init__.py new file mode 100644 index 0000000000000..aad306cfb53e4 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/checks_module/checks_submodule_2/__init__.py @@ -0,0 +1,3 @@ +from dagster_tests.definitions_tests.module_loader_tests.checks_module.checks_submodule import ( + submodule_check as submodule_check, +) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/test_load_from_modules.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_load_asset_checks_from_modules.py similarity index 92% rename from 
python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/test_load_from_modules.py rename to python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_load_asset_checks_from_modules.py index c299c488aae4a..9256adf4efadd 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/asset_check_tests/test_load_from_modules.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_load_asset_checks_from_modules.py @@ -19,8 +19,8 @@ def test_load_asset_checks_from_modules(): - from dagster_tests.definitions_tests.asset_check_tests import checks_module - from dagster_tests.definitions_tests.asset_check_tests.checks_module import asset_check_1 + from dagster_tests.definitions_tests.module_loader_tests import checks_module + from dagster_tests.definitions_tests.module_loader_tests.checks_module import asset_check_1 checks = load_asset_checks_from_modules([checks_module]) assert len(checks) == 1 @@ -47,7 +47,7 @@ def test_load_asset_checks_from_modules(): def test_load_asset_checks_from_modules_prefix(): - from dagster_tests.definitions_tests.asset_check_tests import checks_module + from dagster_tests.definitions_tests.module_loader_tests import checks_module checks = load_asset_checks_from_modules([checks_module], asset_key_prefix="foo") assert len(checks) == 1 @@ -98,7 +98,7 @@ def test_load_asset_checks_from_current_module(): ], ) def test_load_asset_checks_from_package(load_fns): - from dagster_tests.definitions_tests.asset_check_tests import checks_module + from dagster_tests.definitions_tests.module_loader_tests import checks_module assets_load_fn, checks_load_fn = load_fns diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_load_assets_from_modules.py similarity index 75% rename from python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py 
rename to python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_load_assets_from_modules.py index 7fc579483ac2e..cf7f23e97703f 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_load_assets_from_modules.py @@ -14,6 +14,7 @@ load_assets_from_package_module, load_assets_from_package_name, ) +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition @@ -87,7 +88,7 @@ def get_source_asset_with_key( def test_load_assets_from_package_name(): - from dagster_tests.asset_defs_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests import asset_package assets_defs = load_assets_from_package_name(asset_package.__name__) assert len(assets_defs) == 11 @@ -101,9 +102,18 @@ def test_load_assets_from_package_name(): assert assets_1 == assets_2 + assets_3 = load_assets_from_package_name(asset_package.__name__, include_specs=True) + assert len(assets_3) == 13 + + assert next( + iter( + a for a in assets_3 if isinstance(a, AssetSpec) and a.key == AssetKey("top_level_spec") + ) + ) + def test_load_assets_from_package_module(): - from dagster_tests.asset_defs_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests import asset_package assets_1 = load_assets_from_package_module(asset_package) assert len(assets_1) == 11 @@ -117,10 +127,19 @@ def test_load_assets_from_package_module(): assert assets_1 == assets_2 + assets_3 = load_assets_from_package_name(asset_package.__name__, include_specs=True) + assert len(assets_3) == 13 + + assert next( + iter( + a for a in assets_3 if isinstance(a, AssetSpec) and a.key == AssetKey("top_level_spec") + ) + ) + def test_load_assets_from_modules(monkeypatch): - from 
dagster_tests.asset_defs_tests import asset_package - from dagster_tests.asset_defs_tests.asset_package import module_with_assets + from dagster_tests.definitions_tests.module_loader_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets collection_1 = load_assets_from_modules([asset_package, module_with_assets]) @@ -141,13 +160,23 @@ def little_richard(): m.setattr(asset_package, "little_richard_dup", little_richard, raising=False) with pytest.raises( DagsterInvalidDefinitionError, - match=re.escape( - "Asset key AssetKey(['little_richard']) is defined multiple times. " - "Definitions found in modules: dagster_tests.asset_defs_tests.asset_package." - ), + match=re.escape("Asset key little_richard is defined multiple times."), ): load_assets_from_modules([asset_package, module_with_assets]) + # Create an AssetsDefinition with an identical spec to that in the module + with monkeypatch.context() as m: + + @asset + def top_level_spec(): + pass + + m.setattr(asset_package, "top_level_spec_same_assets_def", top_level_spec, raising=False) + with pytest.raises( + DagsterInvalidDefinitionError, + ): + load_assets_from_modules([asset_package, module_with_assets], include_specs=True) + @asset(group_name="my_group") def asset_in_current_module(): @@ -156,17 +185,27 @@ def asset_in_current_module(): source_asset_in_current_module = SourceAsset(AssetKey("source_asset_in_current_module")) +spec_in_current_module = AssetSpec("spec_in_current_module") + def test_load_assets_from_current_module(): assets = load_assets_from_current_module() assets = [get_unique_asset_identifier(asset) for asset in assets] - assert assets == ["asset_in_current_module", AssetKey("source_asset_in_current_module")] + assert set(assets) == {"asset_in_current_module", AssetKey("source_asset_in_current_module")} assert len(assets) == 2 + assets = load_assets_from_current_module(include_specs=True) + assets = 
[get_unique_asset_identifier(asset) for asset in assets] + assert len(assets) == 3 + assert set(assets) == { + "asset_in_current_module", + AssetKey("source_asset_in_current_module"), + AssetKey("spec_in_current_module"), + } def test_load_assets_from_modules_with_group_name(): - from dagster_tests.asset_defs_tests import asset_package - from dagster_tests.asset_defs_tests.asset_package import module_with_assets + from dagster_tests.definitions_tests.module_loader_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets assets = load_assets_from_modules( [asset_package, module_with_assets], group_name="my_cool_group" @@ -179,15 +218,16 @@ def test_load_assets_from_modules_with_group_name(): def test_respect_existing_groups(): assets = load_assets_from_current_module() - assert assets[0].group_names_by_key.get(AssetKey("asset_in_current_module")) == "my_group" # pyright: ignore[reportAttributeAccessIssue] + assets_def = next(iter(a for a in assets if isinstance(a, AssetsDefinition))) + assert assets_def.group_names_by_key.get(AssetKey("asset_in_current_module")) == "my_group" with pytest.raises(DagsterInvalidDefinitionError): load_assets_from_current_module(group_name="yay") def test_load_assets_with_freshness_policy(): - from dagster_tests.asset_defs_tests import asset_package - from dagster_tests.asset_defs_tests.asset_package import module_with_assets + from dagster_tests.definitions_tests.module_loader_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets assets = load_assets_from_modules( [asset_package, module_with_assets], @@ -202,8 +242,8 @@ def test_load_assets_with_freshness_policy(): def test_load_assets_with_auto_materialize_policy(): - from dagster_tests.asset_defs_tests import asset_package - from dagster_tests.asset_defs_tests.asset_package import module_with_assets + from 
dagster_tests.definitions_tests.module_loader_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets assets = load_assets_from_modules( [asset_package, module_with_assets], auto_materialize_policy=AutoMaterializePolicy.eager() @@ -225,8 +265,8 @@ def test_load_assets_with_auto_materialize_policy(): ], ) def test_prefix(prefix): - from dagster_tests.asset_defs_tests import asset_package - from dagster_tests.asset_defs_tests.asset_package import module_with_assets + from dagster_tests.definitions_tests.module_loader_tests import asset_package + from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets assets = load_assets_from_modules([asset_package, module_with_assets], key_prefix=prefix) assert_assets_have_prefix(prefix, assets) # pyright: ignore[reportArgumentType] @@ -236,7 +276,7 @@ def test_prefix(prefix): def _load_assets_from_module_with_assets(**kwargs): - from dagster_tests.asset_defs_tests.asset_package import module_with_assets + from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets return load_assets_from_modules([module_with_assets], **kwargs) @@ -246,7 +286,7 @@ def _load_assets_from_module_with_assets(**kwargs): [ _load_assets_from_module_with_assets, lambda **kwargs: load_assets_from_package_name( - "dagster_tests.asset_defs_tests.asset_package", **kwargs + "dagster_tests.definitions_tests.module_loader_tests.asset_package", **kwargs ), ], ) @@ -270,7 +310,9 @@ def test_source_key_prefix(load_fn): assert get_assets_def_with_key( assets_with_prefix_sources, AssetKey(["foo", "my_cool_prefix", "chuck_berry"]) ).dependency_keys == { + # source prefix AssetKey(["bar", "cooler_prefix", "elvis_presley"]), + # loadable prefix AssetKey(["foo", "my_cool_prefix", "miles_davis"]), } @@ -297,7 +339,7 @@ def test_source_key_prefix(load_fn): ) def test_load_assets_cacheable(load_fn, prefix): """Tests the 
load-from-module and load-from-package-name functinos with cacheable assets.""" - from dagster_tests.asset_defs_tests import asset_package_with_cacheable + from dagster_tests.definitions_tests.module_loader_tests import asset_package_with_cacheable assets_defs = load_fn(asset_package_with_cacheable) assert len(assets_defs) == 3 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_module_loaders.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_module_loaders.py new file mode 100644 index 0000000000000..90c76183058bd --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_module_loaders.py @@ -0,0 +1,255 @@ +import logging +from contextlib import contextmanager +from types import ModuleType +from typing import Any, Mapping, Sequence, Type, cast + +import dagster as dg +import pytest +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.definitions.module_loaders.load_defs_from_module import ( + load_definitions_from_module, +) +from dagster._core.definitions.module_loaders.object_list import ( + LoadableDagsterDef, + ModuleScopedDagsterDefs, +) +from dagster._record import record + + +def build_module_fake(name: str, objects: Mapping[str, Any]) -> ModuleType: + module = ModuleType(name) + for key, value in objects.items(): + setattr(module, key, value) + return module + + +def asset_with_key(key: str) -> dg.AssetsDefinition: + @dg.asset(key=key) + def my_asset(): ... 
+ + return my_asset + + +def schedule_with_name(name: str) -> dg.ScheduleDefinition: + return dg.ScheduleDefinition(name=name, cron_schedule="* * * * *", target="*") + + +def sensor_with_name(name: str) -> dg.SensorDefinition: + @dg.sensor(job_name="blah") + def my_sensor(): + pass + + return my_sensor + + +def check_with_key(key: str, name: str) -> dg.AssetChecksDefinition: + @dg.asset_check(asset=key, name=name) + def my_check() -> dg.AssetCheckResult: + raise Exception("ooops") + + return my_check + + +def all_loadable_objects_from_defs(defs: Definitions) -> Sequence[LoadableDagsterDef]: + return [ + *(defs.assets or []), + *(defs.sensors or []), + *(defs.schedules or []), + *(defs.asset_checks or []), + *(defs.jobs or []), + ] + + +@contextmanager +def optional_pytest_raise(error_expected: bool, exception_cls: Type[Exception]): + if error_expected: + with pytest.raises(exception_cls): + yield + else: + yield + + +@record +class ModuleScopeTestSpec: + objects: Mapping[str, Any] + error_expected: bool + id_: str + + @staticmethod + def as_parametrize_kwargs(seq: Sequence["ModuleScopeTestSpec"]) -> Mapping[str, Any]: + return { + "argnames": "objects,error_expected", + "argvalues": [(spec.objects, spec.error_expected) for spec in seq], + "ids": [spec.id_ for spec in seq], + } + + +some_schedule = schedule_with_name("foo") +some_sensor = sensor_with_name("foo") +some_asset = asset_with_key("foo") +some_job = dg.define_asset_job(name="foo") +some_check = check_with_key("foo_key", "some_name") +partitioned_job = dg.define_asset_job( + name="foo", partitions_def=dg.DailyPartitionsDefinition(start_date="2020-01-01") +) +some_partitioned_schedule = dg.build_schedule_from_partitioned_job(partitioned_job, "0 0 * * *") +other_partitioned_schedule = dg.build_schedule_from_partitioned_job(partitioned_job, "0 0 * * *") + + +MODULE_TEST_SPECS = [ + ModuleScopeTestSpec( + objects={"foo": some_schedule}, error_expected=False, id_="single schedule" + ), + ModuleScopeTestSpec( + 
objects={"foo": some_schedule, "bar": schedule_with_name("foo")}, + error_expected=True, + id_="conflicting schedules", + ), + ModuleScopeTestSpec( + objects={"foo": some_schedule, "bar": some_schedule}, + error_expected=False, + id_="schedules multiple variables", + ), + ModuleScopeTestSpec(objects={"foo": some_sensor}, error_expected=False, id_="single sensor"), + ModuleScopeTestSpec( + objects={"foo": some_sensor, "bar": sensor_with_name("foo")}, + error_expected=True, + id_="conflicting sensors", + ), + ModuleScopeTestSpec( + objects={"foo": some_sensor, "bar": some_sensor}, + error_expected=False, + id_="sensors multiple variables", + ), + ModuleScopeTestSpec( + objects={"foo": some_asset}, + error_expected=False, + id_="asset single variable", + ), + ModuleScopeTestSpec( + objects={"foo": some_asset, "bar": asset_with_key("foo")}, + error_expected=True, + id_="conflicting assets", + ), + ModuleScopeTestSpec( + objects={"foo": some_asset, "bar": some_asset}, + error_expected=False, + id_="assets multiple variables", + ), + ModuleScopeTestSpec( + objects={"foo": some_job}, + error_expected=False, + id_="single job", + ), + ModuleScopeTestSpec( + objects={"foo": some_job, "bar": dg.define_asset_job("other_job")}, + error_expected=False, + id_="conflicting jobs", + ), + ModuleScopeTestSpec( + objects={"foo": some_job, "bar": some_job}, + error_expected=False, + id_="job multiple variables", + ), + ModuleScopeTestSpec( + objects={"foo": some_check}, + error_expected=False, + id_="single job", + ), + # Currently, we do not perform any collision detection on asset checks. This is the behavior currently public in load_asset_checks_from_module. 
+ ModuleScopeTestSpec( + objects={"foo": some_check, "bar": check_with_key("foo_key", "some_name")}, + error_expected=False, + id_="conflicting checks", + ), + ModuleScopeTestSpec( + objects={"foo": some_check, "bar": some_check}, + error_expected=False, + id_="check multiple variables", + ), + ModuleScopeTestSpec( + objects={"foo": some_partitioned_schedule, "job": partitioned_job}, + error_expected=False, + id_="single partitioned schedule", + ), + ModuleScopeTestSpec( + objects={ + "foo": some_partitioned_schedule, + "bar": other_partitioned_schedule, + "job": partitioned_job, + }, + error_expected=True, + id_="conflicting partitioned schedules", + ), + ModuleScopeTestSpec( + objects={ + "foo": some_partitioned_schedule, + "bar": some_partitioned_schedule, + "job": partitioned_job, + }, + error_expected=False, + id_="partitioned schedules multiple variables", + ), +] + + +@pytest.mark.parametrize(**ModuleScopeTestSpec.as_parametrize_kwargs(MODULE_TEST_SPECS)) +def test_collision_detection(objects: Mapping[str, Any], error_expected: bool) -> None: + module_fake = build_module_fake("fake", objects) + with optional_pytest_raise( + error_expected=error_expected, exception_cls=dg.DagsterInvalidDefinitionError + ): + obj_list = ModuleScopedDagsterDefs.from_modules([module_fake]).get_object_list() + obj_ids = {id(obj) for obj in objects.values()} + assert len(obj_list.loaded_defs) == len(obj_ids) + + +@pytest.mark.parametrize(**ModuleScopeTestSpec.as_parametrize_kwargs(MODULE_TEST_SPECS)) +def test_load_from_definitions(objects: Mapping[str, Any], error_expected: bool) -> None: + module_fake = build_module_fake("fake", objects) + with optional_pytest_raise( + error_expected=error_expected, exception_cls=dg.DagsterInvalidDefinitionError + ): + defs = load_definitions_from_module(module_fake) + obj_ids = {id(obj) for obj in all_loadable_objects_from_defs(defs)} + expected_obj_ids = {id(obj) for obj in objects.values()} + assert len(obj_ids) == len(expected_obj_ids) + + 
+def test_load_with_resources() -> None: + @dg.resource + def my_resource(): ... + + module_fake = build_module_fake("foo", {"my_resource": my_resource}) + defs = load_definitions_from_module(module_fake) + assert len(all_loadable_objects_from_defs(defs)) == 0 + assert len(defs.resources or {}) == 0 + defs = load_definitions_from_module(module_fake, resources={"foo": my_resource}) + assert len(defs.resources or {}) == 1 + + +def test_load_with_logger_defs() -> None: + @dg.logger(config_schema={}) + def my_logger(init_context) -> logging.Logger: ... + + module_fake = build_module_fake("foo", {"my_logger": my_logger}) + defs = load_definitions_from_module(module_fake) + assert len(all_loadable_objects_from_defs(defs)) == 0 + assert len(defs.resources or {}) == 0 + defs = load_definitions_from_module(module_fake, resources={"foo": my_logger}) + assert len(defs.resources or {}) == 1 + + +def test_load_with_executor() -> None: + @dg.executor(name="my_executor") + def my_executor(init_context) -> dg.Executor: ... 
+ + module_fake = build_module_fake("foo", {"my_executor": my_executor}) + defs = load_definitions_from_module(module_fake) + assert len(all_loadable_objects_from_defs(defs)) == 0 + assert defs.executor is None + defs = load_definitions_from_module(module_fake, executor=my_executor) + assert ( + defs.executor is not None + and cast(dg.ExecutorDefinition, defs.executor).name == "my_executor" + ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py index a47062ec3a623..5e8643905acc8 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py @@ -357,3 +357,44 @@ def foo(): with pytest.raises(CheckError): dg.map_asset_specs(lambda spec: spec.merge_attributes(deps=["baz"]), [foo]) + + +def test_static_partition_mapping_dep() -> None: + @dg.asset(partitions_def=dg.StaticPartitionsDefinition(["1", "2"])) + def b(): + pass + + @dg.multi_asset( + specs=[ + AssetSpec( + key="a", + partitions_def=dg.StaticPartitionsDefinition(["1", "2"]), + deps=[ + AssetDep("b", partition_mapping=dg.StaticPartitionMapping({"1": "1", "2": "2"})) + ], + ) + ] + ) + def my_asset(): + pass + + a_asset = next( + iter( + dg.map_asset_specs( + lambda spec: spec.merge_attributes( + deps=[ + AssetDep( + "c", partition_mapping=dg.StaticPartitionMapping({"1": "1", "2": "2"}) + ) + ] + ), + [my_asset], + ) + ) + ) + + a_spec = next(iter(a_asset.specs)) + b_dep = next(iter(dep for dep in a_spec.deps if dep.asset_key == AssetKey("b"))) + c_dep = next(iter(dep for dep in a_spec.deps if dep.asset_key == AssetKey("c"))) + assert b_dep.partition_mapping == dg.StaticPartitionMapping({"1": "1", "2": "2"}) + assert c_dep.partition_mapping == dg.StaticPartitionMapping({"1": "1", "2": "2"}) diff --git a/python_modules/dagster/dagster_tests/general_tests/test_cacheable_assets_defs.py 
b/python_modules/dagster/dagster_tests/general_tests/test_cacheable_assets_defs.py index 1ec97dc41eca1..6391f6e83cbc7 100644 --- a/python_modules/dagster/dagster_tests/general_tests/test_cacheable_assets_defs.py +++ b/python_modules/dagster/dagster_tests/general_tests/test_cacheable_assets_defs.py @@ -1,4 +1,4 @@ -from typing import Sequence +from typing import List, Sequence, Union, cast import dagster._check as check import pytest @@ -151,7 +151,9 @@ def test_resolve_wrong_data(): recon_repo.get_definition() -def define_uncacheable_and_resource_dependent_cacheable_assets(): +def define_uncacheable_and_resource_dependent_cacheable_assets() -> ( + Sequence[Union[CacheableAssetsDefinition, AssetsDefinition]] +): class ResourceDependentCacheableAsset(CacheableAssetsDefinition): def __init__(self): super().__init__("res_midstream") @@ -295,7 +297,7 @@ def _op(): ) -def test_multiple_wrapped_cached_assets(): +def test_multiple_wrapped_cached_assets() -> None: """Test that multiple wrappers (with_attributes, with_resources) work properly on cacheable assets.""" @resource @@ -304,9 +306,7 @@ def foo_resource(): my_cacheable_assets_with_group_and_asset = [ x.with_attributes( - output_asset_key_replacements={ - AssetKey("res_downstream"): AssetKey("res_downstream_too") - } + asset_key_replacements={AssetKey("res_downstream"): AssetKey("res_downstream_too")} ) for x in with_resources( [ @@ -333,13 +333,19 @@ def resource_dependent_repo_with_resources(): assert isinstance(repo.get_job("all_asset_job"), JobDefinition) my_cool_group_sel = AssetSelection.groups("my_cool_group") + cacheable_resource_asset = cast( + CacheableAssetsDefinition, my_cacheable_assets_with_group_and_asset[0] + ) + resolved_defs = list( + cacheable_resource_asset.build_definitions( + cacheable_resource_asset.compute_cacheable_data() + ) + ) assert ( len( my_cool_group_sel.resolve( - my_cacheable_assets_with_group_and_asset[0].build_definitions( - 
my_cacheable_assets_with_group_and_asset[0].compute_cacheable_data() - ) - + my_cacheable_assets_with_group_and_asset[1:] + resolved_defs + + cast(List[AssetsDefinition], my_cacheable_assets_with_group_and_asset[1:]) ) ) == 1 diff --git a/python_modules/libraries/dagster-components/dagster_components/core/component.py b/python_modules/libraries/dagster-components/dagster_components/core/component.py index dc1d77f6dcf72..13d7d2f8192cb 100644 --- a/python_modules/libraries/dagster-components/dagster_components/core/component.py +++ b/python_modules/libraries/dagster-components/dagster_components/core/component.py @@ -197,7 +197,9 @@ def __repr__(self) -> str: def get_registered_components_in_module(module: ModuleType) -> Iterable[Type[Component]]: - from dagster._core.definitions.load_assets_from_modules import find_subclasses_in_module + from dagster._core.definitions.module_loaders.load_assets_from_modules import ( + find_subclasses_in_module, + ) for component in find_subclasses_in_module(module, (Component,)): if is_registered_component(component): diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/cli/app.py b/python_modules/libraries/dagster-dbt/dagster_dbt/cli/app.py index cb439b79c5fd2..3a80099287c07 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/cli/app.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/cli/app.py @@ -7,7 +7,9 @@ import yaml from dagster._cli.project import check_if_pypi_package_conflict_exists from dagster._core.code_pointer import load_python_file -from dagster._core.definitions.load_assets_from_modules import find_objects_in_module_of_types +from dagster._core.definitions.module_loaders.load_assets_from_modules import ( + find_objects_in_module_of_types, +) from jinja2 import Environment, FileSystemLoader from rich.console import Console from rich.syntax import Syntax