
RFC: Much simpler AssetExecutionContext #16417

Closed · 19 commits

Conversation

@schrockn (Member) commented on Sep 9, 2023

Summary & Motivation

This PR is not meant for commit, but instead to drive conversation. If we align on this, I will split it up into many smaller PRs.

The internal discussion (6704) around consolidating partition APIs, as well as #16378, which documents the surface area of our partitioning APIs, has convinced me that more radical change is required to get our core context API back under control. The current API is a rat's nest beyond saving, especially in light of our de-emphasis of I/O managers.

This PR proposes that we repurpose the AssetExecutionContext type alias to instead be an entirely new wrapper class with a clean slate API.

On a temporary basis, we will make this context usable in all code paths that expect OpExecutionContext. We will do this by overriding __getattr__ and also adding a metaclass to OpExecutionContext so that instances of AssetExecutionContext pass isinstance(asset_execution_context, OpExecutionContext), which will make this object usable in old code paths.
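
A minimal sketch of that compatibility shim, assuming the wrapper stores the underlying context in a private _op_execution_context attribute (the attribute name and class bodies here are illustrative, not the PR's exact code):

from typing import Any


class OpExecutionContextMeta(type):
    def __instancecheck__(cls, instance: Any) -> bool:
        # Make isinstance(asset_execution_context, OpExecutionContext) return
        # True even without a subclass relationship, so old code paths that
        # check the type keep working.
        if type(instance) is AssetExecutionContext:
            return True
        return super().__instancecheck__(instance)


class OpExecutionContext(metaclass=OpExecutionContextMeta):
    ...  # the existing >50 methods/properties live here


class AssetExecutionContext:
    def __init__(self, op_execution_context: OpExecutionContext) -> None:
        self._op_execution_context = op_execution_context

    def __getattr__(self, attr: str) -> Any:
        # __getattr__ only fires when normal lookup fails, so the 14
        # first-class methods win; everything else falls through to the
        # wrapped OpExecutionContext (with deprecation warnings in the
        # real implementation).
        return getattr(self._op_execution_context, attr)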

The upside of this approach relative to subclassing is that it provides a much better in-editor experience. The surface area of the API is far smaller (14 methods instead of the >50 methods/properties on OpExecutionContext). The experience with VS Code's typeahead speaks for itself:

Before: [screenshot of VS Code typeahead over the full OpExecutionContext surface]

After: [screenshot of VS Code typeahead over the much smaller AssetExecutionContext surface]

An unknown for me here is whether we can make this work with pyright. I suspect yes, but accomplishing that goal exceeds my current level of knowledge/ability re: Python witchcraft. I suspect @smackesey can help with this. If we cannot, we can fall back to making AssetExecutionContext a subclass.

The actual API changes

The easiest way to understand the delta here is to look at this chunk of code, added in the compatibility layer, which details which methods and properties will be moved and communicates clear deprecation warnings to users. This is an explicit list of all the methods and properties that we will eventually eliminate from the context object, along with an indication of the alternative mechanism.

# have to do asset_execution_context.op_execution_context in order to get these
# some of these might have to be hoisted.
OP_EXECUTION_CONTEXT_ONLY_METHODS = {
    "describe_op",
    "file_manager",
    "has_assets_def",
    "get_mapping_key",
    "job_def",
    "job_name",
    "node_handle",
    "op",
    "op_config",
    "op_handle",
    "retry_number",
    "step_launcher",
}


PARTITION_KEY_RANGE_AS_ALT = "use partition_key_range instead"
INPUT_OUTPUT_ALT = "not use input or output names and instead use asset keys directly"
OUTPUT_METADATA_ALT = "return MaterializationResult from the asset instead"

# have to do asset_execution_context.op_execution_context in order to get these
DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS = {
    "add_output_metadata": OUTPUT_METADATA_ALT,
    "asset_key_for_input": INPUT_OUTPUT_ALT,
    "asset_key_for_output": INPUT_OUTPUT_ALT,
    "asset_partition_key_for_input": PARTITION_KEY_RANGE_AS_ALT,
    "asset_partition_key_for_output": PARTITION_KEY_RANGE_AS_ALT,
    "asset_partition_key_range_for_input": PARTITION_KEY_RANGE_AS_ALT,
    "asset_partition_key_range_for_output": PARTITION_KEY_RANGE_AS_ALT,
    "asset_partition_keys_for_input": PARTITION_KEY_RANGE_AS_ALT,
    "asset_partition_keys_for_output": PARTITION_KEY_RANGE_AS_ALT,
    "asset_partitions_time_window_for_input": PARTITION_KEY_RANGE_AS_ALT,
    "asset_partitions_time_window_for_output": PARTITION_KEY_RANGE_AS_ALT,
    "asset_partitions_def_for_input": PARTITION_KEY_RANGE_AS_ALT,
    "asset_partitions_def_for_output": PARTITION_KEY_RANGE_AS_ALT,
    "get_output_metadata": "use op_execution_context.op_def.get_output(...).metadata",
    "merge_output_metadata": OUTPUT_METADATA_ALT,
    "output_for_asset_key": INPUT_OUTPUT_ALT,
    "selected_output_names": INPUT_OUTPUT_ALT,
}

ALTERNATE_AVAILABLE_METHODS = {
    "has_tag": "use dagster_run.has_tag instead",
    "get_tag": "use dagster_run.get_tag instead",
    "run_tags": "use dagster_run.tags instead",
    "set_data_version": "use MaterializationResult instead",
}

Simple use cases don't need "input" and "output" anywhere. When paired with deps this ends up being much nicer. See this test case:

def test_basic_daily_partitioning_two_assets() -> None:
    called = {"upstream": False, "downstream": False}
    partitions_def = DailyPartitionsDefinition(start_date="2020-01-01", end_date="2020-01-03")

    @asset(partitions_def=partitions_def)
    def upstream(context: AssetExecutionContext):
        assert context.partition_key_range.start == "2020-01-01"
        assert context.partition_key_range.end == "2020-01-02"
        called["upstream"] = True

    @asset(deps=[upstream], partitions_def=partitions_def)
    def downstream(context: AssetExecutionContext):
        assert context.partition_key_range.start == "2020-01-01"
        assert context.partition_key_range.end == "2020-01-02"
        called["downstream"] = True

    assert materialize_single_run_with_partition_key_range(
        [upstream, downstream], start="2020-01-01", end="2020-01-02"
    ).success

    assert called["upstream"]
    assert called["downstream"]

This also solves some other problems.

  • All context methods that refer to "inputs" and "outputs" are deprecated. For partitioned runs, partition_key_range (for the current range) and partition_key_range_for_asset (for information about upstreams) replace all the "for_input" and "for_output" variants. That mapping layer introduces a ton of cognitive load for all users, and is completely bewildering if you are not using AssetIn or AssetOut at all.
  • add_output_metadata and get_output_metadata refer to completely different notions of metadata (materialization metadata and output definition metadata, respectively). There are now clearer APIs for this.
  • has_tag and get_tag got the tags of the run, but many different entities have tags. Instead, the user gets the run first and then gets its tags, which is much clearer. (See the sketch below.)
  • A bunch of methods are very specific to ops, so we pushed them to live only on the underlying OpExecutionContext.
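
To make the migration concrete, here is a hedged before/after sketch built from the mapping tables above (MaterializationResult's constructor and the dagster_run property are as proposed in this PR, not necessarily final API):

@asset(partitions_def=partitions_def)
def my_asset(context: AssetExecutionContext):
    # Before: context.get_tag("team") -- ambiguous about whose tags these are
    # After: get the run first, then read its tags
    team = context.dagster_run.tags.get("team")

    # Before: context.asset_partition_key_range_for_output("result")
    # After: one range for the current materialization
    key_range = context.partition_key_range

    # Before: context.add_output_metadata({"row_count": 100})
    # After: return materialization metadata from the asset body
    return MaterializationResult(
        metadata={"row_count": 100, "team": team, "range": str(key_range)}
    )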

Follow-up Work

  • Add unit tests to ensure that the entire public API surface of OpExecutionContext is either on AssetExecutionContext or in the sets and dictionaries that drive deprecation. That way, any new method added to OpExecutionContext must be accounted for here. (A sketch follows this list.)
  • Make it so a user can iterate over partition keys from the context directly (e.g. context.partition_keys) without materializing the entire list in memory at any one time.
  • Add support for time windows, but not directly on the context. Instead, this support should be co-located with the partition definition types it is tightly coupled to.
  • Possibly: add a global accessor for AssetExecutionContext so that the user can just call AssetExecutionContext.get() anywhere and get it, without having to thread it through. Prototyped by @alangenfeld here: add indirect execution context access #14954
  • We currently make it extremely difficult to launch runs with partition key ranges. We should support this in a much more first-class way. See the chicanery in materialize_single_run_with_partition_key_range.
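
The first follow-up item might look roughly like this (a sketch only; it assumes the deprecation mappings shown earlier are importable alongside the two context classes):

import inspect


def test_op_execution_context_surface_is_accounted_for() -> None:
    handled = (
        OP_EXECUTION_CONTEXT_ONLY_METHODS
        | set(DEPRECATED_IO_MANAGER_CENTRIC_CONTEXT_METHODS)
        | set(ALTERNATE_AVAILABLE_METHODS)
    )
    for name, _member in inspect.getmembers(OpExecutionContext):
        if name.startswith("_"):
            continue  # only the public surface needs to be covered
        assert hasattr(AssetExecutionContext, name) or name in handled, (
            f"OpExecutionContext.{name} must be exposed on AssetExecutionContext"
            " or added to one of the deprecation mappings"
        )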

How I Tested These Changes

Included tests.


@schrockn force-pushed the asset-execution-context-take-two branch from 863fa1b to 90b2f65 on September 10, 2023
@schrockn changed the title from "Asset Execution Context exploraation" to "RFC: Much simpler AssetExecutionContext" on September 10, 2023
@schrockn marked this pull request as ready for review on September 10, 2023
# seems like there should be both "asset_keys" and "selected_asset_keys"
@property
def selected_asset_keys(self) -> AbstractSet[AssetKey]:
    return self._op_execution_context.selected_asset_keys
A Contributor commented:

we also likely will be adding selected_asset_checks here. This refactor might be an opportunity to condense them to some selection object

@petehunt (Contributor) commented on Sep 11, 2023

I like this a lot and think we absolutely have to solve this problem. When I put this on the roadmap originally I was hoping to get this level of simplification, as I think this is one of the big usability issues in the product today.

The specific things I like:

  1. It makes invalid states unrepresentable. Every asset can be run as a single-run backfill and this forces your code to be compatible.
  2. There is now 1 way to do things, as opposed to multiple APIs to access partition keys
  3. I don't have to know op-level concepts like inputs and outputs

However, I do think that we lose the ability to access the datetime of the partition start and end window. We also implicitly couple the body of the function to the partitions_def, as it will need to know the granularity of the partitioning in order to work with the partition range.

Here are a few options to solve these issues.

partitions_def = DailyPartitionsDefinition(...)

@asset(partitions_def=partitions_def)
def my_asset(context: AssetExecutionContext):
    # option 1: give the user parsed partition keys instead of strings
    for partition_key in context.partition_keys():  # iterator version of partition_key_range()
        # partition_key is a datetime
        ...

    # option 2: give the user both parsed and unparsed partition keys
    for partition_key in context.partition_keys():  # iterator version of partition_key_range()
        # partition_key.raw is a string
        # partition_key.value is a datetime
        ...

    # option 3: give them the option to parse a string partition key as a datetime
    for partition_key in context.partition_keys():
        # partition_key.str contains the string representation
        # partition_key.datetime tries to parse the string as a datetime
        # partition_key.int tries to parse the string as an int
        # etc etc
        ...

    # option 4: don't mess with PartitionKeyRange and bundle it into the partitions def
    # least discoverable option, but has the best static typing
    for partition_key_datetime in partitions_def.partition_keys(context.partition_key_range()):
        ...

I favor option 1 or option 2.

return self._op_execution_context.asset_partition_key_range

@public
def partition_key_range_for_asset_key(self, asset_key: AssetKey) -> PartitionKeyRange:
A Contributor commented:

should this be partition_key_range_for_dep()?

Another Contributor commented on the same lines:

Just leaving a note that making the partition methods agnostic to "input" or "output" led to some pretty complex errors. Some of the methods (like the partition_key_range one) have different code paths for "inputs" and "outputs" because of partition mapping. Not worth trying to do all of that implementation detail in this RFC, but for a final implementation, we should make sure this code path is really thoroughly tested.

@smackesey (Collaborator) commented:

@jamiedemaria @schrockn I haven't reviewed this in detail but the Pyright errors are a result of passing AssetExecutionContext where an OpExecutionContext is expected. Because AssetExecutionContext is no longer a subclass of OpExecutionContext, we get type errors. Pyright (and probably every other static analyzer) doesn't understand metaclass __instancecheck__ magic.

While it is possible there is some magical way to make pyright understand, I doubt it, and a search for __instancecheck__ on the issue tracker turns up nothing. So I think the solution is to make an alias, something like OpOrAssetExecutionContext = Union[OpExecutionContext, AssetExecutionContext], and update all code paths expecting OpExecutionContext to expect OpOrAssetExecutionContext instead.
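
For reference, the suggested alias would look something like this (_emit_asset_events is a hypothetical call site, not real Dagster internals):

from typing import Union

# Proposed compatibility alias (the name is @smackesey's suggestion, not
# shipped API); internal code paths would be updated to accept either type.
OpOrAssetExecutionContext = Union[OpExecutionContext, AssetExecutionContext]


def _emit_asset_events(context: OpOrAssetExecutionContext) -> None:
    ...  # hypothetical internal code path updated to the union type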

@schrockn (Member Author) replied:

> (quoting @smackesey's comment above)

Yeah, this doesn't fix users' code though, unfortunately.

}


class AssetExecutionContext:
A Contributor commented:

Thread for figuring out why AssetExecutionContext was not made a subclass of OpExecutionContext to begin with.

Background:

  • AssetExecutionContext began as a type alias to align on naming and get a quick docs/examples improvement (here)
  • AssetExecutionContext was made a subclass of OpExecutionContext here
  • AssetExecutionContext was reverted back to a type alias here

In the revert PR, the reasoning for reverting was:

Conditions like:

* manually constructing AssetsDefinition with a manually written @op
* @ops that make up a graph backed AssetsDefinition

make having different context objects trickier for users than originally anticipated.

There is also a Slack thread mentioning this where alex says:

the wall I hit with trying to split AssetExecutionContext and OpExecutionContext was resolving what the ops in a graph-backed-asset should receive.

Based on this, my interpretation is that the issue wasn't a technical one (a Python limitation, an inability to pass the correct context through, etc.), but more a design issue: "What is the correct context for an @op in a @graph_backed_asset to receive?"

@schrockn (Member Author) replied:

Ok got it. Thanks for digging that up.

@schrockn (Member Author) added:

I think we can figure out a reasonable solution for the graph-backed asset case. We could alter what instance the user gets based on their typehint. We could also make an asset_execution_context property on OpExecutionContext so you can do the reverse in that case.
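
A rough sketch of that second idea (the asset_execution_context property is only floated in this thread, not final API):

class OpExecutionContext(metaclass=OpExecutionContextMeta):  # as sketched earlier
    @property
    def asset_execution_context(self) -> "AssetExecutionContext":
        # Lets an @op inside a graph-backed asset opt back into the
        # asset-oriented API on demand.
        return AssetExecutionContext(self)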

@schrockn (Member Author) commented:

Closing in favor of #16487

@schrockn closed this on Sep 15, 2023