Skip to content

Commit

Permalink
feat: min/max消除, top n (#249)
Browse files Browse the repository at this point in the history
* feat: min/max消除, top n

* feat: support distinct count(*)

* feat: like to range

* feat: bit type transfer to int

* feat: add json type

* fix: float & double返回值精度同to_string函数

* fix: agg return 0 when result empty

* feat: 可动态修改user quota

* feat: agg没有聚合函数时支持limit

* fix: rollback send binlog coredump

* fix: prepare 不支持full_export

* feat: store health check get old status when update

* fix: cstore + ttl + separate乱码问题
  • Loading branch information
cyz-2023 authored Dec 4, 2024
1 parent 5638a1c commit 8e9279a
Show file tree
Hide file tree
Showing 35 changed files with 725 additions and 83 deletions.
14 changes: 12 additions & 2 deletions include/common/expr_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ struct ExprValue {
case pb::HLL:
case pb::HEX:
case pb::TDIGEST:
case pb::JSON:
str_val = value.string_val();
break;
case pb::BITMAP: {
Expand All @@ -190,6 +191,7 @@ struct ExprValue {
float_precision_len = -1;
str_val = value_str;
if (primitive_type == pb::STRING
|| primitive_type == pb::JSON
|| primitive_type == pb::HEX
|| primitive_type == pb::BITMAP
|| primitive_type == pb::HLL
Expand Down Expand Up @@ -323,6 +325,7 @@ struct ExprValue {
case pb::STRING:
case pb::HEX:
case pb::BITMAP:
case pb::JSON:
case pb::TDIGEST:
value->set_string_val(str_val);
break;
Expand Down Expand Up @@ -422,6 +425,7 @@ struct ExprValue {
case pb::STRING:
case pb::HEX:
case pb::HLL:
case pb::JSON:
case pb::TDIGEST:
return str_val.length();
case pb::DATETIME:
Expand Down Expand Up @@ -522,6 +526,9 @@ struct ExprValue {
case pb::STRING:
str_val = get_string();
break;
case pb::JSON:
str_val = get_string();
break;
case pb::BITMAP: {
_u.bitmap = new(std::nothrow) Roaring();
if (str_val.size() > 0) {
Expand Down Expand Up @@ -576,6 +583,7 @@ struct ExprValue {
butil::MurmurHash3_x64_128(&_u, 8, seed, out);
return out[0];
case pb::STRING:
case pb::JSON:
case pb::HEX: {
butil::MurmurHash3_x64_128(str_val.c_str(), str_val.size(), seed, out);
return out[0];
Expand Down Expand Up @@ -627,6 +635,7 @@ struct ExprValue {
}
case pb::STRING:
case pb::HEX:
case pb::JSON:
case pb::HLL:
case pb::TDIGEST:
return str_val;
Expand Down Expand Up @@ -741,6 +750,7 @@ struct ExprValue {
(_u.double_val < other._u.double_val ? -1 : 0);
case pb::STRING:
case pb::HEX:
case pb::JSON:
return str_val.compare(other.str_val);
case pb::NULL_TYPE:
return -1;
Expand Down Expand Up @@ -795,7 +805,7 @@ struct ExprValue {
}

bool is_string() const {
return type == pb::STRING || type == pb::HEX || type == pb::BITMAP || type == pb::HLL || type == pb::TDIGEST;
return type == pb::STRING || type == pb::HEX || type == pb::BITMAP || type == pb::HLL || type == pb::TDIGEST || type == pb::JSON;
}

bool is_double() const {
Expand Down Expand Up @@ -934,7 +944,7 @@ struct ExprValue {

struct HashFunction {
size_t operator()(const ExprValue& ev) const {
if (ev.type == pb::STRING || ev.type == pb::HEX) {
if (ev.type == pb::STRING || ev.type == pb::HEX || ev.type == pb::JSON) {
return ev.hash();
}
return ev._u.uint64_val;
Expand Down
4 changes: 4 additions & 0 deletions include/common/type_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ inline uint8_t to_mysql_type(pb::PrimitiveType type) {
case pb::BITMAP:
case pb::TDIGEST:
return MYSQL_TYPE_STRING;
case pb::JSON:
return MYSQL_TYPE_JSON;
default:
return MYSQL_TYPE_STRING;
}
Expand Down Expand Up @@ -399,6 +401,8 @@ inline std::string to_mysql_type_string(pb::PrimitiveType type) {
case pb::BITMAP:
case pb::TDIGEST:
return "binary";
case pb::JSON:
return "json";
default:
return "text";
}
Expand Down
15 changes: 15 additions & 0 deletions include/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,22 @@ class ExecNode {
// Accessor: returns the current value of the _is_get_keypoint flag.
bool is_get_keypoint() { return _is_get_keypoint; }
// Stores has_optimized in _has_optimized and returns the value just stored.
// The function is declared to return bool; previously control fell off the
// end without a return statement, which is undefined behavior in C++.
bool set_has_optimized(bool has_optimized) {
    _has_optimized = has_optimized;
    return _has_optimized;
}
// Returns true when _has_optimized is set on this node or, recursively, on
// any node reachable through _children; returns false otherwise.
// The search stops at the first node that reports true.
bool has_optimized() {
    bool found = _has_optimized;
    if (!found) {
        for (auto child : _children) {
            if (child->has_optimized()) {
                found = true;
                break;
            }
        }
    }
    return found;
}
protected:
bool _has_optimized = false;
int64_t _limit = -1;
int64_t _num_rows_returned = 0;
bool _is_explain = false;
Expand Down
3 changes: 3 additions & 0 deletions include/exec/sort_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class SortNode : public ExecNode {
for (auto expr : _slot_order_exprs) {
ExprNode::create_pb_expr(sort_node->add_slot_order_exprs(), expr);
}
if (_limit != -1) {
pb_node->set_limit(_limit);
}
}

void transfer_fetcher_pb(pb::FetcherNode* pb_fetcher) {
Expand Down
1 change: 1 addition & 0 deletions include/expr/expr_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class ExprNode {

// optimize or node to in node
static void or_node_optimize(ExprNode** expr_node);
static bool like_node_optimize(ExprNode** root, std::vector<ExprNode*>& new_exprs);
bool has_same_children();
bool is_vaild_or_optimize_tree(int32_t level, std::unordered_set<int32_t>* tuple_set);
static int change_or_node_to_in(ExprNode** expr_node);
Expand Down
6 changes: 6 additions & 0 deletions include/expr/internal_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ ExprValue pi(const std::vector<ExprValue>& input);
ExprValue greatest(const std::vector<ExprValue>& input);
ExprValue least(const std::vector<ExprValue>& input);
ExprValue pow(const std::vector<ExprValue>& input);
ExprValue bit_count(const std::vector<ExprValue>& input);
//string functions
ExprValue length(const std::vector<ExprValue>& input);
ExprValue bit_length(const std::vector<ExprValue>& input);
Expand All @@ -66,6 +67,11 @@ ExprValue lpad(const std::vector<ExprValue>& input);
ExprValue rpad(const std::vector<ExprValue>& input);
ExprValue instr(const std::vector<ExprValue>& input);
ExprValue json_extract(const std::vector<ExprValue>& input);
ExprValue json_extract1(const std::vector<ExprValue>& input);
ExprValue json_type(const std::vector<ExprValue>& input);
ExprValue json_array(const std::vector<ExprValue>& input);
ExprValue json_object(const std::vector<ExprValue>& input);
ExprValue json_valid(const std::vector<ExprValue>& input);
ExprValue export_set(const std::vector<ExprValue>& input);
ExprValue to_base64(const std::vector<ExprValue>& input);
ExprValue from_base64(const std::vector<ExprValue>& input);
Expand Down
2 changes: 2 additions & 0 deletions include/logical_plan/select_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class SelectPlanner : public LogicalPlanner {
int parse_limit();

int subquery_rewrite();

int minmax_remove();

bool is_full_export();

Expand Down
1 change: 1 addition & 0 deletions include/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ class RuntimeState {
int range_count_limit = 0;
int64_t _sql_exec_timeout = -1;
bool _is_ddl_work = false;
bool must_have_one = false;
private:
bool _is_inited = false;
bool _is_cancelled = false;
Expand Down
13 changes: 7 additions & 6 deletions include/runtime/sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,25 @@ class Sorter {
public:
Sorter(MemRowCompare* comp) : _comp(comp), _idx(0) {
}
void add_batch(std::shared_ptr<RowBatch>& batch) {
virtual void add_batch(std::shared_ptr<RowBatch>& batch) {
batch->reset();
_min_heap.push_back(batch);
}
void sort();
void merge_sort();
int get_next(RowBatch* batch, bool* eos);
virtual void sort();
virtual void merge_sort();
virtual int get_next(RowBatch* batch, bool* eos);

size_t batch_size() {
return _min_heap.size();
}
private:
void multi_sort();
void make_heap();
void shiftdown(size_t index);
virtual void shiftdown(size_t index);

private:
protected:
MemRowCompare* _comp;
private:
std::vector<std::shared_ptr<RowBatch>> _min_heap;
size_t _idx;
};
Expand Down
52 changes: 52 additions & 0 deletions include/runtime/topn_sorter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2018-present Baidu, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <algorithm>
#include <vector>
#include "common.h"
#include "row_batch.h"
#include "mem_row_compare.h"
#include "sorter.h"

namespace baikaldb {
// Sort each batch in parallel, then merge the sorted batches with a heap.

// Element type of the heap kept by TopNSorter (TopNSorter::_mem_row_heap).
struct TopNHeapItem {
// Row exclusively owned by this heap entry.
std::unique_ptr<baikaldb::MemRow> row;
// NOTE(review): presumably an ordering/sequence index assigned by TopNSorter;
// it has no default initializer, so whoever creates the item must set it.
int64_t idx;
};

class TopNSorter : public Sorter {
public:
TopNSorter(MemRowCompare* comp, int64_t limit) : Sorter(comp), _limit(limit) {
}
virtual void add_batch(std::shared_ptr<RowBatch>& batch);
virtual void sort();
virtual void merge_sort(){}
virtual int get_next(RowBatch* batch, bool* eos);
private:
virtual void shiftdown(size_t index);
virtual void shiftup(size_t index);

private:
std::vector<TopNHeapItem> _mem_row_heap;
int64_t _limit = -1;
int64_t _current_count = 0;
int64_t _current_idx = 0;
};
}

/* vim: set ts=4 sw=4 sts=4 tw=100 */
10 changes: 1 addition & 9 deletions include/session/user_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,7 @@ struct UserInfo {

~UserInfo() {}

bool is_exceed_quota() {
if (query_cost.get_time() > 1000000) {
query_cost.reset();
query_count = 0;
return false;
}
return query_count++ > query_quota;
}

bool is_exceed_quota();
bool connection_inc() {
bool res = false;
std::lock_guard<std::mutex> guard(conn_mutex);
Expand Down
2 changes: 2 additions & 0 deletions include/sqlparser/sql_lex.l
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ VAR_SAMP { un_reserved_keyword(yylval, yyscanner, parser); return VAR_SAMP; }
\|\| { return OR; }
\<\< { return LS_OP; }
\>\> { return RS_OP; }
\-\> { return JS_OP; }
\-\>\> { return JS_OP1; }

[0-9]+ {
//integer
Expand Down
37 changes: 35 additions & 2 deletions include/sqlparser/sql_parse.y
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ extern int sql_error(YYLTYPE* yylloc, yyscan_t yyscanner, SqlParser* parser, con
VAR_SAMP
USER_AGG

%token EQ_OP ASSIGN_OP MOD_OP GE_OP GT_OP LE_OP LT_OP NE_OP AND_OP OR_OP NOT_OP LS_OP RS_OP CHINESE_DOT
%token EQ_OP ASSIGN_OP MOD_OP GE_OP GT_OP LE_OP LT_OP NE_OP AND_OP OR_OP NOT_OP LS_OP RS_OP CHINESE_DOT JS_OP JS_OP1
%token <string> IDENT
%token <expr> STRING_LIT INTEGER_LIT DECIMAL_LIT PLACE_HOLDER_LIT

Expand Down Expand Up @@ -760,7 +760,8 @@ extern int sql_error(YYLTYPE* yylloc, yyscan_t yyscanner, SqlParser* parser, con
%left EQ_OP NE_OP GE_OP GT_OP LE_OP LT_OP IS LIKE IN
%left '|'
%left '&'
%left LS_OP RS_OP
%left JS_OP1
%left LS_OP RS_OP JS_OP
%left '+' '-'
%left '*' '/' MOD_OP MOD
%left '^'
Expand Down Expand Up @@ -1866,6 +1867,38 @@ SelectField:
select_field->as_name = $5;
$$ = select_field;
}
| ColumnName JS_OP STRING_LIT {
    // A select field written as column -> path is turned into a call to
    // json_extract1 with the column and the string literal as arguments.
    SelectField* select_field = new_node(SelectField);
    FuncExpr* fun = new_node(FuncExpr);
    fun->fn_name = "json_extract1";
    fun->children.push_back($1, parser->arena);
    fun->children.push_back($3, parser->arena);
    select_field->expr = fun;
    // Rebuild the original field text as: column, the arrow operator, then
    // the path wrapped in double quotes. (The unused t1/t2 temporaries that
    // used to be declared here were dead code and have been removed.)
    select_field->org_name = ((ColumnName*) $1)->name;
    select_field->org_name.append("->\"", parser->arena);
    select_field->org_name.append(((LiteralExpr*)$3)->_u.str_val.c_str(), parser->arena);
    select_field->org_name.append("\"", parser->arena);
    $$ = select_field;
}
| ColumnName JS_OP1 STRING_LIT {
    // A select field written with the double-arrow operator is turned into a
    // call to json_extract with the column and the string literal as arguments.
    SelectField* select_field = new_node(SelectField);
    FuncExpr* fun = new_node(FuncExpr);
    fun->fn_name = "json_extract";
    fun->children.push_back($1, parser->arena);
    fun->children.push_back($3, parser->arena);
    select_field->expr = fun;
    // Rebuild the original field text. This rule matches the double-arrow
    // token, so the reconstructed name uses that operator; it previously
    // emitted the single-arrow text copied from the JS_OP rule. The unused
    // t1/t2 temporaries were dead code and have been removed.
    select_field->org_name = ((ColumnName*) $1)->name;
    select_field->org_name.append("->>\"", parser->arena);
    select_field->org_name.append(((LiteralExpr*)$3)->_u.str_val.c_str(), parser->arena);
    select_field->org_name.append("\"", parser->arena);
    $$ = select_field;
}
;
FieldAsNameOpt:
/* EMPTY */
Expand Down
5 changes: 3 additions & 2 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ enum PrimitiveType {
HEX = 20;
BITMAP = 21;
TDIGEST = 22;
MAXVALUE_TYPE = 23;
JSON = 23;
MAXVALUE_TYPE = 24;
};

enum SchemaType {
Expand Down Expand Up @@ -146,4 +147,4 @@ message ExprValue {
optional float float_val = 7;
optional double double_val = 8;
optional bytes string_val = 9;
};
};
1 change: 1 addition & 0 deletions src/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ int primitive_to_proto_type(pb::PrimitiveType type) {
{ pb::BOOL, FieldDescriptorProto::TYPE_BOOL},
{ pb::BITMAP, FieldDescriptorProto::TYPE_BYTES},
{ pb::TDIGEST, FieldDescriptorProto::TYPE_BYTES},
{ pb::JSON, FieldDescriptorProto::TYPE_BYTES},
{ pb::NULL_TYPE, FieldDescriptorProto::TYPE_BOOL}
};
if (_mysql_pb_type_mapping.count(type) == 0) {
Expand Down
Loading

0 comments on commit 8e9279a

Please sign in to comment.