Join on deltalake partitioned DataFrames poor performance #3600

Open
jpedrick-numeus opened this issue Dec 18, 2024 · 5 comments
Labels
data-catalogs Related to support for Data Catalogs p2 (backlog) Nice to have features performance (legacy) Please use the "perf" label instead

Comments

@jpedrick-numeus

Describe the bug

I'm joining an S3 inventory report of Hive-structured data (using Ray) that I've translated into a Delta Lake table partitioned by "bucket" and "part0", where "part0" is the first feature of the Hive structure. Regardless of the specifics of the format, I'm joining it with another Delta Lake table that has identical columns and partitioning. The join is significantly slower when I run it as a whole than when I run it in pieces using a filter. There are around 2500 partitions. I haven't timed the performance, but it feels like some kind of n*n operation, where each filtered join in the second approach only handles roughly n / 2500 of the data.

This seems off, since I would expect the Delta Lake partitioning to give the join operation's query planner/optimizer enough information to transform the first (without filtering) into the second.

extremely slow:

import daft

s3_df = daft.read_deltalake(...)
metadata_df = daft.read_deltalake(...)
# Full outer join over both tables as a whole, then write the result.
joined = s3_df.join(
    metadata_df,
    on=['bucket', 'part0', 'part1', 'part2'],
    how='outer',
).write_parquet(...)

acceptable performance:

import daft

s3_df = daft.read_deltalake(...)
metadata_df = daft.read_deltalake(...)
# `parts` is assumed to hold the (bucket, part0) pairs, one per partition.
for bucket, part0 in parts:
    partition_filter = (daft.col('bucket') == bucket) & (
        daft.col('part0') == part0
    )
    # Join only the rows that belong to this partition and write them out.
    joined = s3_df.filter(partition_filter).join(
        metadata_df.filter(partition_filter),
        on=['bucket', 'part0', 'part1', 'part2'],
        how='outer',
    ).write_parquet(...)

To Reproduce

Join two dataframes with identical partitioning, where the partition columns are join-by columns, without pre-filtering, and write the resulting dataframe. In the fast case, pre-filter both sides of the join by partition and write the data in pieces.

Expected behavior

These two joins should perform somewhat similarly.

Component(s)

SQL, Ray Runner, Parquet, Other

Additional context

I'm running on a single-node Ray cluster with fairly slow EBS/disk I/O. I suspect part of the issue is the scope and scale of the repartitioning done for the join, which results in a large amount of disk caching (also not ideal).

@jpedrick-numeus jpedrick-numeus added bug Something isn't working needs triage labels Dec 18, 2024
@raunakab
Contributor

Hey @jpedrick-numeus, thanks for opening the issue! I'm taking a look through it and trying to understand the problem.

@raunakab
Contributor

Roping @kevinzwang into the discussion since he's worked with Deltalake in the past.

@kevinzwang
Member

Hi @jpedrick-numeus, thanks for raising this. Although it is true that your Delta Lake partitioning scheme allows for a more efficient join, Daft currently does not have the logic to take advantage of that and instead still shuffles both tables in full during the join.
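
To make the cost of a whole-table shuffle concrete, here is a minimal pure-Python sketch. It is not Daft's shuffle implementation: `shuffle_bucket`, `hash_shuffle`, the CRC32 hash, and the synthetic rows are all assumptions made up for illustration. It only shows that hashing on the join key scatters every storage partition across the shuffle buckets, so each bucket has to collect rows from many source partitions.

```python
import zlib
from collections import defaultdict

JOIN_KEYS = ("bucket", "part0", "part1", "part2")


def shuffle_bucket(row, num_buckets, keys=JOIN_KEYS):
    """Deterministic stand-in for a hash partitioner (not Daft's actual one)."""
    raw = "|".join(str(row[k]) for k in keys).encode()
    return zlib.crc32(raw) % num_buckets


def hash_shuffle(rows, num_buckets):
    """Route every row to a shuffle bucket based on its join key."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[shuffle_bucket(row, num_buckets)].append(row)
    return buckets


def storage_partitions_per_bucket(buckets):
    """Count how many distinct (bucket, part0) partitions feed each shuffle bucket."""
    return {
        b: len({(r["bucket"], r["part0"]) for r in rows})
        for b, rows in buckets.items()
    }


# Synthetic table: 50 storage partitions with 20 rows each (made-up values).
ROWS = [
    {"bucket": "b0", "part0": f"p{p}", "part1": f"x{i}", "part2": "0"}
    for p in range(50)
    for i in range(20)
]

buckets = hash_shuffle(ROWS, num_buckets=8)
fan_in = storage_partitions_per_bucket(buckets)
print(fan_in)  # number of source partitions feeding each shuffle bucket
```

With 1000 rows spread over at most 8 buckets, at least one bucket must hold 125 or more rows, which is more than any single 20-row storage partition can supply. The on-disk partition layout is therefore lost during the shuffle, even though both tables were already aligned on "bucket" and "part0".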

This is definitely an optimization we are looking to do in the future, so I'm glad to see that there is interest in it! @jaychia I believe you have been thinking about this too, do you have anything to add?

@kevinzwang kevinzwang added p2 (backlog) Nice to have features and removed bug Something isn't working labels Dec 18, 2024
@jaychia
Contributor

jaychia commented Dec 19, 2024

Yes, great catch @jpedrick-numeus! I was actually just chatting with @samster25 yesterday about this.

This is what's called a storage-partitioned join. Here is the Iceberg proposal for it: https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit?tab=t.0#heading=h.82w8qxfl2uwl

For Daft, we should add a logical notion of partitioning that "groups" sets of physical MicroPartitions. This will let us perform a per-logical-partition join without shuffling between logical partitions!
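
The idea can be sketched in plain Python, under stated assumptions: rows are dicts, the partition columns ("bucket", "part0") are a prefix of the join columns, and `full_outer_join`, `group_by_partition`, and `storage_partitioned_join` are hypothetical helpers rather than Daft APIs. Because rows in different storage partitions can never share a join key, joining partition by partition gives the same result as one global join.

```python
from collections import defaultdict

JOIN_KEYS = ("bucket", "part0", "part1", "part2")
PARTITION_KEYS = ("bucket", "part0")  # must be a subset of JOIN_KEYS


def key_of(row, cols):
    return tuple(row[c] for c in cols)


def full_outer_join(left, right, keys=JOIN_KEYS):
    """Naive hash-based full outer join over lists of dicts."""
    right_index = defaultdict(list)
    for r in right:
        right_index[key_of(r, keys)].append(r)
    matched, out = set(), []
    for l in left:
        k = key_of(l, keys)
        if k in right_index:
            matched.add(k)
            out.extend({**r, **l} for r in right_index[k])
        else:
            out.append(dict(l))  # left row with no match
    for k, rows in right_index.items():
        if k not in matched:
            out.extend(dict(r) for r in rows)  # right rows with no match
    return out


def group_by_partition(rows, part_cols=PARTITION_KEYS):
    """Group rows into logical partitions keyed by the partition columns."""
    groups = defaultdict(list)
    for row in rows:
        groups[key_of(row, part_cols)].append(row)
    return groups


def storage_partitioned_join(left, right):
    """Join each logical partition independently; no rows cross partitions."""
    left_parts, right_parts = group_by_partition(left), group_by_partition(right)
    out = []
    for part in left_parts.keys() | right_parts.keys():
        out.extend(full_outer_join(left_parts.get(part, []), right_parts.get(part, [])))
    return out


def canonical(rows):
    """Order-independent representation for comparing join results."""
    return sorted(repr(sorted(r.items())) for r in rows)


# Made-up sample data for illustration only.
LEFT = [
    {"bucket": "b1", "part0": "a", "part1": "x", "part2": "1", "size": 10},
    {"bucket": "b1", "part0": "b", "part1": "x", "part2": "1", "size": 20},
    {"bucket": "b2", "part0": "a", "part1": "y", "part2": "2", "size": 30},
]
RIGHT = [
    {"bucket": "b1", "part0": "a", "part1": "x", "part2": "1", "owner": "alice"},
    {"bucket": "b2", "part0": "a", "part1": "y", "part2": "3", "owner": "bob"},
    {"bucket": "b3", "part0": "c", "part1": "z", "part2": "1", "owner": "carol"},
]

whole = full_outer_join(LEFT, RIGHT)
pieces = storage_partitioned_join(LEFT, RIGHT)
print(canonical(whole) == canonical(pieces))
```

Each per-partition join touches only the rows of one ("bucket", "part0") pair, which is what the filtered loop in the original report does by hand.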

@jpedrick-numeus
Author

@jaychia @kevinzwang thanks for the quick reply, glad to hear this is on the roadmap! I suspect this kind of fundamental operation will be valuable to many users of Daft, so I hope it can get prioritized.

@jaychia jaychia added the data-catalogs Related to support for Data Catalogs label Dec 20, 2024
@kevinzwang kevinzwang added performance (legacy) Please use the "perf" label instead and removed perf labels Jan 6, 2025
4 participants