[fix](catalog) opt the count pushdown rule for iceberg/paimon/hive scan node #44038

Merged · 7 commits · Dec 10, 2024

Conversation

@morningman (Contributor) commented Nov 15, 2024

What problem does this PR solve?

  1. Optimize parallelism for the count push-down optimization

    The count push-down optimization speeds up queries such as `select count(*) from table`.
    In this scenario, the row count can be obtained directly from the external table's
    row count statistics, or from the metadata of Parquet/ORC files,
    without reading the actual file content.

    Currently, count push-down is supported for Hive, Iceberg, and Paimon tables.
    There are two ways to obtain the row count:

    1. Directly from statistics

      For Iceberg tables, the row count can be obtained directly from table statistics.
      However, due to historical issues in Iceberg, this method cannot be used when the table
      contains position/equality deletes, because it would return an incorrect row count.
      In that case, the optimization falls back to reading the row count from file metadata.

    2. From file metadata

      For Hive, Paimon, and some Iceberg tables, the row count can be obtained directly
      from the metadata of Parquet/ORC files.
      For Text format tables, efficiency can still be improved by performing only row splitting, without column splitting.

    In the task-splitting logic, the number of split tasks for count push-down should take into account
    the file format, the number of files, the parallelism, the number of BE nodes, and local shuffle:

    1. Count push-down should avoid local shuffle, so the number of split tasks should be greater than or equal to `parallelism * number of BE nodes`.
  2. Fix incorrect count push-down logic

    Previously, for Iceberg and Paimon tables, count push-down did not take effect because the
    CountPushDown information was not passed to the FileFormatReader inside TableFormatReader. This PR fixes that.

  3. Store the SessionVariable reference in FileQueryScanNode

    SessionVariable is held in ConnectContext, which is a ThreadLocal variable.
    In some cases, FileQueryScanNode is accessed from other threads, where that ThreadLocal value is unavailable.
    Storing the SessionVariable reference directly in FileQueryScanNode prevents such illegal access.

  4. Extract an independent FileSplitter class

    FileSplitter is a utility class that splits files into `Split`s according to different strategies.
    This PR does not change the splitting strategy; it only extracts the logic into a separate class
    so that it can be optimized later.
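
The split-task rule described above can be sketched as follows. This is a minimal illustration under stated assumptions, not Doris's actual implementation; the class and method names are hypothetical:

```java
// Hypothetical sketch of the count push-down split-count rule.
// Names (CountPushdownSplitPlanner, computeSplitCount) are illustrative only.
public class CountPushdownSplitPlanner {
    /**
     * Choose the number of split tasks for a count-pushdown scan.
     * To avoid local shuffle, the task count should be at least
     * parallelism * number of BE nodes, but it can never exceed the
     * number of files, since each file yields at least one metadata read.
     */
    public static int computeSplitCount(int fileCount, int parallelism, int beNodes) {
        int minTasks = Math.max(parallelism * beNodes, 1);
        // Cannot create more tasks than there are files to read.
        return Math.min(fileCount, minTasks);
    }

    public static void main(String[] args) {
        // 100 files, parallelism 8, 3 BE nodes -> at least 24 tasks.
        System.out.println(computeSplitCount(100, 8, 3)); // prints 24
        // Only 10 files available: capped at 10.
        System.out.println(computeSplitCount(10, 8, 3));  // prints 10
    }
}
```

With 100 files, parallelism 8, and 3 BE nodes, at least 24 tasks are produced, so each pipeline instance on each BE can receive its own split and no local shuffle is needed.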

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@doris-robot

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

Contributor

clang-tidy review says "All clean, LGTM! 👍"

2 similar comments

@morningman
Contributor Author

run buildall

@doris-robot

TeamCity be ut coverage result:
Function Coverage: 38.48% (10006/26002)
Line Coverage: 29.51% (83906/284377)
Region Coverage: 28.60% (43113/150735)
Branch Coverage: 25.20% (21914/86974)
Coverage Report: http://coverage.selectdb-in.cc/coverage/b955b479e1cfe99d0e43b45f618aba83f82af0ca_b955b479e1cfe99d0e43b45f618aba83f82af0ca/report/index.html

@morningman
Contributor Author

run buildall

@morningman morningman changed the title [opt](catalog) opt the count pushdown rule for iceberg/paimon/hive scan node [fix](catalog) opt the count pushdown rule for iceberg/paimon/hive scan node Dec 8, 2024
@morningman morningman marked this pull request as ready for review December 8, 2024 21:53
@doris-robot

TeamCity be ut coverage result:
Function Coverage: 38.49% (10007/26002)
Line Coverage: 29.51% (83914/284381)
Region Coverage: 28.61% (43128/150744)
Branch Coverage: 25.20% (21921/86976)
Coverage Report: http://coverage.selectdb-in.cc/coverage/0d6cc74a96542717523af72bb7fec831e48b5274_0d6cc74a96542717523af72bb7fec831e48b5274/report/index.html

@morningman
Contributor Author

run buildall

@doris-robot

TeamCity be ut coverage result:
Function Coverage: 38.68% (10061/26013)
Line Coverage: 29.59% (84211/284574)
Region Coverage: 28.68% (43268/150843)
Branch Coverage: 25.24% (21973/87054)
Coverage Report: http://coverage.selectdb-in.cc/coverage/f4a3f49968151b46094b27c2d0045aae284e7ae4_f4a3f49968151b46094b27c2d0045aae284e7ae4/report/index.html

@morningman
Contributor Author

run buildall

github-actions bot (Contributor) commented Dec 9, 2024

clang-tidy review says "All clean, LGTM! 👍"

github-actions bot (Contributor) commented Dec 9, 2024

PR approved by anyone and no changes requested.

fix

fix count

fix number backends

wait file filter
@morningman
Contributor Author

run buildall

github-actions bot (Contributor) commented Dec 9, 2024

clang-tidy review says "All clean, LGTM! 👍"

@doris-robot

TeamCity be ut coverage result:
Function Coverage: 38.80% (10103/26037)
Line Coverage: 29.70% (84716/285199)
Region Coverage: 28.77% (43480/151147)
Branch Coverage: 25.32% (22088/87228)
Coverage Report: http://coverage.selectdb-in.cc/coverage/4d3920f7e0ba13b006060d612a352fc1fbdde81d_4d3920f7e0ba13b006060d612a352fc1fbdde81d/report/index.html

@kaka11chen (Contributor) left a comment:

LGTM

@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Dec 10, 2024
Contributor

PR approved by at least one committer and no changes requested.

@morningman morningman merged commit 18dc92a into apache:master Dec 10, 2024
24 of 26 checks passed
morningman added a commit to morningman/doris that referenced this pull request Dec 10, 2024
…an node (apache#44038)

morningman added a commit to morningman/doris that referenced this pull request Dec 15, 2024
…an node (apache#44038)

morningman added a commit to morningman/doris that referenced this pull request Dec 17, 2024
…an node (apache#44038)

Labels
approved Indicates a PR has been approved by one committer. dev/2.1.8-merged dev/3.0.4-merged reviewed

4 participants