Skip to content

Commit

Permalink
功能完善 (#233)
Browse files Browse the repository at this point in the history
* feat: suport '' as ' and "" as "

* feat: ignore new field faild when select *

* feat: support select ... for update

* fix: init channel faild when return leader is null

* feat: support _utf8mb4 ''

* fix: (a,b) in (结果为空的子查询)时,导致扫全表的bug

* feat: add min & max split slow down cost

* fix: field in multi row expr filter error

* feat: (subselect) as a subselect

* feat: support blobdb for key-value separation

* fix: Innodb to InnoDB

* fix: for code review

* fix: for code review

---------

Co-authored-by: yuzhong.chen <[email protected]>
  • Loading branch information
wy1433 and cyz-2023 authored Apr 12, 2024
1 parent 82fa752 commit de5915a
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 24 deletions.
1 change: 1 addition & 0 deletions include/exec/scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ class ScanNode : public ExecNode {
std::vector<ScanIndexInfo> _scan_indexs;
bthread::Mutex _current_index_mutex;
bool _current_global_backup = false;
GetMode _get_mode = GET_ONLY; // set to GET_LOCK, when "select ... for update"
};
}

Expand Down
1 change: 1 addition & 0 deletions include/logical_plan/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ class QueryContext {
bool is_get_keypoint = false;
bool is_full_export = false;
bool is_straight_join = false;
bool select_for_update = false;
ExplainType explain_type = EXPLAIN_NULL;
int single_store_concurrency = -1;

Expand Down
7 changes: 4 additions & 3 deletions include/sqlparser/sql_lex.l
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ LONG { return LONG; }
VARCHAR { return VARCHAR; }
VARBINARY { return VARBINARY; }
_BINARY { return _BINARY; }
_UTF8MB4 { return _UTF8MB4; }
VIRTUAL { return VIRTUAL; }
WHEN { return WHEN; }
WHERE { return WHERE; }
Expand Down Expand Up @@ -472,12 +473,12 @@ VAR_SAMP { un_reserved_keyword(yylval, yyscanner, parser); return VAR_SAMP; }
return STRING_LIT;
}

\"([^\\\"]|\\.)*\" |
\'([^\\\']|\\.)*\' {
\"([^\\\"]|\\.|\"\")*\" |
\'([^\\\']|\\.|\'\')*\' {
//string
LiteralExpr* lit;
lit = LiteralExpr::make_string(yytext, parser->arena);
lit->_u.str_val.stripslashes(); // move from logical_plan
lit->_u.str_val.stripslashes(yytext[0]); // move from logical_plan
if (parser->is_gbk && parser->has_5c) {
lit->_u.str_val.restore_5c();
}
Expand Down
7 changes: 7 additions & 0 deletions include/sqlparser/sql_parse.y
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ extern int sql_error(YYLTYPE* yylloc, yyscan_t yyscanner, SqlParser* parser, con
VARCHAR
VARBINARY
_BINARY
_UTF8MB4
VIRTUAL
WHEN
WHERE
Expand Down Expand Up @@ -1819,6 +1820,9 @@ SubSelect:
sub_query->query_stmt = (UnionStmt*)$2;
$$ = sub_query;
}
| '(' SubSelect ')' {
$$ = $2;
}
;

// See https://dev.mysql.com/doc/refman/5.7/en/union.html
Expand Down Expand Up @@ -2860,6 +2864,9 @@ Literal:
| _BINARY STRING_LIT {
$$ = $2;
}
| _UTF8MB4 STRING_LIT {
$$ = $2;
}
;

