Skip to content

Commit

Permalink
bug fix (#246)
Browse files Browse the repository at this point in the history
* fix: 字段range选择需要清空range对象

* fix: 计算分区时只有EQ类型才需要计算

* fix: prepare set open_binlog for ctx

* fix: prepare union set dml txn

* fix: union update select_time_cost bvar

* fix: 部分场景不能使用full_export

* feat: prepare execute reset runtime state

* fix: update manager node return last_insert_id

* for codereview

* for code review
  • Loading branch information
cyz-2023 authored Sep 24, 2024
1 parent 1a5ce66 commit eb6d074
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 16 deletions.
11 changes: 11 additions & 0 deletions include/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,17 @@ class RuntimeState {
int64_t get_cost_time() {
return time_cost.get_time();
}
void prepare_reset() {
time_cost.reset();
region_count = 0;
_num_increase_rows = 0;
_num_affected_rows = 0;
_num_returned_rows = 0;
_num_scan_rows = 0;
_num_filter_rows = 0;
_read_disk_size = 0;
_is_cancelled = false;
}
bool is_timeout() {
return _sql_exec_timeout > 0 && time_cost.get_time() > _sql_exec_timeout * 1000L;
}
Expand Down
3 changes: 2 additions & 1 deletion src/exec/filter_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,8 @@ int FilterNode::get_next(RuntimeState* state, RowBatch* batch, bool* eos) {
state->memory_limit_release(state->num_scan_rows(), row->used_size());
}
if (reached_limit()) {
DB_WARNING_STATE(state, "reach limit size:%lu", batch->size());
// DB_WARNING_STATE(state, "reach limit size:%lu", batch->size());
DB_DEBUG("reach limit size:%lu, logid: %lu", batch->size(), state->log_id());
*eos = true;
return 0;
}
Expand Down
3 changes: 3 additions & 0 deletions src/exec/update_manager_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ void UpdateManagerNode::update_record(RuntimeState* state, SmartRecord record) {
state->last_insert_id = last_insert_id_expr->get_value(row).get_numberic<int64_t>();
state->client_conn()->last_insert_id = state->last_insert_id;
}
if (last_insert_id_expr != nullptr) {
state->client_conn()->last_insert_id = last_insert_id_expr->get_value(row).get_numberic<int64_t>();
}
}
}
}
Expand Down
22 changes: 12 additions & 10 deletions src/logical_plan/logical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2944,10 +2944,18 @@ int LogicalPlanner::create_scan_nodes() {
}

