Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

During write_parquet if Ray worker crashes invalid partial partition may be persisted #3576

Open
jpedrick-numeus opened this issue Dec 15, 2024 · 5 comments
Assignees
Labels
bug Something isn't working p1 Important to tackle soon, but preemptable by p0

Comments

@jpedrick-numeus
Copy link

Describe the bug

I'm running Daft + Ray on an instance with limited memory. When calling daft.DataFrame.write_parquet if workers crash for any reason, the partially written parquet files aren't cleaned up which causes problems in downstream processes. I, at minimum, need the entire write_parquet operation to return some kind of failed state. Ideally, however, would be that Daft would cleanup any potentially corrupted parquet files and retry the task.

To Reproduce

Run daft.DataFrame.write_parquet with too many threads for the memory available resulting in OOM crashes for Ray workers.

Expected behavior

Either being informed that the write operation is potentially corrupted from the return value(or thrown exception) or ensure that the write_parquet operation cleans up and potentially corrupted parts when a ray worker crashes.

Component(s)

Parquet

Additional context

No response

@jpedrick-numeus jpedrick-numeus added bug Something isn't working needs triage labels Dec 15, 2024
@andrewgazelka
Copy link
Contributor

Thanks for reporting this. Want to take a look @jaychia, @colin-ho ?

@andrewgazelka andrewgazelka added the p1 Important to tackle soon, but preemptable by p0 label Dec 15, 2024
@colin-ho
Copy link
Contributor

Hey @jpedrick-numeus, this should be possible to implement. We can implement a cleanup mechanism to detect duplicate/corrupted files written by daft upon a successful write operation.

Setting write_mode='overwrite' already does this, of course, it will also clean up all files that previously existed in the directory. We can implement a similar logic for write_mode='append' that only cleans up files written by daft.

@jpedrick-numeus
Copy link
Author

jpedrick-numeus commented Dec 16, 2024

Hi @colin-ho , thanks for the reply. I'm not quite sure what you're saying here. If I say "write_mode='overwrite'" Daft will cleanup partial writes from a crashed ray worker?

My intuition is that currently DataFrame.write_parquet() call would still have the corrupted parquet files, and I would need to call write_parquet(write_mode='overwrite') to clean them up?

@colin-ho
Copy link
Contributor

write_mode='overwrite' is designed to overwrite the directory, i.e. any existing files before the write will be removed.

The way write_mode='overwrite' works is:

  • Daft writes the new parquet files directly to the target directory
  • Once the files are successfully written, it removes all other files.
    This cleanup ensures only the successfully written files remain in the directory, which means any corrupted parquet files will also be removed.

You don't need to call df.write_parquet and then df.write_parquet(write_mode='overwrite'), just a single df.write_parquet(write_mode='overwrite') will work.

@jpedrick-numeus
Copy link
Author

Ok, interesting, I'll try that then.

@ccmao1130 ccmao1130 assigned andrewgazelka and unassigned colin-ho Dec 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working p1 Important to tackle soon, but preemptable by p0
Projects
None yet
Development

No branches or pull requests

4 participants