[UPath I/O managers] special case handling of None outputs #18820

Closed

@jamiedemaria jamiedemaria commented Dec 19, 2023

Summary & Motivation

Long term, we’d like I/O managers to work as follows:

  • Behavior that opts you into using the I/O manager:
    ** returning any non-None value from an asset or op
    ** declaring a parameter-based dependency (def my_asset(upstream))
  • Behavior that does not use the I/O manager:
    ** returning None from an asset or op
    ** in that case, you should set up dependencies using deps (Nothing dependencies for ops)
    ** if you declare a parameter-based dependency on an asset that returns None, the expectation is that you have met the contract of handle_output in some other way

The DB I/O managers already raise an error when None is returned, but the UPath I/O manager does store None values, which means that this setup

from dagster import asset

@asset
def upstream():
    return None

@asset
def downstream(upstream):
    ...

will break if we immediately stop storing returned None values. However, creating a lot of files that just store None causes other problems:

  1. A vast number of vestigial files is created in cases where the asset is never loaded as an I/O-managed input. This is especially apparent for large backfills.
  2. Moving an asset from non-partitioned to partitioned can cause errors, because the partitioned file path cannot be created while the non-partitioned path already exists as a file.

So as a workaround we will add special behavior to the UPath I/O manager so that None values will not be stored in the file system, but will still be loadable by the I/O manager in downstream assets. We will not add special behavior for ops because it conflicts with expectations for memoization. Once memoization is deprecated and removed we can add special casing for ops based on whether the expected output file exists. If it does not exist we can assume the output was None and provide None.

For assets:
In execute_step.py we will add a "marker" when a None value is handled. In the UPath I/O manager's handle_output we will not store the None in the file system. Then in load_input, if the corresponding output has the "marker", we will automatically return None rather than reading from the file system. The "marker" will be stored as tags, which are surfaced as part of the data versioning code in #19324, so this does not add any additional reads to the DB.
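
A minimal sketch of the flow described above, written in plain Python rather than against Dagster's real classes. `NONE_MARKER_TAG`, `SketchIOManager`, `execute_step_sketch`, and the `tags` dictionary are hypothetical stand-ins for the marker tag, the UPath I/O manager, the step-execution code, and the materialization tags; none of these names come from the PR.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Dict

# Hypothetical tag key; the PR does not name the actual marker.
NONE_MARKER_TAG = "sketch/output_is_none"


class SketchIOManager:
    """Toy stand-in for a file-backed I/O manager that skips None outputs."""

    def __init__(self, base_dir: Path) -> None:
        self.base_dir = base_dir

    def _path(self, asset_key: str) -> Path:
        return self.base_dir / asset_key

    def handle_output(self, asset_key: str, value: Any) -> None:
        if value is None:
            # Nothing is written to the file system for a None output.
            return
        self._path(asset_key).write_bytes(pickle.dumps(value))

    def load_input(self, asset_key: str, tags: Dict[str, str]) -> Any:
        if tags.get(NONE_MARKER_TAG) == "true":
            # The upstream output was None, so there is no file to read.
            return None
        return pickle.loads(self._path(asset_key).read_bytes())


def execute_step_sketch(
    manager: SketchIOManager, asset_key: str, value: Any, tags: Dict[str, str]
) -> None:
    """Stand-in for the step-execution code that records the marker."""
    if value is None:
        tags[NONE_MARKER_TAG] = "true"
    else:
        tags.pop(NONE_MARKER_TAG, None)
    manager.handle_output(asset_key, value)
```

In this sketch the marker is checked before any file access, so a stale file from an earlier non-None materialization is never read once the latest materialization carries the marker.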

Making these changes will allow us to continue supporting None values with the UPath I/O manager with no change in behavior, but also solve issues 1 and 2 described above.

To Do:

  • ensure logging messages about outputs are updated to accurately reflect the changes in this PR. For example, we should not log a path to a file when no file is created to store None

Notes:
reference PR #15611

How I Tested These Changes

@jamiedemaria jamiedemaria changed the base branch from jamie/remove-nothing-special-casing to jamie/upstream-metadata December 28, 2023 17:34
@@ -979,7 +979,7 @@ def build_memoized_plan(
 resources=resources,
 version=step_output_versions[step_output_handle],
 )
-if not io_manager.has_output(context):
+if not io_manager.has_output(context):  # TODO - this is the problem.

@jamiedemaria jamiedemaria Jan 31, 2024


We need to update has_output to check for the tag so that memoization works for assets, if memoization for assets is even something we support.


jamiedemaria commented Feb 6, 2024

Putting the decision history for this PR's design here for future reference if we revive this effort:

Problem statement:

The UPath I/O manager stores None outputs in files and can load those Nones as inputs in downstream assets. This causes two issues:

  1. It can create a large number of files that contain only None, which is confusing for users who aren’t using I/O managers.
  2. For users who aren’t using I/O managers, moving a non-partitioned asset to a partitioned asset will cause errors. A non-partitioned my_asset creates the file $DAGSTER_HOME/storage/my_asset, but the partitioned asset then tries to create the file $DAGSTER_HOME/storage/my_asset/2024-01-02, which fails because my_asset is a file, not a directory.
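
The path conflict in issue 2 can be reproduced with the standard library alone. This is a sketch under assumptions: `write_output` and `demonstrate_conflict` are hypothetical helpers, and a temporary directory stands in for $DAGSTER_HOME/storage. It does not use the UPath I/O manager itself.

```python
import tempfile
from pathlib import Path


def write_output(storage: Path, relative_path: str, payload: bytes) -> None:
    """Write payload to storage/relative_path, creating parent directories."""
    target = storage / relative_path
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(payload)


def demonstrate_conflict() -> str:
    """Return the name of the error raised by the partitioned write."""
    with tempfile.TemporaryDirectory() as tmp:
        storage = Path(tmp)
        # Non-partitioned run: my_asset is written as a plain file.
        write_output(storage, "my_asset", b"placeholder")
        try:
            # Partitioned run: my_asset would now have to be a directory.
            write_output(storage, "my_asset/2024-01-02", b"placeholder")
        except OSError as err:
            return type(err).__name__
    return "no error"
```

Calling `demonstrate_conflict()` reports an `OSError` subclass, because the existing file blocks creation of a directory with the same name.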

We want to modify the UPath I/O manager so that it does not create files for None outputs, but can still load Nones in downstream assets.

Solution 1: Use the existence of a file to determine if the asset was None. This has three problems:

  1. The file not existing could mean the asset was never materialized. We’d need to query the event log to ensure the asset had been materialized before returning None
  2. If an asset had been materialized with a non-None value, then later materialized with a None value, the original file would not be overwritten. At load time, the old non-None value would be loaded.
  3. For ops, issues 1 and 2 don’t exist since the files are in a new directory for each run. But memoization relies on the existence of a file to determine if an op should rerun, so all ops that return None would always be rerun.
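
The three problems can be seen in a small standalone sketch. The `handle_output` and `load_input` functions here are simplified, hypothetical versions that operate on a bare path, and the last step assumes a memoization check that is purely existence-based, as described in problem 3.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Tuple


def handle_output(path: Path, value: Any) -> None:
    if value is None:
        # Solution 1: write nothing when the output is None.
        return
    path.write_bytes(pickle.dumps(value))


def load_input(path: Path) -> Any:
    if not path.exists():
        # A missing file is interpreted as a None output.
        return None
    return pickle.loads(path.read_bytes())


def demonstrate_solution_1_problems() -> Tuple[Any, Any, bool]:
    with tempfile.TemporaryDirectory() as tmp:
        asset_path = Path(tmp) / "my_asset"

        # Problem 1: never materialized, yet the load yields None.
        never_materialized = load_input(asset_path)

        # Problem 2: a later None materialization leaves the old file in place.
        handle_output(asset_path, 42)
        handle_output(asset_path, None)
        stale_value = load_input(asset_path)

        # Problem 3: an existence-based memoization check sees no output
        # for an op that returned None, so the op would rerun every time.
        op_path = Path(tmp) / "op_output"
        handle_output(op_path, None)
        would_rerun = not op_path.exists()

    return never_materialized, stale_value, would_rerun
```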

Solution 1 seemed bad, so we came up with…

Solution 2: Mark that the output was None on the corresponding AssetMaterialization / OutputHandled event

At load time, we can piggyback on the versioning code path to fetch the AssetMaterialization for the input assets. We can then check if this mark exists, and if so provide a None

Implementation 1: add this mark to the metadata of the AssetMaterialization / OutputHandled event

Issues:

  1. No code path exists for pre-fetching OutputHandled events for ops, so this would involve adding new “critical read” DB access during load_input and would require new indexes on the DB to get the OutputHandled event for a specific run id.
    1. solution - continue to store None returned from ops until memoization is deprecated and we can use file existence to determine None values.
  2. In the versioning code path, the full AssetMaterialization is not fetched for partitioned assets; only the tags are fetched.

Implementation 2: add this mark on the tags of the AssetMaterialization

This avoids issue 2 from above, but still has issues:

  1. The versioning code doesn’t fetch tags when more than 100 partitions are mapped for a particular input
    1. potential solution: when more than 100 partitions are mapped to an input, still fetch the none-output-tag in load_input. We would need to do this in batches so we don’t overwhelm the DB. We can’t fetch the tag for each partition at the point where we load the file for that partition, because the load function is async and we’d need to async-ify the DB query, which we aren’t going to do.
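
A rough sketch of the batched fetch mentioned in the potential solution. Everything here is an assumption for illustration: `fetch_tags_for_batch` is a hypothetical callable standing in for whatever DB query would return tags per partition, `NONE_MARKER_TAG` is a made-up tag key, and the batch size of 100 is arbitrary.

```python
from typing import Callable, Dict, Iterator, Sequence

# Hypothetical tag key; the PR does not name the actual marker.
NONE_MARKER_TAG = "sketch/output_is_none"

TagsByPartition = Dict[str, Dict[str, str]]


def batched(keys: Sequence[str], batch_size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of keys with at most batch_size items each."""
    for start in range(0, len(keys), batch_size):
        yield keys[start : start + batch_size]


def fetch_none_markers(
    partition_keys: Sequence[str],
    fetch_tags_for_batch: Callable[[Sequence[str]], TagsByPartition],
    batch_size: int = 100,
) -> Dict[str, bool]:
    """Map each partition key to whether its output was marked as None."""
    result: Dict[str, bool] = {}
    for batch in batched(partition_keys, batch_size):
        # One query per batch rather than one per partition.
        tags_by_partition = fetch_tags_for_batch(batch)
        for key in batch:
            tags = tags_by_partition.get(key, {})
            result[key] = tags.get(NONE_MARKER_TAG) == "true"
    return result
```

The markers are collected synchronously up front, so the async per-partition file loads would only need to consult the resulting dictionary.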

More issues:

  1. The versioning code is not called for asset checks, but we still need the None tag for asset checks. A potential solution is to also fetch the tags when an asset check is running (but not run the rest of the versioning code)
  2. defs.load_asset_value doesn’t have access to the tags either. We would need to fetch the tags when it is called.

If this does get revived, I'd recommend considering going directly into a deprecation cycle for storing None outputs. These metadata/tag shenanigans felt like they would add a lot of code smell and lead to confusion down the road as to what this code was for, and uncertainty about whether it could be deleted. The approach also relied heavily on assumptions about how AssetMaterializations are emitted for range-partition backfills. We require one AssetMaterialization per partition in the range, and that is a constraint that may not hold in the future. We can defend against this with unit tests, but it is still a fragility in the system and a non-intuitive code dependency.
