diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/dagster.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/dagster.py index e7b27d251ebad..3fb5a8cb28041 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/dagster.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/dagster.py @@ -160,8 +160,13 @@ def build_graphql_python_client_backcompat_steps() -> List[CommandStep]: CommandStepBuilder(":graphql: GraphQL Python Client backcompat") .on_test_image(AvailablePythonVersion.get_default()) .run( +<<<<<<< HEAD "pip install -e python_modules/dagster[test] -e python_modules/dagster-pipes -e" " python_modules/dagster-graphql -e python_modules/automation", +======= + "pip install -e python_modules/dagster[test] -e python_modules/dagster-graphql -e " + " python_modules/automation -e python_modules/dagster-ext", +>>>>>>> 49b0a60e1f (random bk) "dagster-graphql-client query check", ) .with_skip( diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 663eaf4bf3b3f..0f342ca19c642 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -14,6 +14,8 @@ cast, ) +from dagster_ext import IContext + import dagster._check as check from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.asset_check_spec import AssetCheckSpec @@ -50,6 +52,7 @@ from dagster._utils.forked_pdb import ForkedPdb from dagster._utils.warnings import ( deprecation_warning, + disable_dagster_warnings, ) from .system import StepExecutionContext @@ -1264,10 +1267,461 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None self._step_execution_context.set_requires_typed_event_stream(error_message=error_message) -class AssetExecutionContext(OpExecutionContext): - def __init__(self, step_execution_context: StepExecutionContext): - super().__init__(step_execution_context=step_execution_context) +############################ +##### AssetExecutionContext +############################ + +# To preserve backwards compatibility, AssetExecutionContext is being written as a subclass of +# OpExecutionContext until we can split it into its own class. All methods on OpExecutionContext +# that will not be included in the eventual AssetExecutionContext will be marked with deprecation +# warnings according to how the user should access that functionality in the future +# +# The following sets/maps are used to determine which methods need deprecation warnings, and how to +# direct users to the correct method to use + + +OP_EXECUTION_CONTEXT_ONLY_METHODS = set( + [ + "describe_op", + "file_manager", + "has_assets_def", + "get_mapping_key", + "get_step_execution_context", + "job_def", + "node_handle", + "op", + "op_config", + "op_def", + "op_handle", + "step_launcher", + "has_events", + "consume_events", + "log_event", + "get_asset_provenance", + ] +) + + +PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range or partition_key_range_for_asset_key instead" +INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly" +OUTPUT_METADATA_ALT = "return MaterializeResult from the asset instead" + +DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS = { + "add_output_metadata": OUTPUT_METADATA_ALT, + "asset_key_for_input": INPUT_OUTPUT_ALT, + "asset_key_for_output": INPUT_OUTPUT_ALT, + "has_partition_key": "use is_partition_step instead.", + "asset_partition_key_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_key_range": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_keys_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partition_keys_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_time_window_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_time_window_for_output": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_def_for_input": PARTITION_KEY_RANGE_AS_ALT, + "asset_partitions_def_for_output": PARTITION_KEY_RANGE_AS_ALT, + "get_output_metadata": "use op_execution_context.op_def.get_output(...).metadata", + "output_for_asset_key": INPUT_OUTPUT_ALT, + "selected_output_names": INPUT_OUTPUT_ALT, +} + +ALTERNATE_AVAILABLE_METHODS = { + "has_tag": "use dagster_run.has_tag instead", + "get_tag": "use dagster_run.tags.get instead", + "run_tags": "use dagster_run.tags instead", + "set_data_version": "use MaterializeResult instead", + "run": "use dagster_run instead", + "asset_check_spec": "use check_specs_by_asset_key instead", + "selected_asset_keys": "use asset_keys instead", +} + + +def _get_deprecation_kwargs(attr: str): + deprecation_kwargs = {"breaking_version": "1.7.0"} + deprecation_kwargs["subject"] = f"AssetExecutionContext.{attr}" + + if attr in OP_EXECUTION_CONTEXT_ONLY_METHODS: + deprecation_kwargs["additional_warn_text"] = ( + f"You have called the deprecated method {attr} on AssetExecutionContext. Use" + " the underlying OpExecutionContext instead by calling" + f" context.op_execution_context.{attr}." + ) + + if attr in DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS: + alt = DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS[attr] + deprecation_kwargs["additional_warn_text"] = ( + f"You have called method {attr} on AssetExecutionContext that is oriented" + f" around I/O managers. If you not using I/O managers we suggest you {alt}. If" + " you are using I/O managers the method still exists at" + f" context.op_execution_context.{attr}." + ) + + if attr in ALTERNATE_AVAILABLE_METHODS: + deprecation_kwargs["additional_warn_text"] = f"Instead {ALTERNATE_AVAILABLE_METHODS[attr]}." + + return deprecation_kwargs + + +class AssetExecutionContext(OpExecutionContext, IContext): + def __init__(self, op_execution_context: OpExecutionContext) -> None: + self._op_execution_context = check.inst_param( + op_execution_context, "op_execution_context", OpExecutionContext + ) + self._asset_keys = self._op_execution_context.selected_asset_keys + self._assets_def = self._op_execution_context.assets_def + + with disable_dagster_warnings(): + self._provenance_by_asset_key = { + key: self._op_execution_context.get_asset_provenance(key) + for key in self._asset_keys + } + self._code_version_by_asset_key = { + key: self._assets_def.code_versions_by_key[key] for key in self._asset_keys + } + self._check_specs_by_asset_key = { + check.asset_key: check for check in self._assets_def.check_specs_by_output_name.values() + } + + @public + @property + def op_execution_context(self) -> OpExecutionContext: + """An instance of OpExecutionContext created for the execution of this step.""" + return self._op_execution_context + + # IContext interface methods + @public + @property + def asset_key(self) -> AssetKey: + """The AssetKey for the current asset. Errors if the current step is not an asset, or is a + multi_asset. For multi_assets, use asset_keys. + """ + return self._op_execution_context.asset_key + + @property + def asset_keys(self) -> AbstractSet[AssetKey]: + """The set of AssetKeys that are being materialized by the current asset. Errors if the current + step is not an asset. + """ + return self._asset_keys + + @property + def provenance(self) -> Optional[DataProvenance]: + """The data provenance for the current asset. For multi_assets, use provenance_by_asset_key.""" + return self._provenance_by_asset_key[self.asset_key] + + @property + def provenance_by_asset_key(self) -> Mapping[AssetKey, Optional[DataProvenance]]: + """A dictionary of AssetKey: DataProvenance of the provenance of each asset being materialized + by the current asset. + """ + return self._provenance_by_asset_key + + @property + def code_version(self) -> Optional[str]: + """The code version for the current asset. For multi_assets, use code_version_by_asset_key.""" + return self.code_version_by_asset_key[self.asset_key] + + @property + def code_version_by_asset_key(self) -> Mapping[AssetKey, Optional[str]]: + """A dictionary of AssetKey: code version of the code version of each asset being materialized + by the current asset. + """ + return self._code_version_by_asset_key + + @public + @property + def is_partitioned(self) -> bool: + """True if the current execution is partitioned.""" + return self._op_execution_context.has_partition_key + + @public + @property + def run_id(self) -> str: + """The run id of the current execution.""" + return self._op_execution_context.run_id + + @property + def job_name(self) -> Optional[str]: + """The name of the currently executing job.""" + return self.op_execution_context.job_name + + @property + def retry_number(self) -> int: + """The number of attempted retries for this step.""" + return self.op_execution_context.retry_number + + # Additional methods + + @public + @property + def instance(self) -> DagsterInstance: + """DagsterInstance: The current Dagster instance.""" + return self._op_execution_context.instance + + @public + @property + def dagster_run(self) -> DagsterRun: + """PipelineRun: The current pipeline run.""" + return self._op_execution_context.dagster_run + + @public + @property + def pdb(self) -> ForkedPdb: + """An instance of pdb for debugging.""" + return self._op_execution_context.pdb + + @public + @property + def log(self) -> DagsterLogManager: + """DagsterLogManager: The log manager available in the execution context.""" + return self._op_execution_context.log + + @public + @property + def assets_def(self) -> AssetsDefinition: + """The AssetsDefinition for the current asset.""" + return self._assets_def + + @public + @property + def check_specs_by_asset_key(self) -> Mapping[AssetKey, AssetCheckSpec]: + """A dictionary of AssetKey: AssetCheckSpec of the AssetCheckSpec for each asset being materialized + by the current asset. + """ + return self._check_specs_by_asset_key + + @public + @property + def resources(self) -> Any: + """Resources: The currently available resources.""" + return self._op_execution_context.resources + + @public + @property + def run_config(self) -> Mapping[str, object]: + """dict: The run config for the current execution.""" + return self._op_execution_context.run_config + + # partition methods that may be marked deprecated once we have aligned on future partition methods + @property + def partition_key(self) -> str: + return self.op_execution_context.partition_key + + @public + @property + def partition_key_range(self) -> PartitionKeyRange: + return self._op_execution_context.partition_key_range + + @property + def partition_time_window(self) -> TimeWindow: + return self.op_execution_context.partition_time_window + + @public + def asset_partition_key_for_input(self, input_name: str) -> str: + return self.op_execution_context.asset_partition_key_for_input(input_name=input_name) + + @public + def asset_partition_key_for_output(self, output_name: str = "result") -> str: + return self.op_execution_context.asset_partition_key_for_output(output_name=output_name) + + @public + def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range_for_input(input_name=input_name) + + @public + def asset_partition_key_range_for_output( + self, output_name: str = "result" + ) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range_for_output( + output_name=output_name + ) + + @public + @property + def has_partition_key(self) -> bool: + return self.op_execution_context.has_partition_key + + @public + @property + def asset_partition_key_range(self) -> PartitionKeyRange: + return self.op_execution_context.asset_partition_key_range + + @public + def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: + return self.op_execution_context.asset_partition_keys_for_input(input_name=input_name) + + @public + def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: + return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name) + + @public + def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: + return self.op_execution_context.asset_partitions_time_window_for_input( + input_name=input_name + ) + + @public + def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: + return self.op_execution_context.asset_partitions_time_window_for_output( + output_name=output_name + ) + + @public + def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition: + return self.op_execution_context.asset_partitions_def_for_input(input_name=input_name) + + @public + def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: + return self.op_execution_context.asset_partitions_def_for_output(output_name=output_name) + + # deprecated methods. All remaining methods on OpExecutionContext should be here with the + # appropriate deprecation warning + + @deprecated(**_get_deprecation_kwargs("selected_asset_keys")) + @property + def selected_asset_keys(self) -> AbstractSet[AssetKey]: + return self.op_execution_context.selected_asset_keys + + @deprecated(**_get_deprecation_kwargs("op_def")) + @property + def op_def(self) -> OpDefinition: + return self.op_execution_context.op_def + + @deprecated(**_get_deprecation_kwargs("op_config")) + @property + def op_config(self) -> Any: + return self.op_execution_context.op_config + + @deprecated(**_get_deprecation_kwargs("file_manager")) + @property + def file_manager(self): + return self.op_execution_context.file_manager + + @deprecated(**_get_deprecation_kwargs("has_assets_def")) + @property + def has_assets_def(self) -> bool: + return self.op_execution_context.has_assets_def + @deprecated(**_get_deprecation_kwargs("get_mapping_key")) + def get_mapping_key(self) -> Optional[str]: + return self.op_execution_context.get_mapping_key() + + @deprecated(**_get_deprecation_kwargs("job_def")) + @property + def job_def(self) -> JobDefinition: + return self.op_execution_context.job_def + + @deprecated(**_get_deprecation_kwargs("node_handle")) + @property + def node_handle(self) -> NodeHandle: + return self.op_execution_context.node_handle + + @deprecated(**_get_deprecation_kwargs("op")) + @property + def op(self) -> Node: + return self.op_execution_context.op + + @deprecated(**_get_deprecation_kwargs("describe_op")) + def describe_op(self): + return self.op_execution_context.describe_op() + + @deprecated(**_get_deprecation_kwargs("op_handle")) + @property + def op_handle(self) -> NodeHandle: + return self.op_execution_context.op_handle + + @deprecated(**_get_deprecation_kwargs("step_launcher")) + @property + def step_launcher(self) -> Optional[StepLauncher]: + return self.op_execution_context.step_launcher + + @deprecated(**_get_deprecation_kwargs("consume_events")) + def consume_events(self) -> Iterator[DagsterEvent]: + return self.op_execution_context.consume_events() + + @deprecated(**_get_deprecation_kwargs("add_output_metadata")) + def add_output_metadata( + self, + metadata: Mapping[str, Any], + output_name: Optional[str] = None, + mapping_key: Optional[str] = None, + ) -> None: + return self.op_execution_context.add_output_metadata( + metadata=metadata, output_name=output_name, mapping_key=mapping_key + ) + + @deprecated(**_get_deprecation_kwargs("asset_key_for_input")) + def asset_key_for_input(self, input_name: str) -> AssetKey: + return self.op_execution_context.asset_key_for_input(input_name=input_name) + + @deprecated(**_get_deprecation_kwargs("asset_key_for_output")) + def asset_key_for_output(self, output_name: str = "result") -> AssetKey: + return self.op_execution_context.asset_key_for_output(output_name=output_name) + + @deprecated(**_get_deprecation_kwargs("get_output_metadata")) + def get_output_metadata( + self, output_name: str, mapping_key: Optional[str] = None + ) -> Optional[Mapping[str, Any]]: + return self.op_execution_context.get_output_metadata( + output_name=output_name, mapping_key=mapping_key + ) + + @deprecated(**_get_deprecation_kwargs("output_for_asset_key")) + def output_for_asset_key(self, asset_key: AssetKey) -> str: + return self.op_execution_context.output_for_asset_key(asset_key=asset_key) + + @deprecated(**_get_deprecation_kwargs("selected_output_names")) + @property + def selected_output_names(self) -> AbstractSet[str]: + return self.op_execution_context.selected_output_names + + @deprecated(**_get_deprecation_kwargs("has_tag")) + def has_tag(self, key: str) -> bool: + return self.op_execution_context.has_tag(key=key) + + @deprecated(**_get_deprecation_kwargs("get_tag")) + def get_tag(self, key: str) -> Optional[str]: + return self.op_execution_context.get_tag(key=key) + + @property + @deprecated(**_get_deprecation_kwargs("run_tags")) + def run_tags(self) -> Mapping[str, str]: + return self.op_execution_context.run_tags + + @deprecated(**_get_deprecation_kwargs("set_data_version")) + def set_data_version(self, asset_key: AssetKey, data_version: DataVersion) -> None: + return self.op_execution_context.set_data_version( + asset_key=asset_key, data_version=data_version + ) + + @deprecated(**_get_deprecation_kwargs("run")) + @property + def run(self) -> DagsterRun: + return self.op_execution_context.run + + @deprecated(**_get_deprecation_kwargs("get_step_execution_context")) + def get_step_execution_context(self) -> StepExecutionContext: + return self.op_execution_context.get_step_execution_context() + + @deprecated(**_get_deprecation_kwargs("has_events")) + def has_events(self) -> bool: + return self.op_execution_context.has_events() + + @deprecated(**_get_deprecation_kwargs("asset_check_spec")) + @property + def asset_check_spec(self) -> AssetCheckSpec: + return self._op_execution_context.asset_check_spec + + @deprecated(**_get_deprecation_kwargs("log_event")) + def log_event(self, event: UserEvent) -> None: + return self._op_execution_context.log_event(event) + + @deprecated(**_get_deprecation_kwargs("get_asset_provenance")) + @experimental + def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: + return self._op_execution_context.get_asset_provenance(asset_key) def build_execution_context( step_context: StepExecutionContext,