-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RFC] consolidate partition methods on the context #16255
[RFC] consolidate partition methods on the context #16255
Conversation
Current dependencies on/for this PR:
This comment was auto-generated by Graphite. |
33be378
to
dc1e90c
Compare
bcb05ee
to
8c0def0
Compare
a8aeca5
to
58d6691
Compare
8c0def0
to
a4bc5a0
Compare
58d6691
to
735a7aa
Compare
a4bc5a0
to
e4d2d1c
Compare
|
||
@public | ||
def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: | ||
"""The time window for the partitions of the output asset. | ||
def partitions_time_window_for_asset( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kinda odd to me that this one is plural partitions
. Any objection to renaming partition_time_window_for_asset
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to keep the old one around and deprecate it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the old one being asset_partitions_time_window_for_output
? I don't think we should rename that, and it's been moved to the bottom of the file and marked deprecated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah I misread what was going on 👍🏻
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah moving the methods around in the file made the diff really annoying. I'll stack a PR before this one that just moves the methods to clean things up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok stacked a temp PR before this one that should hopefully make the diff a bit better to read. It's still pretty beefy, but at least GitHub is showing the changes in a way that's truer to what the PR is doing
Deploy preview for dagster-docs ready! Preview available at https://dagster-docs-syr53iqsz-elementl.vercel.app Direct link to changed pages: |
""" | ||
asset_key = self.asset_key_for_output(output_name) | ||
return self.partitions_time_window_for_asset(asset_key) | ||
|
||
@deprecated( | ||
breaking_version="2.0", | ||
additional_warn_text="Use `partitions_time_window_for_asset` instead.", | ||
) | ||
@public | ||
def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: | ||
"""Deprecated. Use partitions_time_window_for_asset instead. | ||
|
||
The time window for the partitions of the input asset. | ||
|
||
Raises an error if either of the following are true: | ||
- The input asset has no partitioning. | ||
- The input asset is not partitioned with a TimeWindowPartitionsDefinition or a | ||
MultiPartitionsDefinition with one time-partitioned dimension. | ||
""" | ||
asset_key = self.asset_key_for_input(input_name) | ||
return self.partitions_time_window_for_asset(asset_key, is_dependency=True) | ||
|
||
@deprecated( | ||
breaking_version="2.0", | ||
additional_warn_text=( | ||
"Use `partition_key_range` or `partition_key_range_for_asset` instead." | ||
), | ||
) | ||
@public | ||
def asset_partition_key_range_for_output( | ||
self, output_name: str = "result" | ||
) -> PartitionKeyRange: | ||
"""Deprecated. Use partition_key_range property or partition_key_range_for_asset instead. | ||
|
||
Return the PartitionKeyRange for the corresponding output. Errors if not present. | ||
""" | ||
asset_key = self.asset_key_for_output(output_name) | ||
return self.partition_key_range_for_asset(asset_key) | ||
|
||
@deprecated( | ||
breaking_version="2.0", additional_warn_text="Use `partition_key_range_for_asset` instead." | ||
) | ||
@public | ||
def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: | ||
"""Deprecated. Use partition_key_range_for_asset instead. | ||
|
||
Return the PartitionKeyRange for the corresponding input. Errors if there is more or less than one. | ||
""" | ||
asset_key = self.asset_key_for_input(input_name) | ||
return self.partition_key_range_for_asset(asset_key, is_dependency=True) | ||
|
||
@deprecated( | ||
breaking_version="2.0", | ||
additional_warn_text="Use `partitions_def` or `partitions_def_for_asset` instead.", | ||
) | ||
@public | ||
def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition: | ||
"""Deprecated. Use partitions_def property or partitions_def_for_asset instead. | ||
|
||
The PartitionsDefinition on the upstream asset corresponding to this input. | ||
""" | ||
asset_key = self.asset_key_for_output(output_name) | ||
return self.partitions_def_for_asset(asset_key) | ||
|
||
@deprecated( | ||
breaking_version="2.0", additional_warn_text="Use `partitions_def_for_asset` instead." | ||
) | ||
@public | ||
def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition: | ||
"""Deprecated. Use partitions_def_for_asset instead. | ||
|
||
The PartitionsDefinition on the upstream asset corresponding to this input. | ||
""" | ||
asset_key = self.asset_key_for_input(input_name) | ||
return self.partitions_def_for_asset(asset_key) | ||
|
||
@deprecated( | ||
breaking_version="2.0", additional_warn_text="Use `partition_keys_for_asset` instead." | ||
) | ||
@public | ||
def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: | ||
"""Deprecated. Use partition_keys_for_asset instead. | ||
|
||
Returns a list of the partition keys for the given output. | ||
""" | ||
asset_key = self.asset_key_for_output(output_name) | ||
return self.partition_keys_for_asset(asset_key) | ||
|
||
@deprecated( | ||
breaking_version="2.0", additional_warn_text="Use `partition_keys_for_asset` instead." | ||
) | ||
@public | ||
def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]: | ||
"""Deprecated. Use partition_keys_for_asset instead. | ||
|
||
Returns a list of the partition keys of the upstream asset corresponding to the | ||
given input. | ||
""" | ||
asset_key = self.asset_key_for_input(input_name) | ||
return self.partition_keys_for_asset(asset_key, is_dependency=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unclear that we will want to deprecate these as long as AssetOut
/ AssetIn
world is still supported. Since you define your objects in terms of output names there and the keys are derived for you it seem less than ideal to force users to translate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with alex. We're aren't completely divorcing ourselves from I/O manager, but we are making them more more "opt-in" and making non-io-manager land feel more first-class. For io-manager-forward users, this is a big regression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. My main concern with keeping all of these different versions around is how confusing all of the names will become. Seeing three to four variations of a method (i.e. asset_partitions_keys_for_input()
, asset_partition_keys_for_output()
, partition_keys_for_asset()
, and partition_keys
property) would be so daunting and would require the user to learn about each of those methods before they can pick the correct one.
We could probably resolve some of this with good API docs:
- In the doc block for each method, indicate if it is intended for use with I/O managers. Maybe not in those words exactly, but give the user the ability to easily know if this method is relevant to them.
- Manually create sections in the API docs page that groups the "for_asset" methods together, the "for_output" methods together, and the "for_input" methods together. I think this kind of manual ordering of methods within a class is possible in sphinx, but I'm not 100% sure
- If we can do ^ type of manual ordering, we could present the methods in a way that goes from least complex to more complex. Right now it automatically sorts of the methods of the
context
alphabetically, so if you scroll through you are presented with the methods in a non-sensical order - alt docs organization would be to group by what the function does (partition_key, key range, time window, etc) and then have each option (for_asset, for_output, for_input) in the group
def asset_partition_key_for_output(self, output_name: str = "result") -> str: | ||
"""Returns the asset partition key for the given output. Defaults to "result", which is the | ||
name of the default output. | ||
def partition_key_for_asset( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should these for_asset methods have asset
be optional and work in the single asset case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah that seems reasonable
"""Returns the asset partition key for the given output. Defaults to "result", which is the | ||
name of the default output. | ||
def partition_key_for_asset( | ||
self, asset: CoercibleToAssetKey, is_dependency: bool = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the is_dependency
bool param here is to handle the self-dependent partition case. Basically when you have a self-dependent partition, if you said context.partition_key_for_asset("my_asset")
it's unclear if you want the partition key of my_asset
as a dependency, or as is being currently materialized. I'm handling this (for now at least) by allowing users to specify is_dependency
to indicate they want it loaded as a dependency.
In all other cases, the function should be able to figure out the correct partition key to load given the current asset
I'm not set on this solution, so if you have any concerns or other ideas let me know
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you break this up a bit? I think the is_dependency
bit will require some discussion, but that shouldn't block the docs improvements
|
||
@public | ||
def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: | ||
"""The time window for the partitions of the output asset. | ||
def partitions_time_window_for_asset( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to keep the old one around and deprecate it
def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow: | ||
"""The time window for the partitions of the output asset. | ||
def partitions_time_window_for_asset( | ||
self, asset: CoercibleToAssetKey, is_dependency: bool = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think these should take CoercibleToAssetKey
as an AssetsDefinition
with multiple keys would have to be a hard error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think cases like this present a pretty good argument that CoercibleToAssetKey
shouldn't be expanded to take AssetsDefinition
(as discussed here) and that a new type alias should be created to include AssetsDefinition
def asset_partitions_time_window_for_input(self, input_name: str = "result") -> TimeWindow: | ||
"""The time window for the partitions of the input asset. | ||
def partition_keys_for_asset( | ||
self, asset: CoercibleToAssetKey, is_dependency: bool = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is_dependency
seems like a rough name for this
Do we have a generalized way to describe this concept? Asset keys + partition mappings end up being a different sort of DAG, which you can visualize as a third dimension in the asset graph (imagine each partition being an entry in the z axis). How do we describe this elsewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 that the name sucks.
I'm not aware of any generalized way to describe this kind of thing. Maybe @clairelin135 has an idea from when the partition mapping stuff was first written?
da34a6a
to
70330f4
Compare
70330f4
to
181ff5d
Compare
6f998c0
to
39bb474
Compare
181ff5d
to
b2a5e9d
Compare
39bb474
to
cf95455
Compare
b2a5e9d
to
ff4246f
Compare
closing in favor of the PR for more complete simplification of |
Summary & Motivation
With the move away from I/O managers, inputs, and outputs, there is now a need for more generic partition methods on the context. In a I/O manager-less world, it doesn't make sense to call
asset_partition_key_for_input
as there is no input.When discussing the
AssetDep
API for specifyingPartitionMapping
s forAssetSpec
s, Alex, Sandy, Yuhan, and I also discussed the need for partition context methods that didn't use input/output terminology.In a parallel discussion Pete brought up similar concerns about the methods on the context. Based on this discussion, I think adding more general versions of the partition context methods would only add to confusion around which partition method to use when.
The PR explores what it would look like to completely remove the
input/output
partition methods and replace them with methods that takeCoercibleToAssetKey
instead. This is still in progress and lots of tests are failing, but i'm working on fixing themFeedback is appreciated!
Some things to note:
Union[CoercibleToAssetKey, AssetSpec]
and maybe includeSourceAsset
andAssetsDefinition
in there as well (although I'm not sure what the consequences of including the later two would be)dagster-ext
to try to make a single interface that both contexts implement. So the naming conventions are subject to change based on that interfaceHow I Tested These Changes
Not updating any unit tests in this PR to prove that the old versions of the methods work in the same way. I'm going to write a stacked PR that will change all of the callsites in our code base to the new methods to test those methods