fix: for code review
wy1433 committed Apr 16, 2024
1 parent 6689c78 · commit 5e2fa70
Showing 5 changed files with 18 additions and 15 deletions.
proto/meta.interface.proto (1 change: 0 additions & 1 deletion)
@@ -757,7 +757,6 @@ message BaikalHeartBeatResponse {
     repeated Statistics statistics = 10;
     repeated RegionDdlWork region_ddl_works = 11;
     repeated DdlWorkInfo ddl_works = 12;
-    repeated string meta_peer = 13;
 };
 
 enum QueryOpType {
src/exec/fetcher_store.cpp (22 changes: 14 additions & 8 deletions)
@@ -48,8 +48,8 @@ DEFINE_bool(use_dynamic_timeout, false, "whether use dynamic_timeout");
 BRPC_VALIDATE_GFLAG(use_dynamic_timeout, brpc::PassValidate);
 DEFINE_bool(use_read_index, false, "whether use follower read");
 DEFINE_bool(read_random_select_peer, false, "read random select peers");
-DEFINE_int32(sql_exec_timeout, 100000, "sql exec timeout default 100s");
-BRPC_VALIDATE_GFLAG(sql_exec_timeout, brpc::NonNegativeInteger);
+DEFINE_int32(sql_exec_timeout, -1, "sql exec timeout. -1 means no limit");
+BRPC_VALIDATE_GFLAG(sql_exec_timeout, brpc::PassValidate);
 bvar::Adder<int64_t> OnRPCDone::async_rpc_region_count {"async_rpc_region_count"};
 bvar::LatencyRecorder OnRPCDone::total_send_request {"total_send_request"};
 bvar::LatencyRecorder OnRPCDone::add_backup_send_request {"add_backup_send_request"};
@@ -439,14 +439,20 @@ ErrorType OnRPCDone::send_async() {
     if (_fetcher_store->dynamic_timeout_ms > 0 && !_backup.empty() && _backup != _addr) {
         option.backup_request_ms = _fetcher_store->dynamic_timeout_ms;
     }
+
     if (!_state->is_ddl_work() && _request.op_type() == 4 && _state->explain_type == EXPLAIN_NULL) {
-        int64_t min_timeout = std::min(FLAGS_sql_exec_timeout - _state->get_cost_time() / 1000, FLAGS_fetcher_request_timeout * 1L);
-        if (min_timeout <= 0) {
-            DB_WARNING("logid: %lu, sql exec timeout, op_type: %d, total_cost: %ld", _state->log_id(), _request.op_type(), _state->get_cost_time());
-            return E_FATAL;
+        int32_t sql_exec_time_left = FLAGS_fetcher_request_timeout;
+        if (FLAGS_sql_exec_timeout > 0) {
+            int64_t sql_exec_time_left = std::min(FLAGS_sql_exec_timeout - _state->get_cost_time() / 1000,
+                    FLAGS_fetcher_request_timeout * 1L);
+            if (sql_exec_time_left <= 0) {
+                DB_WARNING("logid: %lu, sql exec timeout, op_type: %d, total_cost: %ld", _state->log_id(),
+                        _request.op_type(), _state->get_cost_time());
+                return E_FATAL;
+            }
         }
-        _request.set_sql_exec_timeout(min_timeout);
-        option.timeout_ms = min_timeout;
+        _request.set_sql_exec_timeout(sql_exec_time_left);
+        option.timeout_ms = sql_exec_time_left;
     }
     // SelectiveChannel core-dumps during init, so the open-source build comments it out for now
 #ifdef BAIDU_INTERNAL
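The flag and timeout changes above are the heart of the commit: with FLAGS_sql_exec_timeout defaulting to -1, each fetcher RPC keeps its normal FLAGS_fetcher_request_timeout, and only a positive value imposes a whole-statement budget that every fetch must fit into. Below is a minimal standalone sketch of that arithmetic; it mirrors the logic in the diff but none of the names or example values are BaikalDB's, and the elapsed time is assumed to be in microseconds (as the /1000 on _state->get_cost_time() suggests).

// Hedged sketch only: function, parameters and example values are assumptions.
#include <algorithm>
#include <cstdint>
#include <iostream>

// Returns the timeout (ms) to use for the next fetcher RPC, or -1 if the
// statement has already used up its overall budget (the caller would log
// "sql exec timeout" and fail with E_FATAL at that point).
int64_t next_fetch_timeout_ms(int64_t sql_exec_timeout_ms,        // -1 means no statement-level limit
                              int64_t fetcher_request_timeout_ms, // per-RPC default
                              int64_t elapsed_us) {               // time already spent on the statement
    int64_t time_left_ms = fetcher_request_timeout_ms;
    if (sql_exec_timeout_ms > 0) {
        time_left_ms = std::min(sql_exec_timeout_ms - elapsed_us / 1000,
                                fetcher_request_timeout_ms);
        if (time_left_ms <= 0) {
            return -1;
        }
    }
    return time_left_ms;
}

int main() {
    // 100 s statement budget, 60 s per-RPC default, 70 s already spent:
    // only 30 s of budget remain, so the next RPC is clamped to 30000 ms.
    std::cout << next_fetch_timeout_ms(100000, 60000, 70LL * 1000 * 1000) << "\n";  // 30000
    // The new default of -1 disables the statement budget entirely.
    std::cout << next_fetch_timeout_ms(-1, 60000, 70LL * 1000 * 1000) << "\n";      // 60000
    return 0;
}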
src/logical_plan/delete_planner.cpp (2 changes: 1 addition & 1 deletion)
@@ -173,7 +173,6 @@ int DeletePlanner::create_limit_node() {
     if (_limit_count.nodes_size() > 0) {
         limit->mutable_count_expr()->CopyFrom(_limit_count);
     }
-    _ctx->enable_2pc = true;
     return 0;
 }
 
@@ -296,6 +295,7 @@ int DeletePlanner::parse_limit() {
         DB_WARNING("create limit offset expr failed");
         return -1;
     }
+    _ctx->execute_global_flow = true;
     return 0;
 }
 
src/logical_plan/update_planner.cpp (2 changes: 1 addition & 1 deletion)
@@ -161,7 +161,7 @@ int UpdatePlanner::create_limit_node() {
     if (_limit_count.nodes_size() > 0) {
         limit->mutable_count_expr()->CopyFrom(_limit_count);
     }
-    _ctx->enable_2pc = true;
+    _ctx->execute_global_flow = true;
     return 0;
 }
 
src/physical_plan/separate.cpp (6 changes: 2 additions & 4 deletions)
@@ -798,7 +798,6 @@ int Separate::separate_update(QueryContext* ctx) {
     ExecNode* plan = ctx->root;
     PacketNode* packet_node = static_cast<PacketNode*>(plan->get_node(pb::PACKET_NODE));
     UpdateNode* update_node = static_cast<UpdateNode*>(plan->get_node(pb::UPDATE_NODE));
-    LimitNode* limit_node = static_cast<LimitNode*>(update_node->get_node(pb::LIMIT_NODE));
     std::vector<ExecNode*> scan_nodes;
     plan->get_node(pb::SCAN_NODE, scan_nodes);
     pb::PlanNode pb_manager_node;
@@ -817,7 +816,7 @@
     }
     int64_t main_table_id = update_node->table_id();
     bool should_delete = false;
-    if (!need_separate_plan(ctx, main_table_id) && limit_node == nullptr) {
+    if (!need_separate_plan(ctx, main_table_id)) {
         auto region_infos = static_cast<RocksdbScanNode*>(scan_nodes[0])->region_infos();
         manager_node->set_op_type(pb::OP_UPDATE);
         manager_node->set_region_infos(region_infos);
@@ -844,7 +843,6 @@ int Separate::separate_delete(QueryContext* ctx) {
     ExecNode* plan = ctx->root;
     PacketNode* packet_node = static_cast<PacketNode*>(plan->get_node(pb::PACKET_NODE));
     DeleteNode* delete_node = static_cast<DeleteNode*>(plan->get_node(pb::DELETE_NODE));
-    LimitNode* limit_node = static_cast<LimitNode*>(delete_node->get_node(pb::LIMIT_NODE));
     std::vector<ExecNode*> scan_nodes;
     plan->get_node(pb::SCAN_NODE, scan_nodes);
     int64_t main_table_id = delete_node->table_id();
@@ -862,7 +860,7 @@
         return -1;
     }
     bool should_delete = false;
-    if (!need_separate_plan(ctx, main_table_id) && limit_node == nullptr) {
+    if (!need_separate_plan(ctx, main_table_id)) {
         auto region_infos = static_cast<RocksdbScanNode*>(scan_nodes[0])->region_infos();
         manager_node->set_op_type(pb::OP_DELETE);
         manager_node->set_region_infos(region_infos);
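Taken together, the planner and separation changes move the LIMIT special case for UPDATE/DELETE out of the physical plan pass: the DELETE and UPDATE planners now set _ctx->execute_global_flow = true when a LIMIT is parsed (instead of forcing enable_2pc), and separate_update()/separate_delete() drop their limit_node == nullptr guard. Below is a minimal sketch of that shift in control flow; the struct and function names are stand-ins for the real QueryContext, parse_limit()/create_limit_node() and the checks in Separate, and how execute_global_flow is consumed downstream is not shown in this diff.

// Hedged sketch of the control-flow change only; all names here are assumptions.
#include <iostream>

struct QueryCtx {
    bool enable_2pc = false;
    bool execute_global_flow = false;
};

// Logical planning: a LIMIT on UPDATE/DELETE used to force 2PC; after this
// commit it requests the global execution flow instead.
void plan_limit(QueryCtx& ctx, bool has_limit) {
    if (has_limit) {
        ctx.execute_global_flow = true;  // was: ctx.enable_2pc = true;
    }
}

// Physical separation: the old code also required "no LimitNode" before taking
// the simple single-region path; the new code relies on need_separate_plan alone.
bool choose_single_region_plan(bool need_separate_plan) {
    return !need_separate_plan;  // was: !need_separate_plan && limit_node == nullptr
}

int main() {
    QueryCtx ctx;
    plan_limit(ctx, /*has_limit=*/true);
    std::cout << std::boolalpha
              << "execute_global_flow=" << ctx.execute_global_flow
              << " single_region_plan=" << choose_single_region_plan(false) << std::endl;
    return 0;
}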