SimpleExpr:
Expand Down
7 changes: 6 additions & 1 deletion include/sqlparser/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct String {
++i;
}
}
void stripslashes() {
void stripslashes(char c) {
size_t slow = 0;
size_t fast = 0;
bool has_slash = false;
Expand All @@ -109,6 +109,11 @@ struct String {
}
has_slash = false;
} else {
if (value[fast] == c && fast + 1 < length && value[fast + 1] == c) {
value[slow++] = c;
fast += 2;
continue;
}
if (value[fast] == '\\') {
has_slash = true;
fast++;
Expand Down
2 changes: 1 addition & 1 deletion src/common/information_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ void InformationSchema::init_tables() {
record->set_string(record->get_field_by_name("TABLE_SCHEMA"), db);
record->set_string(record->get_field_by_name("TABLE_NAME"), table_info->short_name);
record->set_string(record->get_field_by_name("TABLE_TYPE"), "BASE TABLE");
record->set_string(record->get_field_by_name("ENGINE"), "Innodb");
record->set_string(record->get_field_by_name("ENGINE"), "InnoDB");
record->set_int64(record->get_field_by_name("VERSION"), table_info->version);
record->set_string(record->get_field_by_name("ROW_FORMAT"), "Compact");
record->set_int64(record->get_field_by_name("TABLE_ROWS"), 0);
Expand Down
15 changes: 15 additions & 0 deletions src/engine/rocks_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ DEFINE_int32(max_dict_bytes, 16 * 1024, "default 16K");
DEFINE_int32(zstd_max_train_bytes, 256 * 1024, "default 256K");
DEFINE_bool(olap_import_mode, false, "is olap import, default: false");

DEFINE_bool(enable_blob_files, false, "set it to true to enable key-value separation");
DEFINE_int32(min_blob_size, 1 * 1024,
"values at or above this threshold will be written to blob files during flush or compaction");

const std::string RocksWrapper::RAFT_LOG_CF = "raft_log";
const std::string RocksWrapper::BIN_LOG_CF = "bin_log_new";
const std::string RocksWrapper::DATA_CF = "data";
Expand Down Expand Up @@ -275,6 +279,17 @@ int32_t RocksWrapper::init(const std::string& path) {
_data_cf_option.bottommost_compression_opts.zstd_max_train_bytes = 1 << 18; // 256KB
}

// key-value separation
if (FLAGS_enable_blob_files) {
_data_cf_option.enable_blob_files = true;
_data_cf_option.min_blob_size = FLAGS_min_blob_size;
_data_cf_option.blob_file_size = 1ULL << 28;
_data_cf_option.blob_compression_type = rocksdb::CompressionType::kLZ4Compression;
_data_cf_option.enable_blob_garbage_collection = true;
_data_cf_option.blob_garbage_collection_age_cutoff = 0.25;
_data_cf_option.blob_garbage_collection_force_threshold = 0.8;
}

//todo
//prefix: 0x01-0xFF,分别用来存储不同的meta信息
_meta_info_option.prefix_extractor.reset(
Expand Down
2 changes: 1 addition & 1 deletion src/exec/fetcher_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ ErrorType OnRPCDone::handle_response(const std::string& remote_side) {
if (_retry_times > 1 && _response.leader() == "0.0.0.0:0") {
schema_factory->update_instance(remote_side, pb::FAULTY, false, false);
}
if (_response.leader() != "0.0.0.0:0") {
if (_response.leader() != "0.0.0.0:0" && _response.leader() != "") {
// store返回了leader,则相信store,不判断normal
_info.set_leader(_response.leader());
schema_factory->update_leader(_info);
Expand Down
26 changes: 18 additions & 8 deletions src/exec/rocksdb_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ int RocksdbScanNode::init(const pb::PlanNode& node) {
DB_WARNING("pri info not found _table_id:%ld", _table_id);
return -1;
}
if (node.derive_node().scan_node().has_lock() &&
node.derive_node().scan_node().lock() == pb::LOCK_GET_ONLY_PRIMARY) {
_get_mode = GET_LOCK;
}
_is_ddl_work = node.derive_node().scan_node().is_ddl_work();
_ddl_work_type = node.derive_node().scan_node().ddl_work_type();
_ddl_index_id = node.derive_node().scan_node().ddl_index_id();
Expand Down Expand Up @@ -469,7 +473,7 @@ int RocksdbScanNode::open(RuntimeState* state) {
for (auto& slot : _tuple_desc->slots()) {
if (slot.field_id() > _field_slot.size() - 1) {
DB_WARNING("vector out of range, region_id: %ld, field_id: %d", _region_id, slot.field_id());
return -1;
continue;
}
_field_slot[slot.field_id()] = slot.slot_id();
if (pri_field_ids.count(slot.field_id()) == 0) {
Expand Down Expand Up @@ -736,19 +740,19 @@ int RocksdbScanNode::get_next_by_table_get(RuntimeState* state, RowBatch* batch,
*eos = true;
return 0;
}
if (!FLAGS_scan_use_multi_get) {
if (!FLAGS_scan_use_multi_get || _get_mode != GET_ONLY) {
++_scan_rows;
if (_use_encoded_key) {
_idx++;
auto key_pair = _scan_range_keys.get_next();
int ret = txn->get_update_primary(_region_id, *_pri_info, key_pair->left_key(), record,
_field_ids, GET_ONLY, state->need_check_region());
_field_ids, _get_mode, state->need_check_region());
if (ret < 0) {
continue;
}
} else {
record = _left_records[_idx++];
int ret = txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, GET_ONLY,
int ret = txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, _get_mode,
state->need_check_region());
if (ret < 0) {
continue;
Expand Down Expand Up @@ -813,7 +817,7 @@ int RocksdbScanNode::get_next_by_index_get(RuntimeState* state, RowBatch* batch,
*eos = true;
return 0;
}
if (!FLAGS_scan_use_multi_get) {
if (!FLAGS_scan_use_multi_get || _get_mode != GET_ONLY) {
++_scan_rows;
if (_use_encoded_key) {
auto key_pair = _scan_range_keys.get_next();
Expand All @@ -834,7 +838,7 @@ int RocksdbScanNode::get_next_by_index_get(RuntimeState* state, RowBatch* batch,
}
if (!_is_covering_index && !_is_global_index) {
++get_primary_cnt;
int ret = txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, GET_ONLY, false);
int ret = txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, _get_mode, false);
if (ret < 0) {
DB_FATAL("get primary:%ld fail, not exist, ret:%d, record: %s",
_table_id, ret, record->to_string().c_str());
Expand Down Expand Up @@ -1161,6 +1165,12 @@ int RocksdbScanNode::get_next_by_table_seek(RuntimeState* state, RowBatch* batch
continue;
}
if (_lock != pb::LOCK_GET) {
if (_lock == pb::LOCK_GET_ONLY_PRIMARY) {
// select ... for update
if (lock_primary(state, row.get()) != 0) {
return -1;
}
}
if (!need_copy(row.get(), _scan_conjuncts)) {
state->inc_num_filter_rows();
++index_filter_cnt;
Expand Down Expand Up @@ -1436,11 +1446,11 @@ int RocksdbScanNode::get_next_by_index_seek(RuntimeState* state, RowBatch* batch
}
//DB_NOTICE("get index: %ld", cost.get_time());
//cost.reset();
if (!FLAGS_scan_use_multi_get || _has_s_wordrank) {
if (!FLAGS_scan_use_multi_get || _has_s_wordrank || _get_mode != GET_ONLY) {
if (!_is_covering_index && !_is_global_index) {
++get_primary_cnt;
// todo: 反查直接用encode_key
ret = txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, GET_ONLY, false);
ret = txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, _get_mode, false);
if (ret < 0) {
if (_reverse_indexes.size() == 0 && _reverse_index == nullptr) {
DB_FATAL("get primary:%ld fail, ret:%d, index primary may be not consistency: %s",
Expand Down
4 changes: 4 additions & 0 deletions src/logical_plan/logical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2864,6 +2864,10 @@ int LogicalPlanner::create_join_and_scan_nodes(JoinMemTmp* join_root, ApplyMemTm
for (auto index_id : join_root->ignore_indexes) {
scan->add_ignore_indexes(index_id);
}
if (_ctx->select_for_update) {
// 仅加主表行锁
scan->set_lock(pb::LOCK_GET_ONLY_PRIMARY);
}
return 0;
}
//如果不是根节点必须是左右孩子都有
Expand Down
4 changes: 4 additions & 0 deletions src/logical_plan/select_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ int SelectPlanner::plan() {
_ctx->is_straight_join = _select->select_opt->straight_join;
}

if (_select->lock == parser::SL_FOR_UPDATE && client->txn_id != 0) {
_ctx->select_for_update = true;
}

// parse from
if (0 != parse_db_tables(_select->table_refs, &_join_root)) {
return -1;
Expand Down
30 changes: 28 additions & 2 deletions src/physical_plan/index_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,22 @@ void IndexSelector::hit_row_field_range(ExprNode* expr,
if (!expr->children(i)->is_row_expr()) {
return;
}
bool has_null = false;
for (auto& pair : slots) {
size_t idx = pair.first;
values[idx].push_back(static_cast<RowExpr*>(expr->children(i))->get_value(nullptr, idx));
ExprValue val = static_cast<RowExpr*>(expr->children(i))->get_value(nullptr, idx);
if (val.is_null()) {
has_null = true;
}
}
if (!has_null) {
for (auto& pair : slots) {
size_t idx = pair.first;
values[idx].push_back(static_cast<RowExpr*>(expr->children(i))->get_value(nullptr, idx));
}
}
}
if (values.begin()->second.size() == 0) {
if (values.empty() || values.begin()->second.size() == 0) {
*index_predicate_is_null = is_index_predicate(expr);
return;
}
Expand Down Expand Up @@ -169,6 +179,9 @@ void IndexSelector::hit_row_field_range(ExprNode* expr,
return;
}
int32_t field_id = slots[0]->field_id();
if (field_range_map[field_id].left.size() > 0) {
return;
}
for (auto pair : values) {
field_range_map[field_id].left.push_back(pair.second[0]);
}
Expand All @@ -185,6 +198,9 @@ void IndexSelector::hit_row_field_range(ExprNode* expr,
return;
}
int32_t field_id = slots[0]->field_id();
if (field_range_map[field_id].right.size() > 0) {
return;
}
for (auto pair : values) {
field_range_map[field_id].right.push_back(pair.second[0]);
}
Expand Down Expand Up @@ -420,13 +436,23 @@ void IndexSelector::hit_field_range(ExprNode* expr,
return;
case parser::FT_GE:
case parser::FT_GT:
if (field_range_map[field_id].left.size() > 0){
field_range_map[field_id].left.clear();
field_range_map[field_id].left_row_field_ids.clear();
field_range_map[field_id].is_row_expr = false;
}
field_range_map[field_id].left.push_back(values[0]);
field_range_map[field_id].left_open = fn_op == parser::FT_GT;
field_range_map[field_id].left_expr = expr;
field_range_map[field_id].type = RANGE;
return;
case parser::FT_LE:
case parser::FT_LT:
if(field_range_map[field_id].right.size() > 0){
field_range_map[field_id].right.clear();
field_range_map[field_id].right_row_field_ids.clear();
field_range_map[field_id].is_row_expr = false;
}
field_range_map[field_id].right.push_back(values[0]);
field_range_map[field_id].right_open = fn_op == parser::FT_LT;
field_range_map[field_id].right_expr = expr;
Expand Down
6 changes: 4 additions & 2 deletions src/protocol/network_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,14 +587,16 @@ void NetworkServer::print_agg_sql() {
}
SQL_TRACE("date_hour_min=[%04d-%02d-%02d\t%02d\t%02d] sum_pv_avg_affected_scan_filter_rgcnt_err="
"[%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld] sign_hostname_index=[%lu\t%s\t%s] dynamic_timeout_ms:%ld sql_agg: %s "
"op_version_desc=[%ld\t%s\t%s\t%s]",
"op_version_desc=[%ld\t%s\t%s\t%s] server_addr=[%s:%d]",
1900 + tm.tm_year, 1 + tm.tm_mon, tm.tm_mday, tm.tm_hour, tm.tm_min,
pair2.second.sum, pair2.second.count,
pair2.second.count == 0 ? 0 : pair2.second.sum / pair2.second.count,
pair2.second.affected_rows, pair2.second.scan_rows, pair2.second.filter_rows,
pair2.second.region_count, pair2.second.err_count,
out_sign, hostname.c_str(), factory->get_index_name(pair2.first).c_str(), dynamic_timeout_ms,
pair.first.c_str(), version, op_description.c_str(), recommend_index.c_str(), field_desc.c_str());
pair.first.c_str(), version, op_description.c_str(), recommend_index.c_str(), field_desc.c_str(),
butil::my_ip_cstr(), FLAGS_baikal_port);

table_count_err[pair2.second.table_id] += CountErr(pair2.second.count, pair2.second.err_count);

if (FLAGS_insert_agg_sql) {
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/show_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1955,7 +1955,7 @@ bool ShowHelper::_show_table_status(const SmartSocket& client, const std::vector
// Make rows.
std::vector<std::string> row;
row.emplace_back(tables[cnt]);
row.emplace_back("Innodb");
row.emplace_back("InnoDB");
row.emplace_back("1");
row.emplace_back("Compact");
row.emplace_back("1");
Expand Down Expand Up @@ -1989,7 +1989,7 @@ bool ShowHelper::_show_table_status(const SmartSocket& client, const std::vector
// Make rows.
std::vector<std::string> row;
row.emplace_back(table_info.table_name());
row.emplace_back("Innodb");
row.emplace_back("InnoDB");
row.emplace_back(std::to_string(table_info.version()));
row.emplace_back("Compact");
row.emplace_back(std::to_string(table_info.row_count()));
Expand Down
Loading

0 comments on commit de5915a

Please sign in to comment.