From 6d563e512b3892f104c01fbf9a6b3ccec7be5e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=8B=87?= Date: Wed, 17 Apr 2024 13:58:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84=20(#237)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: show full columns * feat: show tables, show columns * fix: binlog update checkpoint bug * fix: sort_limit_by_range * feat: show create table compatible with mysql * fix: show tables information_schema库转小写 * fix: cstore + ttl表,update字段默认值时不生效的bug * feat: slow query log add sign, server_addr and conn_id * fix: show cmd make simple result bug * fix: exec node delete after parent clear children * feat: auto update meta list * feat: add sql exec timeout && client count bvar * fix: enable profilers * fix: string type return field length 255 * fix: not in (null) * feat: support delete/update stmt with OrderByOption and LimitCause * fix: show processlist segment fault * fix: for code reveiw * fix: exec node delete after parent clear children * fix: for code review --------- Co-authored-by: yuzhong.chen --- BUILD | 14 +- include/common/meta_server_interact.hpp | 8 +- include/common/type_utils.h | 12 +- include/logical_plan/delete_planner.h | 1 + include/logical_plan/setkv_planner.h | 3 +- include/logical_plan/update_planner.h | 7 +- include/protocol/network_server.h | 6 + include/protocol/show_helper.h | 9 +- include/runtime/runtime_state.h | 12 + include/store/region.h | 1 + include/store/store.h | 2 + proto/raft.proto | 2 + proto/store.interface.proto | 1 + src/common/information_schema.cpp | 6 +- src/common/meta_server_interact.cpp | 31 +- src/exec/delete_manager_node.cpp | 9 +- src/exec/dml_node.cpp | 13 +- src/exec/fetcher_store.cpp | 22 +- src/exec/filter_node.cpp | 21 +- src/exec/index_ddl_manager_node.cpp | 1 + src/exec/packet_node.cpp | 3 + src/exec/rocksdb_scan_node.cpp | 17 +- src/logical_plan/delete_planner.cpp | 54 +- src/logical_plan/setkv_planner.cpp | 29 +- src/logical_plan/update_planner.cpp | 47 +- src/physical_plan/separate.cpp | 63 +- src/protocol/network_server.cpp | 100 +++- src/protocol/show_helper.cpp | 743 ++++++++++++++---------- src/raft/raft_control.cpp | 18 + src/runtime/runtime_state.cpp | 4 + src/store/region_binlog.cpp | 10 +- src/store/store.cpp | 26 + 32 files changed, 904 insertions(+), 391 deletions(-) diff --git a/BUILD b/BUILD index cd54acd62..beb66ca05 100644 --- a/BUILD +++ b/BUILD @@ -61,6 +61,7 @@ COPTS = [ "-Wno-parentheses", "-Wno-deprecated-declarations", "-DBAIKAL_TCMALLOC", + "-DBRPC_ENABLE_CPU_PROFILER", "-UNDEBUG", ] @@ -585,10 +586,6 @@ cc_binary( "include/engine", "include/common", ], - copts = [ - "-DBAIDU_RPC_ENABLE_CPU_PROFILER", - "-DBAIDU_RPC_ENABLE_HEAP_PROFILER", - ], linkopts = LINKOPTS, deps = [ ":meta_server", @@ -609,10 +606,6 @@ cc_binary( includes = [ "include/store", ], - copts = [ - "-DBAIDU_RPC_ENABLE_CPU_PROFILER", - "-DBAIDU_RPC_ENABLE_HEAP_PROFILER", - ], linkopts = LINKOPTS, deps = [ ":store", @@ -726,11 +719,8 @@ cc_library( cc_binary( name = "baikaldb", srcs = ["src/protocol/main.cpp"], - copts = COPTS + [ - "-DBAIDU_RPC_ENABLE_CPU_PROFILER", - "-DBAIDU_RPC_ENABLE_HEAP_PROFILER", - ], linkopts = LINKOPTS, + copts = COPTS, deps = [ ":protocol2", ":common", diff --git a/include/common/meta_server_interact.hpp b/include/common/meta_server_interact.hpp index e2babd8b0..3ce9cd7c7 100644 --- a/include/common/meta_server_interact.hpp +++ b/include/common/meta_server_interact.hpp @@ -54,12 +54,14 @@ class MetaServerInteract { return &_instance; } + MetaServerInteract() {} bool is_inited() { return _is_inited; } int init(bool is_backup = false); int init_internal(const std::string& meta_bns); + int reset_bns_channel(const std::string& meta_bns); template int send_request(const std::string& service_name, const Request& request, @@ -98,7 +100,8 @@ class MetaServerInteract { } short_channel.CallMethod(method, &cntl, &request, &response, NULL); } else { - _bns_channel.CallMethod(method, &cntl, &request, &response, NULL); + std::unique_lock lck(_bns_channel_mutex); + _bns_channel->CallMethod(method, &cntl, &request, &response, NULL); if (!cntl.Failed() && response.errcode() == pb::SUCCESS) { _set_leader_address(cntl.remote_side()); DB_WARNING("connet with meta server success by bns name, leader:%s", @@ -149,11 +152,12 @@ class MetaServerInteract { _master_leader_address = addr; } private: - brpc::Channel _bns_channel; + brpc::Channel *_bns_channel = nullptr; int32_t _request_timeout = 30000; int32_t _connect_timeout = 5000; bool _is_inited = false; std::mutex _master_leader_mutex; + std::mutex _bns_channel_mutex; butil::EndPoint _master_leader_address; }; }//namespace diff --git a/include/common/type_utils.h b/include/common/type_utils.h index a96f0772e..9a59db25b 100644 --- a/include/common/type_utils.h +++ b/include/common/type_utils.h @@ -417,21 +417,23 @@ inline std::string to_mysql_type_full_string(pb::PrimitiveType type) { case pb::DOUBLE: return "double"; case pb::STRING: - return "text"; + return "varchar(1024)"; case pb::DATETIME: - return "datetime(6)"; + return "datetime"; case pb::DATE: return "date"; case pb::TIME: return "time"; case pb::TIMESTAMP: - return "timestamp(0)"; + return "timestamp"; case pb::HLL: + return "HLL"; case pb::BITMAP: + return "BITMAP"; case pb::TDIGEST: - return "binary"; + return "TDIGEST"; default: - return "text"; + return ""; } } diff --git a/include/logical_plan/delete_planner.h b/include/logical_plan/delete_planner.h index 207d719c3..870ee60e3 100644 --- a/include/logical_plan/delete_planner.h +++ b/include/logical_plan/delete_planner.h @@ -32,6 +32,7 @@ class DeletePlanner : public LogicalPlanner { private: // method to create plan node int create_delete_node(pb::PlanNode* delete_node); + int create_limit_node(); // method to create plan node int create_truncate_node(); diff --git a/include/logical_plan/setkv_planner.h b/include/logical_plan/setkv_planner.h index 458cba367..f36a4b785 100644 --- a/include/logical_plan/setkv_planner.h +++ b/include/logical_plan/setkv_planner.h @@ -32,8 +32,9 @@ class SetKVPlanner : public LogicalPlanner { int set_autocommit_1(); int set_autocommit(parser::ExprNode* expr); int set_user_variable(const std::string& key, parser::ExprNode* expr); + int set_sql_mode(parser::ExprNode* expr); private: parser::SetStmt* _set_stmt; }; -} //namespace baikal \ No newline at end of file +} //namespace baikal diff --git a/include/logical_plan/update_planner.h b/include/logical_plan/update_planner.h index e976e60e9..f43389a1b 100644 --- a/include/logical_plan/update_planner.h +++ b/include/logical_plan/update_planner.h @@ -24,8 +24,7 @@ namespace baikaldb { class UpdatePlanner : public LogicalPlanner { public: UpdatePlanner(QueryContext* ctx) : - LogicalPlanner(ctx), - _limit_count(-1) {} + LogicalPlanner(ctx) {} virtual ~UpdatePlanner() {} virtual int plan(); @@ -33,6 +32,7 @@ class UpdatePlanner : public LogicalPlanner { private: int create_update_node(pb::PlanNode* update_node); + int create_limit_node(); // method to parse SQL parts int parse_kv_list(); @@ -43,9 +43,10 @@ class UpdatePlanner : public LogicalPlanner { private: parser::UpdateStmt* _update; std::vector _where_filters; - int32_t _limit_count; std::vector _update_slots; std::vector _update_values; + pb::Expr _limit_offset; + pb::Expr _limit_count; }; } //namespace baikal diff --git a/include/protocol/network_server.h b/include/protocol/network_server.h index 989adca7e..1c0aa4280 100644 --- a/include/protocol/network_server.h +++ b/include/protocol/network_server.h @@ -103,6 +103,8 @@ class NetworkServer { SmartSocket create_listen_socket(); void construct_other_heart_beat_request(pb::BaikalOtherHeartBeatRequest& request); void process_other_heart_beat_response(const pb::BaikalOtherHeartBeatResponse& response); + void update_meta_list(); + void client_conn_bvars_update(); std::string state2str(SmartSocket client); @@ -149,10 +151,14 @@ class NetworkServer { Bthread _other_heartbeat_bth; Bthread _agg_sql_bth; Bthread _health_check_bth; + Bthread _conn_bvars_update_bth; uint32_t _driver_thread_num; uint64_t _instance_id = 0; std::string _physical_room; bvar::Adder _heart_beat_count; + bvar::Status _client_conn_count; + bvar::Status _client_sql_running_count; + bvar::Status _client_sql_running_max_latency; // for print_agg_sql baikal::client::Manager _manager; baikal::client::Service* _baikaldb; diff --git a/include/protocol/show_helper.h b/include/protocol/show_helper.h index 2844434cf..b43e31826 100644 --- a/include/protocol/show_helper.h +++ b/include/protocol/show_helper.h @@ -29,7 +29,6 @@ const std::string SQL_SHOW_DATABASES = "databases"; // s const std::string SQL_SHOW_NAMESPACE = "namespace"; // show namespace const std::string SQL_SHOW_META = "meta"; // show meta; const std::string SQL_SHOW_TABLE_STATUS = "table"; // show table status; -const std::string SQL_SHOW_TABLES = "tables"; // show tables; const std::string SQL_SHOW_FUNCTION_STATUS = "function"; // show function status; const std::string SQL_SHOW_PROCEDURE_STATUS = "procedure"; // show procedure status; const std::string SQL_SHOW_TRIGGERS = "triggers"; // show triggers; @@ -39,7 +38,9 @@ const std::string SQL_SHOW_WARNINGS = "warnings"; // s const std::string SQL_SHOW_PROCESSLIST = "processlist"; // show processlist; const std::string SQL_SHOW_COST = "cost"; // show cost switch; const std::string SQL_SHOW_FULL_TABLES = "full_tables"; // show full tables; +const std::string SQL_SHOW_TABLES = "tables"; // show tables; const std::string SQL_SHOW_FULL_COLUMNS = "full_columns"; // show full columns; +const std::string SQL_SHOW_COLUMNS = "columns"; // show columns; const std::string SQL_SHOW_SCHEMA_CONF = "schema_conf"; // show schema_conf database_table; const std::string SQL_SHOW_VIRTUAL_INDEX = "virtual"; // show virtual index; const std::string SQL_SHOW_SESSION_VARIABLES = "show session variables"; @@ -102,8 +103,6 @@ class ShowHelper { bool _show_abnormal_regions(const SmartSocket& client, const std::vector& split_vec); // sql: show databases; bool _show_databases(const SmartSocket& client, const std::vector& split_vec); - // sql: show tables; - bool _show_tables(const SmartSocket& client, const std::vector& split_vec); // sql: show create table tableName; bool _show_create_table(const SmartSocket& client, const std::vector& split_vec); // sql: show collation @@ -118,6 +117,8 @@ class ShowHelper { bool _show_cost(const SmartSocket& client, const std::vector& split_vec); // sql: show full tables; bool _show_full_tables(const SmartSocket& client, const std::vector& split_vec); + // sql: show tables; + bool _show_tables(const SmartSocket& client, const std::vector& split_vec); // sql: show function status; bool _show_function_status(const SmartSocket& client, const std::vector& split_vec); // sql: show procedure status; @@ -128,6 +129,8 @@ class ShowHelper { bool _show_events(const SmartSocket& client, const std::vector& split_vec); // sql: show full columns from tableName; bool _show_full_columns(const SmartSocket& client, const std::vector& split_vec); + // sql: show columns from tableName; + bool _show_columns(const SmartSocket& client, const std::vector& split_vec); // sql: show schema_conf database_table; bool _show_schema_conf(const SmartSocket& client, const std::vector& split_vec); // sql: show all_tables ttl/binlog; diff --git a/include/runtime/runtime_state.h b/include/runtime/runtime_state.h index 9266b2f41..b4dd2487e 100644 --- a/include/runtime/runtime_state.h +++ b/include/runtime/runtime_state.h @@ -408,6 +408,16 @@ class RuntimeState { int64_t get_cost_time() { return time_cost.get_time(); } + bool is_timeout() { + return _sql_exec_timeout > 0 && time_cost.get_time() > _sql_exec_timeout * 1000L; + } + + bool is_ddl_work() { + return _is_ddl_work; + } + void set_is_ddl_work(bool is_ddl_work) { + _is_ddl_work = is_ddl_work; + } public: uint64_t txn_id = 0; @@ -446,6 +456,8 @@ class RuntimeState { int keypoint_range = 100 * 10000; int partition_threshold = 10000; int range_count_limit = 0; + int64_t _sql_exec_timeout = -1; + bool _is_ddl_work = false; private: bool _is_inited = false; bool _is_cancelled = false; diff --git a/include/store/region.h b/include/store/region.h index b06f4fd74..8da76eebb 100644 --- a/include/store/region.h +++ b/include/store/region.h @@ -1601,6 +1601,7 @@ friend class Backup; std::string _rocksdb_end; pb::PeerStatus _region_status = pb::STATUS_NORMAL; BinlogAlarm _binlog_alarm; + TimeCost _binlog_update_ck_tc; //learner std::unique_ptr _learner; diff --git a/include/store/store.h b/include/store/store.h index cfdd08936..e87e6da73 100644 --- a/include/store/store.h +++ b/include/store/store.h @@ -156,6 +156,8 @@ class Store : public pb::StoreService { //上报心跳 void heart_beat_thread(); + void update_meta_list(); + void send_heart_beat(); void start_db_statistics(); diff --git a/proto/raft.proto b/proto/raft.proto index fef86eb8d..d6ec3caa5 100755 --- a/proto/raft.proto +++ b/proto/raft.proto @@ -11,6 +11,7 @@ enum RaftControlOp { ShutDown = 5; //leader and follower both can do, 只关闭,但并没有从raft group里去掉 Vote = 6; //调用node的vote接口 ResetVoteTime = 7; //调用node的reset_election_timeout_ms接口 + GetPeerList = 8; //获取当前peer list, 只能发leader }; // operation request/response @@ -31,4 +32,5 @@ message RaftControlResponse { required ErrCode errcode = 2; optional string leader = 3; optional string errmsg = 4; + repeated string peers = 5; }; diff --git a/proto/store.interface.proto b/proto/store.interface.proto index 7f9144578..64241fba7 100755 --- a/proto/store.interface.proto +++ b/proto/store.interface.proto @@ -160,6 +160,7 @@ message StoreReq { optional uint64 sql_sign = 28; // sql 签名 repeated RegionInfo multi_new_region_infos = 29; optional ExtraReq extra_req = 30; // 非关键路径上的额外信息可以放在这里,避免该message过度膨胀 + optional int64 sql_exec_timeout = 31; }; message RowValue { diff --git a/src/common/information_schema.cpp b/src/common/information_schema.cpp index 02796cf35..45832bac2 100644 --- a/src/common/information_schema.cpp +++ b/src/common/information_schema.cpp @@ -855,7 +855,11 @@ void InformationSchema::init_columns() { record->set_string(record->get_field_by_name("COLUMN_NAME"), field.short_name); record->set_int64(record->get_field_by_name("ORDINAL_POSITION"), ++i); if (field.default_expr_value.type != pb::NULL_TYPE) { - record->set_string(record->get_field_by_name("COLUMN_DEFAULT"), field.default_value); + if (field.default_value == "(current_timestamp())") { + record->set_string(record->get_field_by_name("COLUMN_DEFAULT"), "CURRENT_TIMESTAMP"); + } else { + record->set_string(record->get_field_by_name("COLUMN_DEFAULT"), field.default_value); + } } record->set_string(record->get_field_by_name("IS_NULLABLE"), field.can_null ? "YES" : "NO"); record->set_string(record->get_field_by_name("DATA_TYPE"), to_mysql_type_string(field.type)); diff --git a/src/common/meta_server_interact.cpp b/src/common/meta_server_interact.cpp index 21a924f89..5ad19138b 100644 --- a/src/common/meta_server_interact.cpp +++ b/src/common/meta_server_interact.cpp @@ -23,6 +23,7 @@ DEFINE_int32(meta_connect_timeout, 5000, DEFINE_string(meta_server_bns, "group.opera-qa-baikalMeta-000-yz.FENGCHAO.all", "meta server bns"); DEFINE_string(backup_meta_server_bns, "", "backup_meta_server_bns"); DEFINE_int64(time_between_meta_connect_error_ms, 0, "time_between_meta_connect_error_ms. default(0ms)"); +DEFINE_bool(auto_update_meta_list, false, "auto_update_meta_list, default false"); int MetaServerInteract::init(bool is_backup) { if (is_backup) { @@ -51,13 +52,41 @@ int MetaServerInteract::init_internal(const std::string& meta_bns) { } else { meta_server_addr = std::string("list://") + meta_bns; } - if (_bns_channel.Init(meta_server_addr.c_str(), "rr", &channel_opt) != 0) { + std::unique_lock lck(_bns_channel_mutex); + if (_bns_channel == nullptr) { + _bns_channel = new brpc::Channel(); + } + if (_bns_channel->Init(meta_server_addr.c_str(), "rr", &channel_opt) != 0) { DB_FATAL("meta server bns pool init fail. bns_name:%s", meta_server_addr.c_str()); return -1; } _is_inited = true; return 0; } + + +int MetaServerInteract::reset_bns_channel(const std::string& meta_bns) { + brpc::ChannelOptions channel_opt; + channel_opt.timeout_ms = FLAGS_meta_request_timeout; + channel_opt.connect_timeout_ms = FLAGS_meta_connect_timeout; + std::string meta_server_addr = meta_bns; + //bns + if (meta_bns.find(":") == std::string::npos) { + meta_server_addr = std::string("bns://") + meta_bns; + } else { + meta_server_addr = std::string("list://") + meta_bns; + } + brpc::Channel *tmp = new brpc::Channel(); + if (tmp->Init(meta_server_addr.c_str(), "rr", &channel_opt) != 0) { + delete tmp; + DB_FATAL("meta server bns pool init fail. bns_name:%s", meta_server_addr.c_str()); + return -1; + } + std::unique_lock lck(_bns_channel_mutex); + SAFE_DELETE(_bns_channel); + _bns_channel = tmp; + return 0; +} } /* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/src/exec/delete_manager_node.cpp b/src/exec/delete_manager_node.cpp index b6189ebfb..67d72ea86 100755 --- a/src/exec/delete_manager_node.cpp +++ b/src/exec/delete_manager_node.cpp @@ -21,7 +21,8 @@ namespace baikaldb { int DeleteManagerNode::open(RuntimeState* state) { ExecNode* child_node = _children[0]; //如果没有全局二级索引,直接走逻辑 - if (child_node->node_type() != pb::SELECT_MANAGER_NODE) { + if ((child_node->node_type() != pb::SELECT_MANAGER_NODE) && + (child_node->node_type() != pb::LIMIT_NODE)) { int ret = DmlManagerNode::open(state); if (ret >= 0) { if (process_binlog(state, true) < 0) { @@ -62,8 +63,8 @@ int DeleteManagerNode::init_delete_info(const pb::UpdateNode& update_node) { } int DeleteManagerNode::open_global_delete(RuntimeState* state) { - ExecNode* select_manager_node = _children[_execute_child_idx++]; - auto ret = select_manager_node->open(state); + ExecNode* select_manager_or_limit_node = _children[_execute_child_idx++]; + auto ret = select_manager_or_limit_node->open(state); if (ret < 0) { DB_WARNING("select manager node fail"); return ret; @@ -74,7 +75,7 @@ int DeleteManagerNode::open_global_delete(RuntimeState* state) { std::vector scan_records; do { RowBatch batch; - ret = select_manager_node->get_next(state, &batch, &eos); + ret = select_manager_or_limit_node->get_next(state, &batch, &eos); if (ret < 0) { DB_WARNING("children:get_next fail:%d", ret); return ret; diff --git a/src/exec/dml_node.cpp b/src/exec/dml_node.cpp index d7e8a61d6..0b00f5446 100644 --- a/src/exec/dml_node.cpp +++ b/src/exec/dml_node.cpp @@ -175,7 +175,7 @@ int DMLNode::init_schema_info(RuntimeState* state) { // 如果更新主键或ttl表,那么影响了全部索引 if (!_update_affect_primary && !ttl) { // cstore下只更新涉及列 - if (_table_info->engine == pb::ROCKSDB_CSTORE && !_local_index_binlog) { + if (_table_info->engine == pb::ROCKSDB_CSTORE) { for (size_t i = 0; i < _update_slots.size(); i++) { auto field_id = _update_slots[i].field_id(); if (_pri_field_ids.count(field_id) == 0 && @@ -195,6 +195,14 @@ int DMLNode::init_schema_info(RuntimeState* state) { } } else { _affected_indexes = _all_indexes; + if (ttl || _local_index_binlog) { + // cstore_update_fields_partly=true时,需要用到_update_field_ids,修复ttl列存表更新默认字段不生效的bug + if (_table_info->engine == pb::ROCKSDB_CSTORE) { + for (auto iter : _field_ids) { + _update_field_ids.insert(iter.first); + } + } + } } } else { _affected_indexes = _all_indexes; @@ -214,9 +222,8 @@ int DMLNode::insert_row(RuntimeState* state, SmartRecord record, bool is_update) _indexes_ptr = &_all_indexes; } // LOCK_PRIMARY_NODE目前无法区分update与insert,暂用update兼容 - // 由于cstore的字段是分开存储的,不涉及主键与ttl时,可以优化为更新部分涉及字段. + // 由于cstore的字段是分开存储的,不涉及主键时,可以优化为更新部分涉及字段. bool cstore_update_fields_partly = !_update_affect_primary && - (_ttl_timestamp_us == 0) && (is_update || _node_type == pb::LOCK_PRIMARY_NODE); bool need_increase = true; auto& reverse_index_map = state->reverse_index_map(); diff --git a/src/exec/fetcher_store.cpp b/src/exec/fetcher_store.cpp index 267fc1ac5..d675bc854 100755 --- a/src/exec/fetcher_store.cpp +++ b/src/exec/fetcher_store.cpp @@ -36,7 +36,7 @@ DEFINE_int64(print_time_us, 10000, "print log when time_cost > print_time_us(us) DEFINE_int64(baikaldb_alive_time_s, 10 * 60, "obervation time length in baikaldb, default:10 min"); BRPC_VALIDATE_GFLAG(print_time_us, brpc::NonNegativeInteger); DEFINE_int32(fetcher_request_timeout, 100000, - "store as server request timeout, default:10000ms"); + "store as server request timeout, default:100000ms"); DEFINE_int32(fetcher_connect_timeout, 1000, "store as server connect timeout, default:1000ms"); DEFINE_bool(fetcher_follower_read, true, "where allow follower read for fether"); @@ -45,10 +45,11 @@ DEFINE_string(insulate_fetcher_resource_tag, "", "store read insulate resource_t DEFINE_string(fetcher_resource_tag, "", "store read resource_tag perfered, only first time valid"); DECLARE_int32(transaction_clear_delay_ms); DEFINE_bool(use_dynamic_timeout, false, "whether use dynamic_timeout"); +BRPC_VALIDATE_GFLAG(use_dynamic_timeout, brpc::PassValidate); DEFINE_bool(use_read_index, false, "whether use follower read"); DEFINE_bool(read_random_select_peer, false, "read random select peers"); - -BRPC_VALIDATE_GFLAG(use_dynamic_timeout, brpc::PassValidate); +DEFINE_int32(sql_exec_timeout, -1, "sql exec timeout. -1 means no limit"); +BRPC_VALIDATE_GFLAG(sql_exec_timeout, brpc::PassValidate); bvar::Adder OnRPCDone::async_rpc_region_count {"async_rpc_region_count"}; bvar::LatencyRecorder OnRPCDone::total_send_request {"total_send_request"}; bvar::LatencyRecorder OnRPCDone::add_backup_send_request {"add_backup_send_request"}; @@ -438,6 +439,21 @@ ErrorType OnRPCDone::send_async() { if (_fetcher_store->dynamic_timeout_ms > 0 && !_backup.empty() && _backup != _addr) { option.backup_request_ms = _fetcher_store->dynamic_timeout_ms; } + + if (!_state->is_ddl_work() && _request.op_type() == 4 && _state->explain_type == EXPLAIN_NULL) { + int32_t sql_exec_time_left = FLAGS_fetcher_request_timeout; + if (FLAGS_sql_exec_timeout > 0) { + int64_t sql_exec_time_left = std::min(FLAGS_sql_exec_timeout - _state->get_cost_time() / 1000, + FLAGS_fetcher_request_timeout * 1L); + if (sql_exec_time_left <= 0) { + DB_WARNING("logid: %lu, sql exec timeout, op_type: %d, total_cost: %ld", _state->log_id(), + _request.op_type(), _state->get_cost_time()); + return E_FATAL; + } + } + _request.set_sql_exec_timeout(sql_exec_time_left); + option.timeout_ms = sql_exec_time_left; + } // SelectiveChannel在init时会出core,开源版先注释掉 #ifdef BAIDU_INTERNAL brpc::SelectiveChannel channel; diff --git a/src/exec/filter_node.cpp b/src/exec/filter_node.cpp index 6f4b8f1f9..4ae61fa39 100644 --- a/src/exec/filter_node.cpp +++ b/src/exec/filter_node.cpp @@ -363,13 +363,20 @@ int FilterNode::expr_optimize(QueryContext* ctx) { } } // TODO 除了not in外,其他计算null的地方在index_selector判断了,应该统一处理 - if (expr->node_type() == pb::NOT_PREDICATE) { - if (static_cast(expr)->always_null_or_false()) { - DB_WARNING("expr not is always null or false"); - ctx->return_empty = true; - return 0; - } - } + // + // not in (结果为空的子查询)这里应该为true, 删掉 + // mysql行为比较奇怪,可以再验证下 + // select * from t1 where id not in (null) 不返回 + // select * from t1 where id not in (select id from t1 where 0=1) 返回 + // select * from t1 where id not in (select null from t1 where id=1) 返回 + // select * from t1 where id not in (select sum(null) from t1 where id=1) 不返回 + // if (expr->node_type() == pb::NOT_PREDICATE) { + // if (static_cast(expr)->always_null_or_false()) { + // DB_WARNING("expr not is always null or false"); + // ctx->return_empty = true; + // return 0; + // } + // } if (expr->children_size() < 2) { continue; } diff --git a/src/exec/index_ddl_manager_node.cpp b/src/exec/index_ddl_manager_node.cpp index 978b81028..cf36e5dc2 100755 --- a/src/exec/index_ddl_manager_node.cpp +++ b/src/exec/index_ddl_manager_node.cpp @@ -28,6 +28,7 @@ IndexDDLManagerNode::~IndexDDLManagerNode() { } int IndexDDLManagerNode::open(RuntimeState* state) { + state->set_is_ddl_work(true); int ret = 0; auto client_conn = state->client_conn(); if (client_conn == nullptr) { diff --git a/src/exec/packet_node.cpp b/src/exec/packet_node.cpp index ea9fbf29f..67155a418 100644 --- a/src/exec/packet_node.cpp +++ b/src/exec/packet_node.cpp @@ -61,6 +61,9 @@ int PacketNode::expr_optimize(QueryContext* ctx) { //db table_name先不填,后续有影响再填 _fields[i].type = to_mysql_type(expr->col_type()); _fields[i].flags = 1; + if (_fields[i].type == MYSQL_TYPE_STRING) { + _fields[i].length = 255; + } if (is_uint(expr->col_type())) { _fields[i].flags |= 32; } diff --git a/src/exec/rocksdb_scan_node.cpp b/src/exec/rocksdb_scan_node.cpp index 8dc901783..9bdeb709c 100644 --- a/src/exec/rocksdb_scan_node.cpp +++ b/src/exec/rocksdb_scan_node.cpp @@ -75,8 +75,19 @@ int RocksdbScanNode::choose_index(RuntimeState* state) { } if (pos_index.has_sort_index()) { if (pos_index.ranges_size() > 1) { - // limit没下推,不能走_sort_limit_by_range逻辑 + bool multi_range_limit = false; if (_limit != -1) { + multi_range_limit = true; + } else { + // limit没下推,并且filter条件不为空,不能走_sort_limit_by_range逻辑 + if (get_parent() != nullptr && (get_parent()->node_type() == pb::TABLE_FILTER_NODE || + get_parent()->node_type() == pb::WHERE_FILTER_NODE)) { + if (static_cast(get_parent())->mutable_conjuncts()->empty()) { + multi_range_limit = true; + } + } + } + if (multi_range_limit) { _sort_use_index_by_range = true; _sort_limit_by_range = pos_index.sort_index().sort_limit(); } @@ -624,6 +635,10 @@ int RocksdbScanNode::get_next(RuntimeState* state, RowBatch* batch, bool* eos) { *eos = true; return 0; } + if (state->is_timeout()) { + DB_WARNING_STATE(state, "sql exec reach timeout"); + return -1; + } ON_SCOPE_EXIT(([this, state]() { state->set_num_scan_rows(_scan_rows); state->set_read_disk_size(_read_disk_size); diff --git a/src/logical_plan/delete_planner.cpp b/src/logical_plan/delete_planner.cpp index def6d8665..4ede6e7aa 100644 --- a/src/logical_plan/delete_planner.cpp +++ b/src/logical_plan/delete_planner.cpp @@ -126,6 +126,9 @@ int DeletePlanner::plan() { if (0 != create_delete_node(delete_node)) { return -1; } + if (0 != create_limit_node()) { + return -1; + } if (0 != create_sort_node()) { return -1; } @@ -148,6 +151,31 @@ int DeletePlanner::plan() { return 0; } +int DeletePlanner::create_limit_node() { + if (_delete_stmt->limit == nullptr) { + return 0; + } + pb::PlanNode* limit_node = _ctx->add_plan_node(); + limit_node->set_node_type(pb::LIMIT_NODE); + limit_node->set_limit(-1); + limit_node->set_is_explain(_ctx->is_explain); + limit_node->set_num_children(1); //TODO + + pb::DerivePlanNode* derive = limit_node->mutable_derive_node(); + pb::LimitNode* limit = derive->mutable_limit_node(); + if (_limit_offset.nodes_size() > 0) { + limit->mutable_offset_expr()->CopyFrom(_limit_offset); + limit->set_offset(0); + } else { + limit->set_offset(0); + } + + if (_limit_count.nodes_size() > 0) { + limit->mutable_count_expr()->CopyFrom(_limit_count); + } + return 0; +} + int DeletePlanner::create_delete_node(pb::PlanNode* delete_node) { if (_current_tables.size() != 1 || _plan_table_ctx->table_tuple_mapping.count(try_to_lower(_current_tables[0])) == 0) { DB_WARNING("invalid sql format: %s", _ctx->sql.c_str()); @@ -249,27 +277,25 @@ int DeletePlanner::parse_where() { int DeletePlanner::parse_orderby() { if (_delete_stmt != nullptr && _delete_stmt->order != nullptr) { - DB_WARNING("delete does not support orderby"); - return -1; + return create_orderby_exprs(_delete_stmt->order); } return 0; } int DeletePlanner::parse_limit() { - if (_delete_stmt->limit != nullptr) { - _ctx->stat_info.error_code = ER_SYNTAX_ERROR; - _ctx->stat_info.error_msg << "syntax error! delete does not support limit"; + if (_delete_stmt->limit == nullptr) { + return 0; + } + parser::LimitClause* limit = _delete_stmt->limit; + if (limit->offset != nullptr && 0 != create_expr_tree(limit->offset, _limit_offset, CreateExprOptions())) { + DB_WARNING("create limit offset expr failed"); + return -1; + } + if (limit->count != nullptr && 0 != create_expr_tree(limit->count, _limit_count, CreateExprOptions())) { + DB_WARNING("create limit offset expr failed"); return -1; } - // parser::LimitClause* limit = _delete_stmt->limit; - // if (limit->offset != nullptr && 0 != create_expr_tree(limit->offset, _limit_offset)) { - // DB_WARNING("create limit offset expr failed"); - // return -1; - // } - // if (limit->count != nullptr && 0 != create_expr_tree(limit->count, _limit_count)) { - // DB_WARNING("create limit offset expr failed"); - // return -1; - // } + _ctx->execute_global_flow = true; return 0; } diff --git a/src/logical_plan/setkv_planner.cpp b/src/logical_plan/setkv_planner.cpp index cf3035d45..74809d59c 100644 --- a/src/logical_plan/setkv_planner.cpp +++ b/src/logical_plan/setkv_planner.cpp @@ -75,9 +75,9 @@ int SetKVPlanner::plan() { //_ctx->succ_after_logical_plan = true; return set_autocommit(var_assign->value); } else if (key == "sql_mode") { - // ignore sql_mode: may be support in the future - _ctx->succ_after_logical_plan = true; - return 0; + // SET sql_mode = "compatible" 时, show create table将兼容mysql模式, + // 去掉resource_tag等特殊格式,方便建表语句的导入导出。 + return set_sql_mode(var_assign->value); } else { DB_WARNING("unrecoginized command: %s", _ctx->sql.c_str()); _ctx->succ_after_logical_plan = true; @@ -146,6 +146,29 @@ int SetKVPlanner::set_autocommit_1() { return 0; } +int SetKVPlanner::set_sql_mode(parser::ExprNode* expr) { + if (expr->expr_type != parser::ET_LITETAL) { + DB_WARNING("invalid expr type: %d", expr->expr_type); + return -1; + } + parser::LiteralExpr* literal = (parser::LiteralExpr*)expr; + if (literal->literal_type != parser::LT_STRING) { + DB_WARNING("invalid literal expr type: %d", literal->literal_type); + return -1; + } + auto client = _ctx->client_conn; + pb::ExprNode str_node; + str_node.set_node_type(pb::STRING_LITERAL); + str_node.set_col_type(pb::STRING); + str_node.set_num_children(0); + if (!literal->_u.str_val.empty()) { + str_node.mutable_derive_node()->set_string_val(literal->_u.str_val.to_string()); + } + client->session_vars["sql_mode"] = str_node; + _ctx->succ_after_logical_plan = true; + return 0; +} + int SetKVPlanner::set_user_variable(const std::string& key, parser::ExprNode* expr) { auto client = _ctx->client_conn; pb::Expr var_expr_pb; diff --git a/src/logical_plan/update_planner.cpp b/src/logical_plan/update_planner.cpp index 0d8a4b6c6..451e93089 100644 --- a/src/logical_plan/update_planner.cpp +++ b/src/logical_plan/update_planner.cpp @@ -57,6 +57,9 @@ int UpdatePlanner::plan() { if (0 != create_update_node(update_node)) { return -1; } + if (0 != create_limit_node()) { + return -1; + } if (0 != create_sort_node()) { return -1; } @@ -80,9 +83,16 @@ int UpdatePlanner::plan() { } int UpdatePlanner::parse_limit() { - if (_update->limit != nullptr) { - _ctx->stat_info.error_code = ER_SYNTAX_ERROR; - _ctx->stat_info.error_msg << "syntax error! update does not support limit"; + if (_update->limit == nullptr) { + return 0; + } + parser::LimitClause* limit = _update->limit; + if (limit->offset != nullptr && 0 != create_expr_tree(limit->offset, _limit_offset, CreateExprOptions())) { + DB_WARNING("create limit offset expr failed"); + return -1; + } + if (limit->count != nullptr && 0 != create_expr_tree(limit->count, _limit_count, CreateExprOptions())) { + DB_WARNING("create limit offset expr failed"); return -1; } return 0; @@ -105,7 +115,7 @@ int UpdatePlanner::create_update_node(pb::PlanNode* update_node) { } update_node->set_node_type(pb::UPDATE_NODE); - update_node->set_limit(_limit_count); + update_node->set_limit(-1); update_node->set_is_explain(_ctx->is_explain); update_node->set_num_children(1); //TODO pb::DerivePlanNode* derive = update_node->mutable_derive_node(); @@ -129,6 +139,32 @@ int UpdatePlanner::create_update_node(pb::PlanNode* update_node) { return 0; } +int UpdatePlanner::create_limit_node() { + if (_update->limit == nullptr) { + return 0; + } + pb::PlanNode* limit_node = _ctx->add_plan_node(); + limit_node->set_node_type(pb::LIMIT_NODE); + limit_node->set_limit(-1); + limit_node->set_is_explain(_ctx->is_explain); + limit_node->set_num_children(1); //TODO + + pb::DerivePlanNode* derive = limit_node->mutable_derive_node(); + pb::LimitNode* limit = derive->mutable_limit_node(); + if (_limit_offset.nodes_size() > 0) { + limit->mutable_offset_expr()->CopyFrom(_limit_offset); + limit->set_offset(0); + } else { + limit->set_offset(0); + } + + if (_limit_count.nodes_size() > 0) { + limit->mutable_count_expr()->CopyFrom(_limit_count); + } + _ctx->execute_global_flow = true; + return 0; +} + int UpdatePlanner::parse_kv_list() { ScanTupleInfo& info = _plan_table_ctx->table_tuple_mapping[try_to_lower(_current_tables[0])]; int64_t table_id = info.table_id; @@ -241,8 +277,7 @@ int UpdatePlanner::parse_where() { int UpdatePlanner::parse_orderby() { if (_update->order != nullptr) { - DB_WARNING("update doesnot support orderby"); - return -1; + return create_orderby_exprs(_update->order); } return 0; } diff --git a/src/physical_plan/separate.cpp b/src/physical_plan/separate.cpp index 7403a5555..8870bcbb9 100644 --- a/src/physical_plan/separate.cpp +++ b/src/physical_plan/separate.cpp @@ -531,8 +531,8 @@ int Separate::separate_join(QueryContext* ctx, const std::vector& joi } bool Separate::need_separate_single_txn(QueryContext* ctx, const int64_t main_table_id) { - if (ctx->get_runtime_state()->single_sql_autocommit() && - (ctx->enable_2pc + if (ctx->get_runtime_state()->single_sql_autocommit() && + (ctx->enable_2pc || _factory->need_begin_txn(main_table_id) || ctx->open_binlog || ctx->execute_global_flow)) { auto client = ctx->client_conn; @@ -683,6 +683,9 @@ int Separate::separate_global_insert(InsertManagerNode* manager_node, InsertNode create_lock_node(table_id, pb::LOCK_DML, Separate::BOTH, manager_node); } // 复用 + if (insert_node->get_parent() != nullptr) { + insert_node->get_parent()->clear_children(); + } delete insert_node; return 0; } @@ -744,7 +747,7 @@ int Separate::create_lock_node( const std::vector& global_affected_indexs, const std::vector& local_affected_indexs, ExecNode* manager_node) { - + //构造LockAndPutPrimaryNode if (mode == Separate::BOTH || mode == Separate::PRIMARY) { std::unique_ptr primary_node(new (std::nothrow) LockPrimaryNode); @@ -761,7 +764,7 @@ int Separate::create_lock_node( lock_primary_node->set_table_id(table_id); lock_primary_node->set_row_ttl_duration_s(_row_ttl_duration); primary_node->init(plan_node); - primary_node->set_affected_index_ids(local_affected_indexs); + primary_node->set_affected_index_ids(local_affected_indexs); manager_node->add_child(primary_node.release()); } //构造LockAndPutSecondaryNode @@ -853,7 +856,7 @@ int Separate::separate_delete(QueryContext* ctx) { auto region_infos = static_cast(scan_nodes[0])->region_infos(); manager_node->set_op_type(pb::OP_DELETE); manager_node->set_region_infos(region_infos); - manager_node->add_child(packet_node->children(0)); + manager_node->add_child(packet_node->children(0)); } else { int ret = separate_global_delete(manager_node.get(), delete_node, scan_nodes[0]); if (ret < 0) { @@ -896,11 +899,29 @@ int Separate::separate_global_update( return -1; } select_manager_node->set_region_infos(scan_node->region_infos()); - select_manager_node->add_child(update_node->children(0)); - delete_manager_node->add_child(select_manager_node.release()); + + LimitNode* limit_node = static_cast(update_node->get_node(pb::LIMIT_NODE)); + SortNode* sort_node = static_cast(update_node->get_node(pb::SORT_NODE)); + if (limit_node != nullptr) { + if (sort_node != nullptr) { + select_manager_node->init_sort_info(sort_node); + sort_node->set_limit(limit_node->other_limit()); + } + select_manager_node->set_limit(limit_node->other_limit()); + select_manager_node->add_child(limit_node->children(0)); + limit_node->clear_children(); + limit_node->add_child(select_manager_node.release()); + delete_manager_node->add_child(limit_node); + } else { + select_manager_node->add_child(update_node->children(0)); + delete_manager_node->add_child(select_manager_node.release()); + } manager_node->set_update_exprs(update_node->update_exprs()); update_node->clear_children(); update_node->clear_update_exprs(); + if (update_node->get_parent() != nullptr) { + update_node->get_parent()->clear_children(); + } delete update_node; create_lock_node( main_table_id, @@ -942,9 +963,9 @@ int Separate::separate_global_update( return -1; } insert_manager_node->init(pb_insert_manager_node); - create_lock_node(main_table_id, pb::LOCK_DML, Separate::BOTH, - manager_node->global_affected_index_ids(), - manager_node->local_affected_index_ids(), + create_lock_node(main_table_id, pb::LOCK_DML, Separate::BOTH, + manager_node->global_affected_index_ids(), + manager_node->local_affected_index_ids(), insert_manager_node.get()); pri_node = static_cast(insert_manager_node->children(0)); pri_node->set_affect_primary(manager_node->affect_primary()); @@ -976,8 +997,23 @@ int Separate::separate_global_delete( std::map region_infos = static_cast(scan_node)->region_infos(); select_manager_node->set_region_infos(region_infos); - select_manager_node->add_child(delete_node->children(0)); - manager_node->add_child(select_manager_node.release()); + + LimitNode* limit_node = static_cast(delete_node->get_node(pb::LIMIT_NODE)); + SortNode* sort_node = static_cast(delete_node->get_node(pb::SORT_NODE)); + if (limit_node != nullptr) { + if (sort_node != nullptr) { + select_manager_node->init_sort_info(sort_node); + sort_node->set_limit(limit_node->other_limit()); + } + select_manager_node->set_limit(limit_node->other_limit()); + select_manager_node->add_child(limit_node->children(0)); + limit_node->clear_children(); + limit_node->add_child(select_manager_node.release()); + manager_node->add_child(limit_node); + } else { + select_manager_node->add_child(delete_node->children(0)); + manager_node->add_child(select_manager_node.release()); + } delete_node->clear_children(); create_lock_node(main_table_id, pb::LOCK_GET_DML, Separate::PRIMARY, manager_node); @@ -997,6 +1033,9 @@ int Separate::separate_global_delete( pri_node->add_conjunct(conjunct); } } + if (delete_node->get_parent() != nullptr) { + delete_node->get_parent()->clear_children(); + } delete delete_node; return 0; } diff --git a/src/protocol/network_server.cpp b/src/protocol/network_server.cpp index e19945c0a..cc131dcda 100644 --- a/src/protocol/network_server.cpp +++ b/src/protocol/network_server.cpp @@ -55,6 +55,7 @@ DECLARE_int32(baikal_heartbeat_interval_us); DEFINE_bool(open_to_collect_slow_query_infos, false, "open to collect slow_query_infos, default: false"); DEFINE_uint64(limit_slow_sql_size, 50, "each sign to slow query sql counts, default: 50"); DEFINE_int32(slow_query_batch_size, 100, "slow query sql batch size, default: 100"); +DECLARE_bool(auto_update_meta_list); static const std::string instance_table_name = "INTERNAL.baikaldb.__baikaldb_instance"; @@ -97,11 +98,45 @@ void NetworkServer::report_heart_beat() { } } + if (FLAGS_auto_update_meta_list) { + update_meta_list(); + } + _heart_beat_count << -1; bthread_usleep_fast_shutdown(FLAGS_baikal_heartbeat_interval_us, _shutdown); } } +void NetworkServer::update_meta_list() { + pb::RaftControlRequest req; + req.set_op_type(pb::GetPeerList); + pb::RaftControlResponse res; + if (!MetaServerInteract::get_instance()->is_inited()) { + return; + } + if (MetaServerInteract::get_instance()->send_request("raft_control", req, res) == 0) { + std::string meta_list = ""; + for (auto i = 0; i < res.peers_size(); i ++) { + if (i != 0) { + meta_list += ","; + } + meta_list += res.peers(i); + } + if (meta_list != "" && meta_list != FLAGS_meta_server_bns) { + DB_WARNING("meta list %s change to:%s", FLAGS_meta_server_bns.c_str(), meta_list.c_str()); + if (MetaServerInteract::get_instance()->reset_bns_channel(meta_list) == 0) { + FLAGS_meta_server_bns = meta_list; + } + if (MetaServerInteract::get_auto_incr_instance()->is_inited()) { + MetaServerInteract::get_auto_incr_instance()->reset_bns_channel(meta_list); + } + if (MetaServerInteract::get_tso_instance()->is_inited()) { + MetaServerInteract::get_tso_instance()->reset_bns_channel(meta_list); + } + } + } +} + void NetworkServer::report_other_heart_beat() { while (!_shutdown) { TimeCost cost; @@ -974,12 +1009,30 @@ void NetworkServer::connection_timeout_check() { int query_time_diff = time_now - ctx->stat_info.start_stamp.tv_sec; if (query_time_diff > FLAGS_slow_query_timeout_s) { - DB_NOTICE("query is slow, [cost=%d][fd=%d][ip=%s:%d][now=%ld][active=%ld][user=%s][log_id=%lu][sql=%s]", + std::string sql = ctx->sql; + size_t slow_idx = 0; + bool is_blank = false; + for (size_t i = 0; i < sql.size(); i++) { + if (sql[i] == ' ' || sql[i] == '\t' || sql[i] == '\n') { + if (!is_blank) { + sql[slow_idx++] = ' '; + is_blank = true; + } + } else { + is_blank = false; + sql[slow_idx++] = sql[i]; + } + } + sql.resize(slow_idx); + DB_NOTICE("query is slow, [cost=%d][fd=%d][ip=%s:%d][now=%ld][active=%ld][user=%s][log_id=%lu][conn_id=%ld][sign=%lu][server_addr=%s:%d][sql=%s]", query_time_diff, sock->fd, sock->ip.c_str(), sock->port, time_now, sock->last_active, sock->user_info->username.c_str(), ctx->stat_info.log_id, - ctx->sql.c_str()); + sock->conn_id, + ctx->stat_info.sign, + butil::my_ip_cstr(), FLAGS_baikal_port, + sql.c_str()); if (FLAGS_open_to_collect_slow_query_infos) { SlowQueryInfo slow_query_info(ctx->stat_info.log_id, ctx->stat_info.sign, @@ -1134,7 +1187,10 @@ NetworkServer::NetworkServer(): _is_init(false), _shutdown(false), _epoll_info(NULL), - _heart_beat_count("heart_beat_count") { + _heart_beat_count("heart_beat_count"), + _client_conn_count("client_connection_count", 0), + _client_sql_running_count("client_sql_running_count", 0), + _client_sql_running_max_latency("client_sql_running_max_latency", 0) { } NetworkServer::~NetworkServer() { @@ -1399,6 +1455,7 @@ int NetworkServer::make_worker_process() { _health_check_bth.run([this]() {store_health_check();}); } + _conn_bvars_update_bth.run([this](){client_conn_bvars_update();}); // Create listen socket. _service = create_listen_socket(); @@ -1611,5 +1668,42 @@ bool NetworkServer::set_fd_flags(int fd) { return true; } +void NetworkServer::client_conn_bvars_update() { + while (!_shutdown) { + bthread_usleep(2000000); + int32_t running_sql_cnt = 0; + int32_t client_cnt = 0; + int32_t max_running_time = 0; + EpollInfo* epoll_info = NetworkServer::get_instance()->get_epoll_info(); + for (int32_t idx = 0; idx < CONFIG_MPL_EPOLL_MAX_SIZE; ++idx) { + const SmartSocket& sock = epoll_info->get_fd_mapping(idx); + if (sock == NULL || sock->is_free || sock->fd == -1 || sock->ip == "") { + if (sock != NULL) { + DB_WARNING_CLIENT(sock, "processlist, free:%d", sock->is_free); + } + continue; + } + if (!sock->user_info || !sock->query_ctx) { + continue; + } + auto query_ctx = sock->get_query_ctx(); + if (query_ctx == nullptr) { + continue; + } + auto command = query_ctx->mysql_cmd; + client_cnt ++; + if (command != COM_SLEEP) { + running_sql_cnt ++; + int32_t cost = time(NULL) - sock->last_active; + if (cost > max_running_time) { + max_running_time = cost; + } + } + } + _client_conn_count.set_value(client_cnt); + _client_sql_running_count.set_value(running_sql_cnt); + _client_sql_running_max_latency.set_value(max_running_time); + } +} } // namespace baikal diff --git a/src/protocol/show_helper.cpp b/src/protocol/show_helper.cpp index b945d30d1..d6997e482 100644 --- a/src/protocol/show_helper.cpp +++ b/src/protocol/show_helper.cpp @@ -42,8 +42,12 @@ void ShowHelper::init() { this, std::placeholders::_1, std::placeholders::_2); _calls[SQL_SHOW_FULL_COLUMNS] = std::bind(&ShowHelper::_show_full_columns, this, std::placeholders::_1, std::placeholders::_2); + _calls[SQL_SHOW_COLUMNS] = std::bind(&ShowHelper::_show_columns, + this, std::placeholders::_1, std::placeholders::_2); _calls[SQL_SHOW_FULL_TABLES] = std::bind(&ShowHelper::_show_full_tables, this, std::placeholders::_1, std::placeholders::_2); + _calls[SQL_SHOW_TABLES] = std::bind(&ShowHelper::_show_tables, + this, std::placeholders::_1, std::placeholders::_2); _calls[SQL_SHOW_SCHEMA_CONF] = std::bind(&ShowHelper::_show_schema_conf, this, std::placeholders::_1, std::placeholders::_2); _calls[SQL_SHOW_TABLE_STATUS] = std::bind(&ShowHelper::_show_table_status, @@ -153,15 +157,8 @@ bool ShowHelper::execute(const SmartSocket& client) { } auto iter = _calls.find(key); if (iter == _calls.end() || iter->second == nullptr) { - // Make mysql packet. - std::vector fields; - std::vector< std::vector > rows; - if (_make_common_resultset_packet(client, fields, rows) != 0) { - DB_FATAL_CLIENT(client, "Failed to make result packet."); - _wrapper->make_err_packet(client, ER_MAKE_RESULT_PACKET, "Failed to make result packet."); - client->state = STATE_ERROR; - return true; - } + _wrapper->make_simple_ok_packet(client); + client->state = STATE_READ_QUERY_RESULT; return true; } return iter->second(client, split_vec); @@ -829,92 +826,21 @@ bool ShowHelper::_show_procedure_status(const SmartSocket& client, const std::ve return true; } -bool ShowHelper::_show_tables(const SmartSocket& client, const std::vector& split_vec) { +bool ShowHelper::_show_create_table(const SmartSocket& client, const std::vector& split_vec) { SchemaFactory* factory = SchemaFactory::get_instance(); if (client == nullptr || client->query_ctx == nullptr || client->user_info == nullptr || factory == nullptr) { DB_FATAL("param invalid"); //client->state = STATE_ERROR; return false; } - - std::string namespace_ = client->user_info->namespace_; - std::string db = client->current_db; - if (split_vec.size() == 4) { - db = remove_quote(split_vec[3].c_str(), '`'); - } - if (db == "") { - DB_WARNING("no database selected"); - _wrapper->make_err_packet(client, ER_NO_DB_ERROR, "No database selected"); - client->state = STATE_READ_QUERY_RESULT; - return false; - } - if (db == "information_schema") { - namespace_ = "INTERNAL"; - } - - // Make fields. - std::vector fields; - fields.reserve(1); - do { - ResultField field; - field.name = "Tables_in_" + db; - field.db = db; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - - // Make rows. - std::vector< std::vector > rows; - rows.reserve(10); - std::vector tables = factory->get_table_list( - namespace_, db, client->user_info.get()); - for (uint32_t cnt = 0; cnt < tables.size(); ++cnt) { - //DB_NOTICE("table:%s", tables[cnt].c_str()); - std::vector row; - row.emplace_back(tables[cnt]); - rows.emplace_back(row); - } - - // Make mysql packet. - if (_make_common_resultset_packet(client, fields, rows) != 0) { - DB_FATAL_CLIENT(client, "Failed to make result packet."); - _wrapper->make_err_packet(client, ER_MAKE_RESULT_PACKET, "%s", client->query_ctx->sql.c_str()); - client->state = STATE_ERROR; - return false; + bool compatible = false; + auto iter = client->session_vars.find("sql_mode"); + if (iter != client->session_vars.end() && iter->second.node_type() == pb::STRING_LITERAL) { + if (iter->second.derive_node().string_val() == "compatible") { + compatible = true; + } } - client->state = STATE_READ_QUERY_RESULT; - return true; -} -bool ShowHelper::_show_create_table(const SmartSocket& client, const std::vector& split_vec) { - SchemaFactory* factory = SchemaFactory::get_instance(); - if (client == nullptr || client->query_ctx == nullptr || client->user_info == nullptr || factory == nullptr) { - DB_FATAL("param invalid"); - //client->state = STATE_ERROR; - return false; - } - static std::map type_map = { - {pb::BOOL, "boolean"}, - {pb::INT8, "tinyint(4)"}, - {pb::UINT8, "tinyint(4) unsigned"}, - {pb::INT16, "smallint(6)"}, - {pb::UINT16, "smallint(6) unsigned"}, - {pb::INT32, "int(10)"}, - {pb::UINT32, "int(10) unsigned"}, - {pb::INT64, "bigint(20)"}, - {pb::UINT64, "bigint(20) unsigned"}, - {pb::FLOAT, "float"}, - {pb::DOUBLE, "double"}, - {pb::STRING, "varchar(1024)"}, - {pb::DATETIME, "DATETIME"}, - {pb::TIME, "TIME"}, - {pb::TIMESTAMP, "TIMESTAMP"}, - {pb::DATE, "DATE"}, - {pb::HLL, "HLL"}, - {pb::BITMAP, "BITMAP"}, - {pb::TDIGEST, "TDIGEST"}, - }; static std::map index_map = { {pb::I_PRIMARY, "PRIMARY KEY"}, {pb::I_UNIQ, "UNIQUE KEY"}, @@ -988,7 +914,7 @@ bool ShowHelper::_show_create_table(const SmartSocket& client, const std::vector continue; } oss << " " << "`" << field.short_name << "` "; - oss << type_map[field.type]; + oss << to_mysql_type_full_string[field.type]; if ((field.type == pb::FLOAT || field.type == pb::DOUBLE) && field.float_total_len != -1 && field.float_precision_len != -1) { oss << "(" << field.float_total_len << "," << field.float_precision_len << ")"; } @@ -1025,159 +951,196 @@ bool ShowHelper::_show_create_table(const SmartSocket& client, const std::vector } continue; } - if (index_info.is_global) { - oss << " " << index_map[index_info.type] << " GLOBAL "; - } else if (index_info.type == pb::I_PRIMARY || index_info.type == pb::I_FULLTEXT || index_info.type == pb::I_VECTOR) { - oss << " " << index_map[index_info.type] << " "; - } else { - oss << " " << index_map[index_info.type] << " LOCAL "; - } - if (index_info.index_hint_status == pb::IHS_VIRTUAL) { - oss << "VIRTUAL "; - } - if (index_info.type != pb::I_PRIMARY) { - std::vector split_vec; - boost::split(split_vec, index_info.name, - boost::is_any_of("."), boost::token_compress_on); - oss << "`" << split_vec[split_vec.size() - 1] << "` "; - } - oss << "("; - uint32_t field_idx = 0; - for (auto& field : index_info.fields) { - std::vector split_vec; - boost::split(split_vec, field.name, - boost::is_any_of("."), boost::token_compress_on); - if (++field_idx < index_info.fields.size()) { - oss << "`" << split_vec[split_vec.size() - 1] << "`,"; + if (!compatible) { + if (index_info.is_global) { + oss << " " << index_map[index_info.type] << " GLOBAL "; + } else if (index_info.type == pb::I_PRIMARY || index_info.type == pb::I_FULLTEXT || index_info.type == pb::I_VECTOR) { + oss << " " << index_map[index_info.type] << " "; } else { - oss << "`" << split_vec[split_vec.size() - 1] << "`"; + oss << " " << index_map[index_info.type] << " LOCAL "; } + if (index_info.index_hint_status == pb::IHS_VIRTUAL) { + oss << "VIRTUAL "; + } + if (index_info.type != pb::I_PRIMARY) { + // std::vector split_vec; + // boost::split(split_vec, index_info.name, + // boost::is_any_of("."), boost::token_compress_on); + // oss << "`" << split_vec[split_vec.size() - 1] << "` "; + oss << "`" << index_info.short_name << "`"; + } + oss << "("; + uint32_t field_idx = 0; + for (auto& field : index_info.fields) { + std::vector split_vec; + boost::split(split_vec, field.name, + boost::is_any_of("."), boost::token_compress_on); + if (++field_idx < index_info.fields.size()) { + oss << "`" << split_vec[split_vec.size() - 1] << "`,"; + } else { + oss << "`" << split_vec[split_vec.size() - 1] << "`"; + } + } + oss << ") COMMENT '{\"index_state\":\""; + oss << pb::IndexState_Name(index_info.state) << "\", "; + if (index_info.type == pb::I_FULLTEXT) { + oss << "\"segment_type\":\"" << pb::SegmentType_Name(index_info.segment_type) << "\", "; + oss << "\"storage_type\":\"" << pb::StorageType_Name(index_info.storage_type) << "\", "; + } + if (index_info.type == pb::I_VECTOR) { + oss << "\"vector_description\":\"" << index_info.vector_description << "\", "; + oss << "\"dimension\":" << index_info.dimension << ", "; + oss << "\"nprobe\":" << index_info.nprobe << ", "; + oss << "\"metric_type\":\"" << pb::MetricType_Name(index_info.metric_type) << "\", "; + } + oss << "\"hint_status\":\"" << pb::IndexHintStatus_Name(index_info.index_hint_status) << "\"}'"; + } else { + if (index_info.index_hint_status == pb::IHS_VIRTUAL) { + continue; + } + oss << " " << index_map[index_info.type] << " "; + if (index_info.type != pb::I_PRIMARY) { + std::vector split_vec; + boost::split(split_vec, index_info.name, + boost::is_any_of("."), boost::token_compress_on); + oss << "`" << split_vec[split_vec.size() - 1] << "` "; + } + oss << "("; + uint32_t field_idx = 0; + for (auto& field : index_info.fields) { + std::vector split_vec; + boost::split(split_vec, field.name, + boost::is_any_of("."), boost::token_compress_on); + if (++field_idx < index_info.fields.size()) { + oss << "`" << split_vec[split_vec.size() - 1] << "`,"; + } else { + oss << "`" << split_vec[split_vec.size() - 1] << "`"; + } + } + oss << ")"; + } - oss << ") COMMENT '{\"index_state\":\""; - oss << pb::IndexState_Name(index_info.state) << "\", "; - if (index_info.type == pb::I_FULLTEXT) { - oss << "\"segment_type\":\"" << pb::SegmentType_Name(index_info.segment_type) << "\", "; - oss << "\"storage_type\":\"" << pb::StorageType_Name(index_info.storage_type) << "\", "; - } - if (index_info.type == pb::I_VECTOR) { - oss << "\"vector_description\":\"" << index_info.vector_description << "\", "; - oss << "\"dimension\":" << index_info.dimension << ", "; - oss << "\"nprobe\":" << index_info.nprobe << ", "; - oss << "\"metric_type\":\"" << pb::MetricType_Name(index_info.metric_type) << "\", "; - } - oss << "\"hint_status\":\"" << pb::IndexHintStatus_Name(index_info.index_hint_status) << "\"}'"; + if (++index_idx < info.indices.size()) { oss << ",\n"; } else { oss << "\n"; } } - static std::map engine_map = { - {pb::ROCKSDB, "Rocksdb"}, - {pb::REDIS, "Redis"}, - {pb::ROCKSDB_CSTORE, "Rocksdb_cstore"}, - {pb::BINLOG, "Binlog"}, - {pb::INFORMATION_SCHEMA, "MEMORY"} - }; - oss << ") ENGINE=" << engine_map[info.engine]; - oss << " DEFAULT CHARSET=" << charset_map[info.charset]; - oss <<" AVG_ROW_LENGTH=" << info.byte_size_per_record; - oss << " COMMENT='{\"resource_tag\":\"" << info.resource_tag << "\""; - if (!info.comment.empty()) { - oss << ", \"comment\":\"" << info.comment << "\""; - } - oss << ", \"replica_num\":" << info.replica_num; - oss << ", \"region_split_lines\":" << info.region_split_lines; - if (info.ttl_info.ttl_duration_s > 0) { - oss << ", \"ttl_duration\":" << info.ttl_info.ttl_duration_s; - } - if (info.learner_resource_tags.size() > 0) { - oss << ", \"learner_resource_tag\": ["; - for (size_t i = 0; i < info.learner_resource_tags.size(); i++) { - oss << "\"" << info.learner_resource_tags[i] << "\""; - if (i != info.learner_resource_tags.size() - 1) { - oss << ","; - } + + if (!compatible) { + static std::map engine_map = { + {pb::ROCKSDB, "Rocksdb"}, + {pb::REDIS, "Redis"}, + {pb::ROCKSDB_CSTORE, "Rocksdb_cstore"}, + {pb::BINLOG, "Binlog"}, + {pb::INFORMATION_SCHEMA, "MEMORY"} + }; + oss << ") ENGINE=" << engine_map[info.engine]; + oss << " DEFAULT CHARSET=" << charset_map[info.charset]; + oss <<" AVG_ROW_LENGTH=" << info.byte_size_per_record; + oss << " COMMENT='{\"resource_tag\":\"" << info.resource_tag << "\""; + if (!info.comment.empty()) { + oss << ", \"comment\":\"" << info.comment << "\""; } - oss << "]"; - } - if (info.dists.size() > 0) { - oss << ", \"dists\": ["; - for (size_t i = 0; i < info.dists.size(); ++i) { - oss << " { "; - if (!info.dists[i].resource_tag.empty()) { - oss << "\"resource_tag\":\"" << info.dists[i].resource_tag << "\","; - } - if (!info.dists[i].logical_room.empty()) { - oss << "\"logical_room\":\"" << info.dists[i].logical_room << "\", "; - } - if (!info.dists[i].physical_room.empty()) { - oss << "\"physical_room\":\"" << info.dists[i].physical_room << "\", "; - } - oss << "\"count\":" << info.dists[i].count << "}"; - if (i != info.dists.size() -1) { - oss << ","; + oss << ", \"replica_num\":" << info.replica_num; + oss << ", \"region_split_lines\":" << info.region_split_lines; + if (info.ttl_info.ttl_duration_s > 0) { + oss << ", \"ttl_duration\":" << info.ttl_info.ttl_duration_s; + } + if (info.learner_resource_tags.size() > 0) { + oss << ", \"learner_resource_tag\": ["; + for (size_t i = 0; i < info.learner_resource_tags.size(); i++) { + oss << "\"" << info.learner_resource_tags[i] << "\""; + if (i != info.learner_resource_tags.size() - 1) { + oss << ","; + } } + oss << "]"; } - oss << "]"; - } - if (!info.main_logical_room.empty()) { - oss << ", \"main_logical_room\": \"" << info.main_logical_room << "\""; - } - - if (info.region_num > 0) { - oss << ", \"region_num\":" << info.region_num; - } - oss << ", \"namespace\":\"" << info.namespace_ << "\""; - - if (info.partition_info.has_primary_range_partition_type()) { - oss << ", \"primary_range_partition_type\": \"" - << pb::RangePartitionType_Name(info.partition_info.primary_range_partition_type()) << "\""; - } - - const size_t type_size = info.partition_info.gen_range_partition_types().size(); - if (type_size > 0) { - oss << ", \"gen_range_partition_types\": ["; - for (size_t i = 0; i < type_size; ++i) { - oss << "\"" << pb::RangePartitionType_Name(info.partition_info.gen_range_partition_types(i)) << "\""; - if (i != type_size -1) { - oss << ","; + if (info.dists.size() > 0) { + oss << ", \"dists\": ["; + for (size_t i = 0; i < info.dists.size(); ++i) { + oss << " { "; + if (!info.dists[i].resource_tag.empty()) { + oss << "\"resource_tag\":\"" << info.dists[i].resource_tag << "\","; + } + if (!info.dists[i].logical_room.empty()) { + oss << "\"logical_room\":\"" << info.dists[i].logical_room << "\", "; + } + if (!info.dists[i].physical_room.empty()) { + oss << "\"physical_room\":\"" << info.dists[i].physical_room << "\", "; + } + oss << "\"count\":" << info.dists[i].count << "}"; + if (i != info.dists.size() -1) { + oss << ","; + } } + oss << "]"; } - oss << "]"; - } - - if (info.partition_info.has_dynamic_partition_attr()) { - const auto& dynamic_partition_attr = info.partition_info.dynamic_partition_attr(); - oss << ", \"dynamic_partition\": {"; - if (dynamic_partition_attr.has_enable()) { - oss << "\"enable\":" << std::boolalpha << dynamic_partition_attr.enable(); + if (!info.main_logical_room.empty()) { + oss << ", \"main_logical_room\": \"" << info.main_logical_room << "\""; } - if (dynamic_partition_attr.has_time_unit()) { - oss << ",\"time_unit\":\"" << dynamic_partition_attr.time_unit() << "\""; + if (info.region_num > 0) { + oss << ", \"region_num\":" << info.region_num; } - if (boost::algorithm::iequals(dynamic_partition_attr.time_unit(), "MONTH") && - dynamic_partition_attr.has_start_day_of_month()) { - oss << ",\"start_day_of_month\":" << dynamic_partition_attr.start_day_of_month(); + oss << ", \"namespace\":\"" << info.namespace_ << "\""; + + if (info.partition_info.has_primary_range_partition_type()) { + oss << ", \"primary_range_partition_type\": \"" + << pb::RangePartitionType_Name(info.partition_info.primary_range_partition_type()) << "\""; } - if (dynamic_partition_attr.has_start()) { - oss << ",\"start\":" << dynamic_partition_attr.start(); + + const size_t type_size = info.partition_info.gen_range_partition_types().size(); + if (type_size > 0) { + oss << ", \"gen_range_partition_types\": ["; + for (size_t i = 0; i < type_size; ++i) { + oss << "\"" << pb::RangePartitionType_Name(info.partition_info.gen_range_partition_types(i)) << "\""; + if (i != type_size -1) { + oss << ","; + } + } + oss << "]"; } - if (dynamic_partition_attr.has_cold()) { - oss << ",\"cold\":" << dynamic_partition_attr.cold(); + + if (info.partition_info.has_dynamic_partition_attr()) { + const auto& dynamic_partition_attr = info.partition_info.dynamic_partition_attr(); + oss << ", \"dynamic_partition\": {"; + if (dynamic_partition_attr.has_enable()) { + oss << "\"enable\":" << std::boolalpha << dynamic_partition_attr.enable(); + } + if (dynamic_partition_attr.has_time_unit()) { + oss << ",\"time_unit\":\"" << dynamic_partition_attr.time_unit() << "\""; + } + if (boost::algorithm::iequals(dynamic_partition_attr.time_unit(), "MONTH") && + dynamic_partition_attr.has_start_day_of_month()) { + oss << ",\"start_day_of_month\":" << dynamic_partition_attr.start_day_of_month(); + } + if (dynamic_partition_attr.has_start()) { + oss << ",\"start\":" << dynamic_partition_attr.start(); + } + if (dynamic_partition_attr.has_cold()) { + oss << ",\"cold\":" << dynamic_partition_attr.cold(); + } + if (dynamic_partition_attr.has_end()) { + oss << ",\"end\":" << dynamic_partition_attr.end(); + } + if (dynamic_partition_attr.has_prefix()) { + oss << ",\"prefix\":\"" << dynamic_partition_attr.prefix() << "\""; + } + oss << "}"; } - if (dynamic_partition_attr.has_end()) { - oss << ",\"end\":" << dynamic_partition_attr.end(); + oss << "}'"; + + if (info.partition_ptr != nullptr) { + oss << info.partition_ptr->to_str(); } - if (dynamic_partition_attr.has_prefix()) { - oss << ",\"prefix\":\"" << dynamic_partition_attr.prefix() << "\""; + } else { + oss << ") ENGINE=" << "InnoDB"; + oss << " DEFAULT CHARSET=" << charset_map[info.charset]; + if (!info.comment.empty()) { + oss << "COMMENT='" << info.comment << "'"; } - oss << "}"; - } - oss << "}'"; - - if (info.partition_ptr != nullptr) { - oss << info.partition_ptr->to_str(); } row.emplace_back(oss.str()); rows.emplace_back(row); @@ -1335,16 +1298,21 @@ bool ShowHelper::_show_processlist(const SmartSocket& client, const std::vector< DB_FATAL("param invalid"); return false; } - if (only_show_doing_sql && sock->query_ctx->sql.size() == 0) { + auto query_ctx = sock->get_query_ctx(); + if (query_ctx == nullptr) { + continue; + } + if (only_show_doing_sql && query_ctx->sql.size() == 0) { continue; } DB_WARNING_CLIENT(sock, "processlist, free:%d", sock->is_free); std::vector row; + row.reserve(fields.size()); row.emplace_back(std::to_string(sock->conn_id)); row.emplace_back(sock->user_info->username); row.emplace_back(sock->ip); row.emplace_back(sock->current_db); - auto command = sock->query_ctx->mysql_cmd; + auto command = query_ctx->mysql_cmd; if (command == COM_SLEEP) { row.emplace_back("Sleep"); } else { @@ -1359,7 +1327,7 @@ bool ShowHelper::_show_processlist(const SmartSocket& client, const std::vector< if (command == COM_SLEEP) { row.emplace_back(""); } else { - row.emplace_back(sock->query_ctx->sql); + row.emplace_back(query_ctx->sql); } rows.emplace_back(row); } @@ -1530,27 +1498,15 @@ bool ShowHelper::_show_full_tables(const SmartSocket& client, const std::vector< client->state = STATE_READ_QUERY_RESULT; return false; } - if (current_db == "information_schema") { + if (to_lower(current_db) == "information_schema") { + current_db = "information_schema"; namespace_ = "INTERNAL"; } // Make fields. std::vector fields; - fields.reserve(2); - do { - ResultField field; - field.name = "Tables_in_" + current_db; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - do { - ResultField field; - field.name = "Table_type"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); + fields.emplace_back(make_result_field("Tables_in_" + current_db, MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Table_type", MYSQL_TYPE_VARCHAR, 1024)); // Make rows. std::vector< std::vector > rows; @@ -1583,6 +1539,102 @@ bool ShowHelper::_show_full_tables(const SmartSocket& client, const std::vector< return true; } +bool ShowHelper::_show_tables(const SmartSocket& client, const std::vector& split_vec) { + bool is_like_pattern = false; + std::string like_pattern; + re2::RE2::Options option; + std::unique_ptr regex_ptr; + if (client == nullptr) { + DB_FATAL("param invalid"); + //client->state = STATE_ERROR; + return false; + } + SchemaFactory* factory = SchemaFactory::get_instance(); + if (factory == nullptr || client == nullptr || client->user_info == nullptr || client->query_ctx == nullptr) { + DB_FATAL("param invalid"); + return false; + } + + std::string namespace_ = client->user_info->namespace_; + std::string current_db = client->current_db; + + + if (split_vec.size() == 4) { + current_db = remove_quote(split_vec[3].c_str(), '`'); + } else if (split_vec.size() == 2) { + } else if (split_vec.size() == 6) { + // TODO: where [LIKE 'pattern' | WHERE expr] + is_like_pattern = true; + std::string like_str; + current_db = remove_quote(split_vec[3].c_str(), '`'); + like_str = remove_quote(split_vec[5].c_str(), '"'); + like_str = remove_quote(like_str.c_str(), '\''); + for (auto ch : like_str) { + if (ch == '%') { + like_pattern.append(".*"); + } else { + like_pattern.append(1, ch); + } + } + option.set_utf8(false); + option.set_case_sensitive(false); + regex_ptr.reset(new re2::RE2(like_pattern, option)); + } else if (split_vec.size() > 6 && (split_vec[4] == "where")) { + // show tables in `db` where table_type = 'BASE TABLE'; + current_db = remove_quote(split_vec[3].c_str(), '`'); + } else { + for (int i = 0; i < split_vec.size();i++) { + DB_WARNING("vec[%d] %s", i, split_vec[i].c_str()); + } + client->state = STATE_ERROR; + return false; + } + + if (current_db == "") { + DB_WARNING("no database selected"); + _wrapper->make_err_packet(client, ER_NO_DB_ERROR, "No database selected"); + client->state = STATE_READ_QUERY_RESULT; + return false; + } + if (to_lower(current_db) == "information_schema") { + current_db = "information_schema"; + namespace_ = "INTERNAL"; + } + + // Make fields. + std::vector fields; + fields.emplace_back(make_result_field("Tables_in_" + current_db, MYSQL_TYPE_VARCHAR, 1024)); + + // Make rows. + std::vector< std::vector > rows; + rows.reserve(10); + std::vector tables = factory->get_table_list( + namespace_, current_db, client->user_info.get()); + //DB_NOTICE("db:%s table.size:%d", current_db.c_str(), tables.size()); + for (uint32_t cnt = 0; cnt < tables.size(); ++cnt) { + //DB_NOTICE("table:%s", tables[cnt].c_str()); + if (is_like_pattern) { + if (!RE2::FullMatch(tables[cnt], *regex_ptr)) { + DB_NOTICE("not match"); + continue; + } + } + std::vector row; + row.emplace_back(tables[cnt]); + rows.emplace_back(row); + } + + // Make mysql packet. + if (_make_common_resultset_packet(client, fields, rows) != 0) { + DB_FATAL_CLIENT(client, "Failed to make result packet."); + _wrapper->make_err_packet(client, ER_MAKE_RESULT_PACKET, "%s", client->query_ctx->sql.c_str()); + client->state = STATE_ERROR; + return false; + } + client->state = STATE_READ_QUERY_RESULT; + return true; +} + bool ShowHelper::_show_full_columns(const SmartSocket& client, const std::vector& split_vec) { SchemaFactory* factory = SchemaFactory::get_instance(); if (client == nullptr || client->user_info == nullptr || factory == nullptr) { @@ -1598,69 +1650,15 @@ bool ShowHelper::_show_full_columns(const SmartSocket& client, const std::vector // Make fields. std::vector fields; fields.reserve(9); - do { - ResultField field; - field.name = "Field"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - do { - ResultField field; - field.name = "Type"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - do { - ResultField field; - field.name = "Collation"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - do { - ResultField field; - field.name = "Null"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - do { - ResultField field; - field.name = "Key"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - do { - ResultField field; - field.name = "default"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - do { - ResultField field; - field.name = "Extra"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - do { - ResultField field; - field.name = "Privileges"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); - do { - ResultField field; - field.name = "Comment"; - field.type = MYSQL_TYPE_VARCHAR; - field.length = 1024; - fields.emplace_back(field); - } while (0); + fields.emplace_back(make_result_field("Field", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Type", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Collation", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Null", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Key", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Default", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Extra", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Privileges", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Comment", MYSQL_TYPE_VARCHAR, 1024)); std::string db = client->current_db; std::string table; @@ -1738,7 +1736,7 @@ bool ShowHelper::_show_full_columns(const SmartSocket& client, const std::vector } } row.emplace_back(split_vec[split_vec.size() - 1]); - row.emplace_back(PrimitiveType_Name(field.type)); + row.emplace_back(to_mysql_type_full_string(field.type)); row.emplace_back("NULL"); row.emplace_back(field.can_null ? "YES" : "NO"); if (field_index.count(field.id) == 0) { @@ -1750,7 +1748,11 @@ bool ShowHelper::_show_full_columns(const SmartSocket& client, const std::vector } row.emplace_back(index); } - row.emplace_back(field.default_value); + if (field.default_value == "(current_timestamp())") { + row.emplace_back("CURRENT_TIMESTAMP"); + } else { + row.emplace_back(field.default_value); + } if (info.auto_inc_field_id == field.id) { row.emplace_back("auto_increment"); } else { @@ -1772,6 +1774,139 @@ bool ShowHelper::_show_full_columns(const SmartSocket& client, const std::vector return true; } +bool ShowHelper::_show_columns(const SmartSocket& client, const std::vector& split_vec) { + SchemaFactory* factory = SchemaFactory::get_instance(); + if (client == nullptr || client->user_info == nullptr || factory == nullptr) { + DB_FATAL("param invalid"); + //client->state = STATE_ERROR; + return false; + } + bool is_like_pattern = false; + std::string like_pattern; + re2::RE2::Options option; + std::unique_ptr regex_ptr; + + // Make fields. + std::vector fields; + fields.reserve(6); + fields.emplace_back(make_result_field("Field", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Type", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Null", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Key", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Default", MYSQL_TYPE_VARCHAR, 1024)); + fields.emplace_back(make_result_field("Extra", MYSQL_TYPE_VARCHAR, 1024)); + + std::string db = client->current_db; + std::string table; + if (split_vec.size() == 4) { + std::string db_table = split_vec[3]; + std::string::size_type position = db_table.find_first_of('.'); + if (position == std::string::npos) { + // `table_name` + table = remove_quote(db_table.c_str(), '`'); + } else { + // `db_name`.`table_name` + db = remove_quote(db_table.substr(0, position).c_str(), '`'); + table = remove_quote(db_table.substr(position + 1, + db_table.length() - position - 1).c_str(), '`'); + } + } else if (split_vec.size() == 6) { + db = remove_quote(split_vec[5].c_str(), '`'); + table = remove_quote(split_vec[3].c_str(), '`'); + } else if (split_vec.size() == 8) { + is_like_pattern = true; + std::string like_str; + db = remove_quote(split_vec[5].c_str(), '`'); + table = remove_quote(split_vec[3].c_str(), '`'); + like_str = remove_quote(split_vec[7].c_str(), '"'); + like_str = remove_quote(like_str.c_str(), '\''); + for (auto ch : like_str) { + if (ch == '%') { + like_pattern.append(".*"); + } else { + like_pattern.append(1, ch); + } + } + option.set_utf8(false); + option.set_case_sensitive(false); + regex_ptr.reset(new re2::RE2(like_pattern, option)); + } else { + client->state = STATE_ERROR; + return false; + } + std::string namespace_ = client->user_info->namespace_; + if (db == "information_schema") { + namespace_ = "INTERNAL"; + } + std::string full_name = namespace_ + "." + db + "." + table; + int64_t table_id = -1; + if (factory->get_table_id(full_name, table_id) != 0) { + client->state = STATE_ERROR; + return false; + } + TableInfo info = factory->get_table_info(table_id); + std::map field_index; + for (auto& index_id : info.indices) { + IndexInfo index_info = factory->get_index_info(index_id); + for (auto& field : index_info.fields) { + if (field_index.count(field.id) == 0) { + field_index[field.id] = index_info; + } + } + } + // Make rows. + std::vector > rows; + rows.reserve(10); + for (auto& field : info.fields) { + if (field.deleted) { + continue; + } + std::vector row; + std::vector split_vec; + boost::split(split_vec, field.name, + boost::is_any_of(" \t\n\r."), boost::token_compress_on); + if (is_like_pattern) { + if (!RE2::FullMatch(split_vec[split_vec.size() - 1], *regex_ptr)) { + DB_NOTICE("not match"); + continue; + } + } + row.emplace_back(split_vec[split_vec.size() - 1]); + row.emplace_back(to_mysql_type_full_string(field.type)); + row.emplace_back(field.can_null ? "YES" : "NO"); + if (field_index.count(field.id) == 0) { + row.emplace_back(" "); + } else { + std::string index = IndexType_Name(field_index[field.id].type); + if (field_index[field.id].type == pb::I_FULLTEXT) { + index += "(" + pb::SegmentType_Name(field_index[field.id].segment_type) + ")"; + } + row.emplace_back(index); + } + if (field.default_value == "(current_timestamp())") { + row.emplace_back("CURRENT_TIMESTAMP"); + } else { + row.emplace_back(field.default_value); + } + if (info.auto_inc_field_id == field.id) { + row.emplace_back("auto_increment"); + } else { + row.emplace_back(" "); + } + rows.emplace_back(row); + } + + // Make mysql packet. + if (_make_common_resultset_packet(client, fields, rows) != 0) { + DB_FATAL_CLIENT(client, "Failed to make result packet."); + _wrapper->make_err_packet(client, ER_MAKE_RESULT_PACKET, "Failed to make result packet."); + client->state = STATE_ERROR; + return false; + } + client->state = STATE_READ_QUERY_RESULT; + return true; +} + bool ShowHelper::_show_table_status(const SmartSocket& client, const std::vector& split_vec) { SchemaFactory* factory = SchemaFactory::get_instance(); if (client == nullptr || client->user_info == nullptr || factory == nullptr) { diff --git a/src/raft/raft_control.cpp b/src/raft/raft_control.cpp index bbbfc1610..09d767435 100644 --- a/src/raft/raft_control.cpp +++ b/src/raft/raft_control.cpp @@ -160,6 +160,24 @@ void common_raft_control(google::protobuf::RpcController* controller, response->set_errcode(pb::SUCCESS); return; } + + case pb::GetPeerList : { + std::vector peers; + std::string leader = butil::endpoint2str(node->leader_id().addr).c_str(); + std::string self = butil::endpoint2str(node->node_id().peer_id.addr).c_str(); + if (self != leader || !node->list_peers(&peers).ok()) { + response->set_errcode(pb::NOT_LEADER); + response->set_leader(leader); + return; + } + response->set_errcode(pb::SUCCESS); + for (auto &peer : peers) { + std::string peer_string = butil::endpoint2str(peer.addr).c_str(); + response->add_peers(peer_string); + } + return; + } + default: DB_FATAL("node:%s %s unsupport request type:%d, log_id:%lu", node->node_id().group_id.c_str(), diff --git a/src/runtime/runtime_state.cpp b/src/runtime/runtime_state.cpp index a408f5fa1..0af2c095f 100644 --- a/src/runtime/runtime_state.cpp +++ b/src/runtime/runtime_state.cpp @@ -39,6 +39,10 @@ int RuntimeState::init(const pb::StoreReq& req, sign = req.sql_sign(); uint64_t tuple_sign = tuple_descs_to_sign(); + if (req.sql_exec_timeout() > 0) { + _sql_exec_timeout = req.sql_exec_timeout(); + } + //取出缓存的动态编译结果(按照签名) if (_tuple_descs.size() > 0 && sign != 0 && tuple_sign != 0) { if (sql_sign_to_mem_row_descriptor.count(tuple_sign) == 1) { diff --git a/src/store/region_binlog.cpp b/src/store/region_binlog.cpp index 47a5fc6f2..7cecdf17a 100755 --- a/src/store/region_binlog.cpp +++ b/src/store/region_binlog.cpp @@ -63,6 +63,7 @@ DECLARE_string(db_path); }\ }while (0); +DEFINE_int64(binlog_update_checkpoint_interval_us, 10 * 1000 * 1000LL, "binlog update checkpoint interval us: 10s"); void print_oldest_ts(std::ostream& os, void*) { int64_t oldest_ts = RocksWrapper::get_instance()->get_oldest_ts_in_binlog_cf(); @@ -768,9 +769,14 @@ int Region::binlog_update_check_point() { } int64_t check_point_ts = _binlog_param.ts_binlog_map.empty() ? _binlog_param.max_ts_applied : _binlog_param.ts_binlog_map.begin()->first; - if (_binlog_param.check_point_ts >= check_point_ts) { + if (_binlog_param.check_point_ts < check_point_ts) { + _binlog_param.check_point_ts = check_point_ts; + } + + if (_binlog_update_ck_tc.get_time() < FLAGS_binlog_update_checkpoint_interval_us) { return 0; } + _binlog_update_ck_tc.reset(); int ret = _meta_writer->write_binlog_check_point(_region_id, check_point_ts); if (ret != 0) { @@ -780,8 +786,6 @@ int Region::binlog_update_check_point() { DB_WARNING("region_id: %ld, check point ts %ld, %s => %ld, %s", _region_id, _binlog_param.check_point_ts, ts_to_datetime_str(_binlog_param.check_point_ts).c_str(), check_point_ts, ts_to_datetime_str(check_point_ts).c_str()); - _binlog_param.check_point_ts = check_point_ts; - return 0; } diff --git a/src/store/store.cpp b/src/store/store.cpp index 51b0cd571..f090820e7 100644 --- a/src/store/store.cpp +++ b/src/store/store.cpp @@ -91,6 +91,8 @@ DECLARE_int32(store_rocks_hang_cnt_limit); DEFINE_int32(cold_region_flush_concurrency, 5, "cold_region_flush_concurrency"); DEFINE_int32(region_size_alarm_threshold_G, 5, "default 5G"); DECLARE_string(cold_rocksdb_afs_infos); +DECLARE_string(meta_server_bns); +DECLARE_bool(auto_update_meta_list); BRPC_VALIDATE_GFLAG(rocksdb_perf_level, brpc::NonNegativeInteger); Store::~Store() { @@ -1258,6 +1260,30 @@ void Store::send_heart_beat() { _last_heart_time.reset(); heart_beat_count << -1; DB_WARNING("heart beat"); + if (FLAGS_auto_update_meta_list) { + update_meta_list(); + } +} + +void Store::update_meta_list() { + pb::RaftControlRequest req; + req.set_op_type(pb::GetPeerList); + pb::RaftControlResponse res; + if (_meta_server_interact.send_request("raft_control", req, res) == 0) { + std::string meta_list = ""; + for (auto i = 0; i < res.peers_size(); i ++) { + if (i != 0) { + meta_list += ","; + } + meta_list += res.peers(i); + } + if (meta_list != "" && meta_list != FLAGS_meta_server_bns) { + DB_WARNING("meta list %s change to:%s", FLAGS_meta_server_bns.c_str(), meta_list.c_str()); + if ( _meta_server_interact.reset_bns_channel(meta_list) == 0) { + FLAGS_meta_server_bns = meta_list; + } + } + } } void Store::vector_compact_thread() {