void LogicalPlanner::set_dml_txn_state(int64_t table_id) {
if (_ctx->is_prepared || _ctx->is_explain) {
if (_ctx->is_explain || table_id == -1) {
return;
}
auto client = _ctx->client_conn;
//is_gloabl_ddl 打开时,该连接处理全局二级索引增量数据,不需要处理binlog。
if (!_ctx->no_binlog && _factory->has_open_binlog(table_id) && !client->is_index_ddl) {
client->open_binlog = true;
_ctx->open_binlog = true;
}
if (_ctx->is_prepared) {
return;
}
if (client->txn_id == 0) {
DB_DEBUG("enable_2pc %d global index %d, binlog %d",
_ctx->enable_2pc, _factory->has_global_index(table_id), _factory->has_open_binlog(table_id));
Expand All @@ -2957,22 +2965,13 @@ void LogicalPlanner::set_dml_txn_state(int64_t table_id) {
client->on_begin();
DB_DEBUG("get txn %ld", client->txn_id);
client->seq_id = 0;
//is_gloabl_ddl 打开时,该连接处理全局二级索引增量数据,不需要处理binlog。
if (!_ctx->no_binlog && _factory->has_open_binlog(table_id) && !client->is_index_ddl) {
client->open_binlog = true;
_ctx->open_binlog = true;
}
} else {
client->txn_id = 0;
client->seq_id = 0;
}
//DB_WARNING("DEBUG client->txn_id:%ld client->seq_id: %d", client->txn_id, client->seq_id);
_ctx->get_runtime_state()->set_single_sql_autocommit(true);
} else {
if (!_ctx->no_binlog && _factory->has_open_binlog(table_id) && !client->is_index_ddl) {
client->open_binlog = true;
_ctx->open_binlog = true;
}
//DB_WARNING("DEBUG client->txn_id:%ld client->seq_id: %d", client->txn_id, client->seq_id);
_ctx->get_runtime_state()->set_single_sql_autocommit(false);
}
Expand Down Expand Up @@ -3103,6 +3102,9 @@ int LogicalPlanner::fill_placeholders(
}

int LogicalPlanner::set_dml_local_index_binlog(const int64_t table_id) {
if (table_id == -1) {
return 0;
}
if (_ctx->open_binlog && !_factory->has_global_index(table_id)) {
// plan的第二个节点为dml节点
if (_ctx->plan.nodes_size() < 2) {
Expand Down
15 changes: 15 additions & 0 deletions src/logical_plan/prepare_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,21 @@ int PreparePlanner::stmt_execute(const std::string& stmt_name, std::vector<pb::E
}

std::shared_ptr<QueryContext> prepare_ctx = iter->second;
prepare_ctx->get_runtime_state()->prepare_reset();
_ctx->stat_info.family = prepare_ctx->stat_info.family;
_ctx->stat_info.table = prepare_ctx->stat_info.table;
_ctx->stat_info.sample_sql << prepare_ctx->stat_info.sample_sql.str();
_ctx->stat_info.sign = prepare_ctx->stat_info.sign;
_ctx->is_full_export = prepare_ctx->is_full_export;
_ctx->debug_region_id = prepare_ctx->debug_region_id;
_ctx->execute_global_flow = prepare_ctx->execute_global_flow;
if (params.size() != prepare_ctx->placeholders.size()) {
_ctx->stat_info.error_code = ER_WRONG_ARGUMENTS;
_ctx->stat_info.error_msg << "Incorrect arguments to EXECUTE: "
<< params.size() << ", "
<< prepare_ctx->placeholders.size();
return -1;
}
// ttl沿用prepare的注释
DB_DEBUG("row_ttl_duration %ld", prepare_ctx->row_ttl_duration);
_ctx->row_ttl_duration = prepare_ctx->row_ttl_duration;
Expand Down
7 changes: 5 additions & 2 deletions src/logical_plan/select_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ int SelectPlanner::plan() {
//print_debug_log();
//_create_group_tuple_desc();

if (is_full_export()) {
if (!_ctx->is_full_export && is_full_export()) {
_ctx->is_full_export = true;
}
get_slot_column_mapping();
Expand Down Expand Up @@ -150,6 +150,9 @@ bool SelectPlanner::is_full_export() {
if (_ctx->explain_type != EXPLAIN_NULL) {
return false;
}
if (_ctx->debug_region_id != -1) {
return false;
}
if (_ctx->has_derived_table || _ctx->has_information_schema) {
return false;
}
Expand All @@ -159,7 +162,7 @@ bool SelectPlanner::is_full_export() {
if (_select->having != nullptr) {
return false;
}
if (_select->order != nullptr) {
if (_select->order != nullptr || !_order_ascs.empty()) {
return false;
}
//if (_select->limit != nullptr) {
Expand Down
24 changes: 22 additions & 2 deletions src/physical_plan/index_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ void IndexSelector::hit_row_field_range(ExprNode* expr,
if (get_field_hit_type_weight(field_range_map[field_id].type) > get_field_hit_type_weight(tmp_type)) {
return;
}
if (tmp_type != field_range_map[field_id].type) {
field_range_map[field_id] = FieldRange();
}
for (auto pair : values) {
field_range_map[field_id].left.push_back(pair.second[0]);
}
Expand All @@ -224,6 +227,9 @@ void IndexSelector::hit_row_field_range(ExprNode* expr,
if (get_field_hit_type_weight(field_range_map[field_id].type) > get_field_hit_type_weight(tmp_type)) {
return;
}
if (tmp_type != field_range_map[field_id].type) {
field_range_map[field_id] = FieldRange();
}
for (auto pair : values) {
field_range_map[field_id].right.push_back(pair.second[0]);
}
Expand Down Expand Up @@ -454,6 +460,11 @@ void IndexSelector::hit_field_range(ExprNode* expr,
int32_t fn_op = static_cast<ScalarFnCall*>(expr)->fn().fn_op();
switch (fn_op) {
case parser::FT_EQ:
tmp_type = EQ;
if (get_field_hit_type_weight(field_range_map[field_id].type) > get_field_hit_type_weight(tmp_type)) {
return;
}
field_range_map[field_id] = FieldRange();
field_range_map[field_id].eq_in_values = values;
field_range_map[field_id].conditions.insert(expr);
field_range_map[field_id].type = EQ;
Expand All @@ -463,6 +474,9 @@ void IndexSelector::hit_field_range(ExprNode* expr,
tmp_type = RANGE;
if (get_field_hit_type_weight(field_range_map[field_id].type) > get_field_hit_type_weight(tmp_type)) {
return;
}
if (tmp_type != field_range_map[field_id].type) {
field_range_map[field_id] = FieldRange();
}
if (field_range_map[field_id].left.size() > 0){
field_range_map[field_id].left.clear();
Expand All @@ -479,6 +493,9 @@ void IndexSelector::hit_field_range(ExprNode* expr,
tmp_type = RANGE;
if (get_field_hit_type_weight(field_range_map[field_id].type) > get_field_hit_type_weight(tmp_type)) {
return;
}
if (tmp_type != field_range_map[field_id].type) {
field_range_map[field_id] = FieldRange();
}
if(field_range_map[field_id].right.size() > 0){
field_range_map[field_id].right.clear();
Expand All @@ -504,6 +521,7 @@ void IndexSelector::hit_field_range(ExprNode* expr,
if (get_field_hit_type_weight(field_range_map[field_id].type) > get_field_hit_type_weight(tmp_type)) {
return ;
}
field_range_map[field_id] = FieldRange();
field_range_map[field_id].eq_in_values = values;
field_range_map[field_id].conditions.clear();
field_range_map[field_id].conditions.insert(expr);
Expand All @@ -529,6 +547,7 @@ void IndexSelector::hit_field_range(ExprNode* expr,
if (get_field_hit_type_weight(field_range_map[field_id].type) > get_field_hit_type_weight(tmp_type)) {
return;
}
field_range_map[field_id] = FieldRange();
range::FieldRange fulltext_and_range;
field_range_map[field_id].like_values.push_back(values[0]);
fulltext_and_range.like_values.push_back(values[0]);
Expand Down Expand Up @@ -769,8 +788,9 @@ int IndexSelector::select_partition(SmartTable& table_info, ScanNode* scan_node,
}

if (partition_type == pb::PT_HASH) {
if (field_iter != field_range_map.end() && !field_iter->second.eq_in_values.empty()) {
scan_node->set_partition_field_id(field_iter->first);
if (field_iter != field_range_map.end()
&& (field_iter->second.type == EQ || field_iter->second.type == IN || field_iter->second.type == LIKE_EQ)
&& !field_iter->second.eq_in_values.empty()) {
ExprValueFlatSet eq_in_values_set;
eq_in_values_set.init(ajust_flat_size(field_iter->second.eq_in_values.size()));
for (auto& value : field_iter->second.eq_in_values) {
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ void StateMachine::_print_query_time(SmartSocket client) {
field_range_type, err_count, stat_info->sign, subquery_signs);
}

if (op_type == pb::OP_SELECT) {
if (op_type == pb::OP_SELECT || op_type == pb::OP_UNION) {
select_time_cost << stat_info->total_time;
std::unique_lock<std::mutex> lock(_mutex);
if (select_by_users.find(client->username) == select_by_users.end()) {
Expand Down

0 comments on commit eb6d074

Please sign in to comment.