feat: add operators to support duplicate eliminated joins #695
ACTION NEEDED: Substrait follows the Conventional Commits specification. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.
First, thanks for creating this PR. I appreciate the time and effort to describe these concepts. I'm struggling to understand the linked paper; I've spent an hour on it and have to move on, so I'll do my best at paraphrasing my understanding and hope you can help me. I suspect there is a good chance I am completely misunderstanding.
Does this sound correct (I'm not asking you to describe it this way; I'm just checking my understanding by rephrasing the definition in a much more mechanical way):
The duplicate eliminated join operator wraps a normal hash equijoin operator and has two outputs. The first output is the regular join output. The second output is the keys of the hashtable created on the build side.
In addition, this operator must consume the build side (and create the hashtable) first (before consuming the probe side) because the hashtable output will be used to generate the probe side input.
The duplicate eliminated get operator takes a duplicate eliminated join operator as input. The output of this operator is the hashtable from the duplicate eliminated join operator.
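To make that mechanical reading concrete, here is a minimal single-threaded C++ sketch of the dataflow. Everything here is a hypothetical illustration, not DuckDB's or Substrait's actual API, and it glosses over how the deduplicated keys actually reach the probe-side subtree in a real engine:

```cpp
// Sketch: a hash equijoin whose build phase also collects the distinct
// build-side key values, exposing them as a second output that a
// "duplicate eliminated get" on the probe side can scan.
#include <iostream>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using Row = std::pair<int, std::string>;  // (key, payload)

int main() {
    std::vector<Row> build = {{1, "a"}, {1, "b"}, {2, "c"}};

    // Build phase: populate the hashtable and, at the same time, the
    // deduplicated key set (the second output).
    std::unordered_multimap<int, std::string> hashtable;
    std::set<int> dedup_keys;  // second output: distinct build-side keys
    for (const auto &r : build) {
        hashtable.emplace(r.first, r.second);
        dedup_keys.insert(r.first);
    }

    // "Duplicate eliminated get": the probe-side subtree scans dedup_keys
    // instead of re-evaluating the correlated side. Here we just derive a
    // trivial probe input from it.
    std::vector<Row> probe;
    for (int k : dedup_keys) probe.push_back({k, "probe_" + std::to_string(k)});

    // Probe phase: the regular (first) join output.
    for (const auto &p : probe) {
        auto range = hashtable.equal_range(p.first);
        for (auto it = range.first; it != range.second; ++it)
            std::cout << p.first << ": " << it->second << " x " << p.second << "\n";
    }
}
```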
@@ -47,7 +47,45 @@ The nested loop join operator does a join by holding the entire right input and

| Join Expression | A boolean condition that describes whether each record from the left set "matches" the record from the right set. | Optional. Defaults to true (a Cartesian join). |
| Join Type | One of the join types defined in the Join operator. | Required |

## Duplicate Eliminated Join Operator
The Duplicate Eliminated Join and the [Duplicate Eliminated Get Operator](physical_relations.md#duplicate-eliminated-get-operator) are the two necessary operators that enable general subquery unnesting. (See the [Unnesting Arbitrary Queries](https://cs.emis.de/LNI/Proceedings/Proceedings241/383.pdf) paper for more information.)
> are the two necessary operators that enable general subquery unnesting
I'm not entirely sure I understand (but this is not surprising, as I am not an expert in relational algebra and had difficulty understanding the paper). From my read of the paper, it seems that general unnesting can be used to convert a query with dependent joins into a query without them. Duplicate eliminated joins seem to be an optimization that is useful to simplify plans created by general unnesting, but not strictly needed to enable it.
Also, duplicate eliminated joins seem to be a general optimization and not specific to query unnesting. Though perhaps they are mostly useful in that context.
The paper is indeed very difficult to understand. There is also a video from @Mytherin explaining the topic.
The duplicate eliminated join is not only an optimization, but rather a necessary technique to get rid of Dependent Joins. In some cases you don't need a duplicate eliminated join to de-correlate, but in others it is necessary.
I'm not sure what you mean by:
"Also, duplicate eliminated joins seems to be a general optimization and not specific to query unnesting. Though perhaps it is mostly useful in that context."
I'm not aware of scenarios other than correlated subqueries.
## Duplicate Eliminated Join Operator
The Duplicate Eliminated Join and the [Duplicate Eliminated Get Operator](physical_relations.md#duplicate-eliminated-get-operator) are the two necessary operators that enable general subquery unnesting. (See the [Unnesting Arbitrary Queries](https://cs.emis.de/LNI/Proceedings/Proceedings241/383.pdf) paper for more information.)

The Duplicate Eliminated Join is essentially a [Regular Join Operator](logical_relations.md#join-operator). It can have any regular join type, and its execution is the same. The main difference is that one of its children has, somewhere in its subtree, a dependency on the deduplicated result of the other. Therefore, this operator pushes the deduplicated result to its dependent child via the Duplicate Eliminated Get Operator. The side that will be deduplicated is specified in the Duplicate Eliminated Side property; the other side is the one that depends on the deduplication.
If I'm following things correctly, the goal here is to take the hashtable from the build side and push that hashtable into the branch that calculates the probe input?
Also, we keep talking about the "deduplicated result", but we are only talking about the key columns and not the entire column set, correct?
So, it's not the hashtable but actually the deduplicated side. You can see that the Duplicate Eliminated Get has a `repeated Expression.FieldReference column_ids = 3;` representing the deduplicated columns.
| Duplicate Eliminated Side | The side that is deduplicated and pushed into the other side. | Required |

## Duplicate Eliminated Get Operator
An operator that takes as its input the result of the deduplicated side of the Duplicate Eliminated Join. It simply scans the input and outputs the deduplicated columns.
What is the input to this relation? The deduplicated join relation?
Yes, it is the deduplicated side of the Duplicate Eliminated Join.
Yes, that sounds about right; you could think that the hashtable result is being pushed into the …
Yes! The probe side contains the …
It's not only the hashtable, but rather the deduplicated side of the Duplicate Eliminated Join.
@westonpace, Many thanks for taking the time to review the PR. Unnesting arbitrary subqueries is a difficult topic in itself, and unfortunately, the available material on the subject is somewhat lacking. However, the original paper and Mark's presentations are by far the two best resources for understanding it. Please let me know if I answered your questions and, if so, whether you would like any rephrasing of the text.
@pdet I watched the video; it is very helpful. I think I'm close to understanding what is going on now.
I understand that key columns are deduplicated during a hash join. I'm not sure I understand how non-key columns are necessarily deduplicated. Is the duplicate eliminating join doing more work to satisfy these non-key columns? For example, when collecting some payload column P, does the duplicate eliminating join store P in the hashtable as normal payload and then ALSO store P in a hashset that it can later return to the duplicate eliminated get operator?
Only the join keys are deduplicated. There is not a scenario where the non-key columns are deduplicated. For reference, in `plan_delim_join.cpp` you can see that …
Got it, I understand the operator now.
In my opinion, I like how you have it (as a separate relation) because I don't think this is a "standard logical operator" but more a "physical operator". I think there will be high-level query engines / planners / producers that never interact with or support this join type. This is the first example, so far, of a relation having multiple outputs, so I'd be interested to hear others' opinions on the topic. I can think of several other ways to tackle the issue (e.g. don't require duplicate eliminated get to use a reference rel, create some "output numbering" scheme for multiple outputs so the duplicate eliminated get rel isn't needed at all, etc.). That being said, for a new physical operator, I don't want to hold up the PR while we have esoteric discussions about what multiple outputs should look like. My primary concern at the moment is that the wording is a little confusing. I think we can focus more on the "what" and less on the "why". I'll make a new review with this emphasis.
I've made some wording suggestions. I do believe there are lots of ways this particular challenge could be completed. For example, we could split a hash join rel into two relations (build rel and probe rel) and avoid the cycle. Or, we could keep the current approach but remove all "duplicate eliminated" phrasing and just say that a hash equijoin is allowed to emit some columns of its hashtable as a second output that is emitted whenever the build is done.
However, I think we should have a bias towards action and towards people that are doing the work. This is especially so for these more physical / specialized relations (which may not be adopted by multiple engines for some time) and so I'm willing to approve once the wording is clear.
@westonpace I've removed the column references from the … Regarding the … Thanks again for all the time you invested reviewing the PR and for your suggestions; I highly appreciate them!
I haven't yet reviewed this and would like some time to do so.
Can you provide an example plan with these operators as you see them used? I'm struggling with seeing these two relations as set relations, but I want to make sure I see them used in context first. Thanks!
@jacques-n Here's a plan along with the steps I used to get DuckDB to spit it out:
Will take a look, @EpsilonPrime. Do we have a Substrait example plan?
They are not. They are physical optimizations, in the same way a hash equi-join doesn't really look like a set relation. The logical set operation is "remove all duplicates": X columns go in, X columns go out, and every row of duplicates is removed. You can represent this logically in at least two ways (a set relation with `SET_OP_UNION_DISTINCT` whose two inputs are the same, or a self join). The optimization is just reusing the hashtable from an existing hash-join that already has the columns you want to …
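As a tiny illustration of that logical equivalence (assuming single-column integer rows, purely for the sake of the example), UNION DISTINCT of a relation with itself is exactly "remove all duplicates":

```cpp
// Sketch: UNION_DISTINCT(r, r) == distinct(r).
#include <algorithm>
#include <iostream>
#include <vector>

int main() {
    std::vector<int> r = {1, 1, 2, 3, 3};

    // Concatenate the two (identical) inputs of the set relation...
    std::vector<int> u(r);
    u.insert(u.end(), r.begin(), r.end());

    // ...then remove all duplicate rows, as UNION DISTINCT requires.
    std::sort(u.begin(), u.end());
    u.erase(std::unique(u.begin(), u.end()), u.end());

    for (int x : u) std::cout << x << ' ';  // prints: 1 2 3
}
```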
@pdet how should this be implemented if the deduplicated side is the probe side? How does the hash join deduplicate the probe side?
The Substrait plan that DuckDB would generate for the TPC-H Q02 that @EpsilonPrime shared would be the following:
In practice a hash aggregate is created regardless of the deduplicated side being the probe or the build side. As shown below, when the deduplicated side is the build side, the input chunk goes into two sinks: the join and the distinct.

```cpp
SinkResultType PhysicalRightDelimJoin::Sink(ExecutionContext &context, DataChunk &chunk,
                                            OperatorSinkInput &input) const {
	auto &lstate = input.local_state.Cast<RightDelimJoinLocalState>();

	OperatorSinkInput join_sink_input {*join->sink_state, *lstate.join_state, input.interrupt_state};
	join->Sink(context, chunk, join_sink_input);

	OperatorSinkInput distinct_sink_input {*distinct->sink_state, *lstate.distinct_state, input.interrupt_state};
	distinct->Sink(context, chunk, distinct_sink_input);

	return SinkResultType::NEED_MORE_INPUT;
}
```

Similarly, when the deduplicated side is the probe side, it goes into a distinct, but also into a column data collection.

```cpp
SinkResultType PhysicalLeftDelimJoin::Sink(ExecutionContext &context, DataChunk &chunk,
                                           OperatorSinkInput &input) const {
	auto &lstate = input.local_state.Cast<LeftDelimJoinLocalState>();
	lstate.lhs_data.Append(lstate.append_state, chunk);

	OperatorSinkInput distinct_sink_input {*distinct->sink_state, *lstate.distinct_state, input.interrupt_state};
	distinct->Sink(context, chunk, distinct_sink_input);

	return SinkResultType::NEED_MORE_INPUT;
}
```
I've spent some time reviewing this and I'd like to state my understanding: there are two new operators. A more generic name for each operator might be `HashEquiJoin_SaveBuildHashTableKeys` … The HashEquiJoin will save the build hash table keys, and the AggregateDistinct will read the saved hash table keys from elsewhere as opposed to actually reading its input.

I have often referred to this as a sideband operation: an operator communicates a secondary dataset to another operator through a path other than the DAG. I've seen this pattern used for examples like this, agg+sort, and dynamic partition pruning / bloom-filter pushdown.

I think my main question is: what are our goals in doing this in Substrait core? I'll make the assumption that we're trying to make this as compatible as possible (which is why we're doing it in core as opposed to via a 3rd-party extension). Given that, I think it is best to look at the two variations on normal operations as optimizations.

If we think about it that way, I think we can actually define these as simple optimization extensions. One optimization extension would be used for the existing `HashEquiJoinRel` and would be "save the build side keys to sideband:". The second one would be an optimization extension for `AggregateRel` that would be "use the output from sideband: instead of reading from your input".

If we followed this pattern, it means that a plan from DuckDB could be used in other systems (they could simply ignore the optimization extensions) and a plan from other systems could be run in DuckDB (whether it used the optimization extension or not). Additionally, this pattern should round-trip in DuckDB with full fidelity. (I believe this would also be a relatively simple thing to implement in the DuckDB Substrait extension, e.g. a simple transformation from the current proposed format in both directions.)

Thoughts?
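A minimal sketch of what that sideband pattern could look like mechanically. The registry, function names, and string ids below are all invented for illustration; this is not a proposed API:

```cpp
// Sketch: the join publishes its build-side keys under a sideband id, and
// the distinct aggregate reads them instead of its regular input.
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

// A per-query registry keyed by sideband id (hypothetical).
std::map<std::string, std::set<int>> sideband;

// HashEquiJoin with the "save the build side keys to sideband:<id>" hint.
void BuildJoinHashTable(const std::vector<int> &build_keys, const std::string &id) {
    auto &saved = sideband[id];
    for (int k : build_keys) saved.insert(k);  // keys are deduplicated as a side effect
}

// AggregateDistinct with the "use the output from sideband:<id>" hint: it
// skips its input entirely and emits the saved keys.
std::vector<int> DistinctFromSideband(const std::string &id) {
    const auto &saved = sideband[id];
    return {saved.begin(), saved.end()};
}

int main() {
    BuildJoinHashTable({1, 1, 2, 3, 3}, "0");
    for (int k : DistinctFromSideband("0")) std::cout << k << "\n";  // 1 2 3
}
```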
I like the idea of the reusable operators (although I think of them as `AggregateDistinct_SaveBuildHashTableKeys` and `HashEquiJoin_ReadSavedHashTableFromElsewhere`). We could add optimization metadata to HashEquiJoin (where to get the saved hash table) as the implementation of one of them, which means we probably could do something similar for the aggregation (adding metadata to save the hash data). The format of the hash table could be left to the engine if we don't want to do cross-system optimization (which would also require defining where to store it along with what it looks like). The end proposal would look more like "save a numbered hashtable" and "use a numbered hashtable" for any such operations that could take advantage of it.

One issue with ignoring the optimizations may be correctness (such as TPC-H 15's rounding-error problem with floats, caused by computing the same subtree twice but with a potentially different rounding result). But that's an existing problem that can be solved elsewhere (such as with Decimal) or by using ReferenceRel. The other is potentially performance: I'm not sure, but a plan rewritten to be more efficient with these optimizations may end up being much slower on systems without them.
I agree that your version makes more sense given data flow. Unfortunately, the nature of the hashtable in the join is that it has more information than a hash table in a distinct op (each join hash table record links to the carry-along records, whereas the distinct op does not maintain those links). Given that, the hashtable from the join can help the aggregate, but the hashtable from the aggregate can't help the join (basically). At least that has been my experience.
I don't see an issue with correctness. Beyond the float comment, where do you see that being an issue?
Float accuracy is generally unreliable; I don't think there is anything special here. Any operation that is shredded into multiple parallel ops (be it threads, machines, etc.) with float will give a different result depending on the order of parallel roll-ups (which are indeterminate in most systems). Every multithreaded system I've worked with allows that variation, even for multiple executions of the same plan.
This doesn't seem like a problem. Optimizations, by definition, are something that if you ignore them, you're likely to perform slower.
I haven't added any documentation, but do optimization hints as described in #705 seem like they would suffice as a replacement for these two new operators?
This PR implements the `DuplicateEliminatedGetRel` and the `DuplicateEliminatedJoinRel`. Both relations are necessary to support duplicate eliminated joins, which is a join type necessary for unnesting arbitrary subqueries. They are introduced in depth in the unnesting arbitrary subqueries paper.

I also have a POC PR for the DuckDB substrait repo, which already appropriately round-trips queries with the definitions proposed here.

The main question I have is whether it is more desirable to have the `DuplicateEliminatedJoinRel` as a separate relation or whether its attributes should be merged into `joinrel`.

For clarity/reference, in DuckDB the Duplicate Eliminated Join is literally the `LogicalComparisonJoin` with a `LogicalOperatorType::LOGICAL_DELIM_JOIN`.

The possible join types for the logical operator type are: …

The comparison join only uses the following types: …

Hence, a different possibility would be to add an enum to `joinrel` with only `LOGICAL_DELIM_JOIN` and `LOGICAL_COMPARISON_JOIN` for now.