Skip to content

Commit

Permalink
[enhance](count) support push down count(1) on mow table
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Dec 21, 2023
1 parent f3bf26c commit a1c0304
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 8 deletions.
10 changes: 9 additions & 1 deletion be/src/vec/olap/vgeneric_iterators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include "vec/olap/vgeneric_iterators.h"
#include <glog/logging.h>

#include <algorithm>
#include <memory>
Expand Down Expand Up @@ -45,7 +46,11 @@ namespace vectorized {
Status VStatisticsIterator::init(const StorageReadOptions& opts) {
if (!_init) {
_push_down_agg_type_opt = opts.push_down_agg_type_opt;

auto segment_id = _segment->id();
if (opts.delete_bitmap.contains(segment_id)) {
auto cur_delete_bitmap = opts.delete_bitmap.at(segment_id);
_delete_map_rows = _delete_map_rows + cur_delete_bitmap->cardinality();
}
for (size_t i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
auto unique_id = _schema.column(cid)->unique_id();
Expand All @@ -58,6 +63,8 @@ Status VStatisticsIterator::init(const StorageReadOptions& opts) {
}

_target_rows = _push_down_agg_type_opt == TPushAggOp::MINMAX ? 2 : _segment->num_rows();
DCHECK_GE(_target_rows, _delete_map_rows) <<_target_rows<<" "<<_delete_map_rows;
_target_rows = _target_rows - _delete_map_rows;
_init = true;
}

Expand All @@ -75,6 +82,7 @@ Status VStatisticsIterator::next_batch(Block* block) {
: std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
if (_push_down_agg_type_opt == TPushAggOp::COUNT) {
size = std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
LOG(INFO)<<"_target_rows: _output_rows : size: "<<_target_rows<<" "<<_output_rows<<" "<<size;
for (int i = 0; i < block->columns(); ++i) {
columns[i]->resize(size);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/olap/vgeneric_iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class VStatisticsIterator : public RowwiseIterator {
TPushAggOp::type _push_down_agg_type_opt;
std::map<int32_t, std::unique_ptr<ColumnIterator>> _column_iterators_map;
std::vector<ColumnIterator*> _column_iterators;
int _delete_map_rows = 0;

static constexpr size_t MAX_ROW_SIZE_IN_COUNT = 65535;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,10 @@ private LogicalAggregate<? extends Plan> storageLayerAggregate(
}

if (logicalScan instanceof LogicalOlapScan) {
KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType();
if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) {
OlapTable table = ((LogicalOlapScan) logicalScan).getTable();
KeysType keysType = table.getKeysType();
boolean isMOWTable = (keysType == KeysType.UNIQUE_KEYS) && (table.getEnableUniqueKeyMergeOnWrite());
if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS && !isMOWTable) {
return canNotPush;
}
}
Expand All @@ -363,7 +365,8 @@ private LogicalAggregate<? extends Plan> storageLayerAggregate(
}
if (logicalScan instanceof LogicalOlapScan) {
KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType();
if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS) {
// The earlier check already returned canNotPush for UNIQUE_KEYS tables that are not merge-on-write (MOW).
if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS && keysType != KeysType.UNIQUE_KEYS) {
return canNotPush;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1690,12 +1690,13 @@ public StatsDelta genStatsDelta() throws AnalysisException {
@Override
public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
KeysType type = getOlapTable().getKeysType();
if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) {
// Merge-on-write (MOW) unique-key tables can push down the count function.
if ((type == KeysType.UNIQUE_KEYS && !getOlapTable().getEnableUniqueKeyMergeOnWrite()) || type == KeysType.PRIMARY_KEYS) {
return false;
}

String aggFunctionName = aggExpr.getFnName().getFunction();
if (aggFunctionName.equalsIgnoreCase("COUNT") && type != KeysType.DUP_KEYS) {
if (aggFunctionName.equalsIgnoreCase("COUNT") && type != KeysType.DUP_KEYS && type != KeysType.UNIQUE_KEYS) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
Expand Down Expand Up @@ -416,11 +418,22 @@ private TPushAggOp freshTPushAggOpByName(String functionName, TPushAggOp originA

private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, Analyzer analyzer, PlanNode root) {
do {
if (CollectionUtils.isNotEmpty(root.getConjuncts())
|| CollectionUtils.isNotEmpty(root.getProjectList())) {
if (CollectionUtils.isNotEmpty(root.getProjectList())) {
break;
}

if (CollectionUtils.isNotEmpty(root.getConjuncts())) {
if (!(root instanceof OlapScanNode)) {
break;
}
OlapTable table = ((OlapScanNode) root).getOlapTable();
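// MOW tables have a __DORIS_DELETE_SIGN__ column, so exactly one conjunct
// (presumably the implicit delete-sign filter -- TODO confirm) is tolerated here.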
if (!(table.getKeysType() == KeysType.UNIQUE_KEYS && table.getEnableUniqueKeyMergeOnWrite()
&& root.getConjuncts().size() == 1)) {
break;
}
}

// TODO: Support multiple tables in the future
if (selectStmt.getTableRefs().size() != 1) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
-- !select --
1

-- !sql_1 --
1

Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,29 @@ suite("test_pushdown_explain") {
sql("select count(cast(lo_orderkey as bigint)) from test_lineorder;")
contains "pushAggOp=COUNT"
}

sql "DROP TABLE IF EXISTS table_mow"
sql """
CREATE TABLE `table_mow` (
`user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
`username` VARCHAR(50) NOT NULL COMMENT '\"用户昵称\"'
) ENGINE=OLAP
UNIQUE KEY(`user_id`, `username`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "true"
);
"""
sql """ insert into table_mow values(1,"asd"); """
sql """ insert into table_mow values(1,"asd"); """
sql """ insert into table_mow values(1,"asd"); """
sql """ set experimental_enable_nereids_planner = false; """
explain {
sql("select count(1) from table_mow;")
contains "pushAggOp=COUNT"
}
qt_sql_1 "select count(1) from table_mow;"
}

0 comments on commit a1c0304

Please sign in to comment.