Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support intra-operator parallelism #856

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open

Conversation

wangrunji0408
Copy link
Member

@wangrunji0408 wangrunji0408 commented Nov 24, 2024

This PR adds data partitioning and intra-operator parallelism.

graphviz-4

The performance of TPC-H improved on my M1 Pro (10 cores):

speedup

Seems resolve #748

Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
@wangrunji0408 wangrunji0408 requested a review from skyzh November 24, 2024 13:59
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
@skyzh
Copy link
Member

skyzh commented Nov 27, 2024

two quick questions: what is the schema plan node? and what is the definition of exchange node? is it the distribution of the child, or the expected distribution of the output node?

@wangrunji0408
Copy link
Member Author

what is the schema plan node?

The schema node is a virtual node that only changes the output schema of the child node. It was introduced to resolve a tricky issue in 2-phase aggregation.

Let's say we have a query: select sum(a) * 2 from t;

The original plan is:

Proj: sum(a) * 2
    Agg: sum(a)
        Scan: t(a)

After parallelization (by pushing down the ToParallel node), the Agg is transformed into a 2-phase aggregation:

Proj: sum(a) * 2
    Agg: sum(sum(a))
        Exchange: merge
            Agg: sum(a)
                Scan: t(a)

You may notice that the output schema of the Agg node is changed from sum(a) to sum(sum(a)). Therefore, the Proj node will throw an error when trying to resolve the physical column index of its expression sum(a).

So, in order to keep the schema unchanged, we can insert a Schema node between Proj and Agg:

Proj: sum(a) * 2
    Schema: sum(a)
        Agg: sum(sum(a))
            Exchange: merge
                Agg: sum(a)
                    Scan: t(a)

And the Schema node will be simply ignored when building executors.

@wangrunji0408
Copy link
Member Author

what is the definition of exchange node? is it the distribution of the child, or the expected distribution of the output node?

(exchange dist child)
where dist is the expected distribution of the output.
The child can have any distribution.

@wangrunji0408
Copy link
Member Author

By the way, after this optimization, the bottleneck of some queries (such as Q6) has shifted to table scan.
Next step it's critical to support parallel partition scan in the storage. 🥹

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

executor: inter-operator parallelism
2 participants