Skip to content

Commit

Permalink
功能完善 (#237)
Browse files Browse the repository at this point in the history
* fix:  show full columns

* feat: show tables, show columns

* fix: binlog update checkpoint bug

* fix: sort_limit_by_range

* feat: show create table compatible with mysql

* fix: show tables information_schema库转小写

* fix: cstore + ttl表,update字段默认值时不生效的bug

* feat: slow query log add sign, server_addr and conn_id

* fix: show cmd make simple result bug

* fix: exec node delete after parent clear children

* feat: auto update meta list

* feat: add sql exec timeout && client count bvar

* fix: enable profilers

* fix: string type return field length 255

* fix: not in (null)

* feat: support delete/update stmt with OrderByOption and LimitCause

* fix: show processlist segment fault

* fix: for code reveiw

* fix: exec node delete after parent clear children

* fix: for code review

---------

Co-authored-by: yuzhong.chen <[email protected]>
  • Loading branch information
wy1433 and cyz-2023 authored Apr 17, 2024
1 parent 8374248 commit 6d563e5
Show file tree
Hide file tree
Showing 32 changed files with 904 additions and 391 deletions.
14 changes: 2 additions & 12 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ COPTS = [
"-Wno-parentheses",
"-Wno-deprecated-declarations",
"-DBAIKAL_TCMALLOC",
"-DBRPC_ENABLE_CPU_PROFILER",
"-UNDEBUG",
]

Expand Down Expand Up @@ -585,10 +586,6 @@ cc_binary(
"include/engine",
"include/common",
],
copts = [
"-DBAIDU_RPC_ENABLE_CPU_PROFILER",
"-DBAIDU_RPC_ENABLE_HEAP_PROFILER",
],
linkopts = LINKOPTS,
deps = [
":meta_server",
Expand All @@ -609,10 +606,6 @@ cc_binary(
includes = [
"include/store",
],
copts = [
"-DBAIDU_RPC_ENABLE_CPU_PROFILER",
"-DBAIDU_RPC_ENABLE_HEAP_PROFILER",
],
linkopts = LINKOPTS,
deps = [
":store",
Expand Down Expand Up @@ -726,11 +719,8 @@ cc_library(
cc_binary(
name = "baikaldb",
srcs = ["src/protocol/main.cpp"],
copts = COPTS + [
"-DBAIDU_RPC_ENABLE_CPU_PROFILER",
"-DBAIDU_RPC_ENABLE_HEAP_PROFILER",
],
linkopts = LINKOPTS,
copts = COPTS,
deps = [
":protocol2",
":common",
Expand Down
8 changes: 6 additions & 2 deletions include/common/meta_server_interact.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ class MetaServerInteract {
return &_instance;
}


MetaServerInteract() {}
bool is_inited() {
return _is_inited;
}
int init(bool is_backup = false);
int init_internal(const std::string& meta_bns);
int reset_bns_channel(const std::string& meta_bns);
template<typename Request, typename Response>
int send_request(const std::string& service_name,
const Request& request,
Expand Down Expand Up @@ -98,7 +100,8 @@ class MetaServerInteract {
}
short_channel.CallMethod(method, &cntl, &request, &response, NULL);
} else {
_bns_channel.CallMethod(method, &cntl, &request, &response, NULL);
std::unique_lock<std::mutex> lck(_bns_channel_mutex);
_bns_channel->CallMethod(method, &cntl, &request, &response, NULL);
if (!cntl.Failed() && response.errcode() == pb::SUCCESS) {
_set_leader_address(cntl.remote_side());
DB_WARNING("connet with meta server success by bns name, leader:%s",
Expand Down Expand Up @@ -149,11 +152,12 @@ class MetaServerInteract {
_master_leader_address = addr;
}
private:
brpc::Channel _bns_channel;
brpc::Channel *_bns_channel = nullptr;
int32_t _request_timeout = 30000;
int32_t _connect_timeout = 5000;
bool _is_inited = false;
std::mutex _master_leader_mutex;
std::mutex _bns_channel_mutex;
butil::EndPoint _master_leader_address;
};
}//namespace
Expand Down
12 changes: 7 additions & 5 deletions include/common/type_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,21 +417,23 @@ inline std::string to_mysql_type_full_string(pb::PrimitiveType type) {
case pb::DOUBLE:
return "double";
case pb::STRING:
return "text";
return "varchar(1024)";
case pb::DATETIME:
return "datetime(6)";
return "datetime";
case pb::DATE:
return "date";
case pb::TIME:
return "time";
case pb::TIMESTAMP:
return "timestamp(0)";
return "timestamp";
case pb::HLL:
return "HLL";
case pb::BITMAP:
return "BITMAP";
case pb::TDIGEST:
return "binary";
return "TDIGEST";
default:
return "text";
return "";
}
}

Expand Down
1 change: 1 addition & 0 deletions include/logical_plan/delete_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class DeletePlanner : public LogicalPlanner {
private:
// method to create plan node
int create_delete_node(pb::PlanNode* delete_node);
int create_limit_node();

// method to create plan node
int create_truncate_node();
Expand Down
3 changes: 2 additions & 1 deletion include/logical_plan/setkv_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ class SetKVPlanner : public LogicalPlanner {
int set_autocommit_1();
int set_autocommit(parser::ExprNode* expr);
int set_user_variable(const std::string& key, parser::ExprNode* expr);
int set_sql_mode(parser::ExprNode* expr);

private:
parser::SetStmt* _set_stmt;
};
} //namespace baikal
} //namespace baikal
7 changes: 4 additions & 3 deletions include/logical_plan/update_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ namespace baikaldb {
class UpdatePlanner : public LogicalPlanner {
public:
UpdatePlanner(QueryContext* ctx) :
LogicalPlanner(ctx),
_limit_count(-1) {}
LogicalPlanner(ctx) {}

virtual ~UpdatePlanner() {}
virtual int plan();

private:

int create_update_node(pb::PlanNode* update_node);
int create_limit_node();

// method to parse SQL parts
int parse_kv_list();
Expand All @@ -43,9 +43,10 @@ class UpdatePlanner : public LogicalPlanner {
private:
parser::UpdateStmt* _update;
std::vector<pb::Expr> _where_filters;
int32_t _limit_count;
std::vector<pb::SlotDescriptor> _update_slots;
std::vector<pb::Expr> _update_values;
pb::Expr _limit_offset;
pb::Expr _limit_count;
};
} //namespace baikal

6 changes: 6 additions & 0 deletions include/protocol/network_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ class NetworkServer {
SmartSocket create_listen_socket();
void construct_other_heart_beat_request(pb::BaikalOtherHeartBeatRequest& request);
void process_other_heart_beat_response(const pb::BaikalOtherHeartBeatResponse& response);
void update_meta_list();
void client_conn_bvars_update();

std::string state2str(SmartSocket client);

Expand Down Expand Up @@ -149,10 +151,14 @@ class NetworkServer {
Bthread _other_heartbeat_bth;
Bthread _agg_sql_bth;
Bthread _health_check_bth;
Bthread _conn_bvars_update_bth;
uint32_t _driver_thread_num;
uint64_t _instance_id = 0;
std::string _physical_room;
bvar::Adder<int64_t> _heart_beat_count;
bvar::Status<int32_t> _client_conn_count;
bvar::Status<int32_t> _client_sql_running_count;
bvar::Status<int32_t> _client_sql_running_max_latency;
// for print_agg_sql
baikal::client::Manager _manager;
baikal::client::Service* _baikaldb;
Expand Down
9 changes: 6 additions & 3 deletions include/protocol/show_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const std::string SQL_SHOW_DATABASES = "databases"; // s
const std::string SQL_SHOW_NAMESPACE = "namespace"; // show namespace
const std::string SQL_SHOW_META = "meta"; // show meta;
const std::string SQL_SHOW_TABLE_STATUS = "table"; // show table status;
const std::string SQL_SHOW_TABLES = "tables"; // show tables;
const std::string SQL_SHOW_FUNCTION_STATUS = "function"; // show function status;
const std::string SQL_SHOW_PROCEDURE_STATUS = "procedure"; // show procedure status;
const std::string SQL_SHOW_TRIGGERS = "triggers"; // show triggers;
Expand All @@ -39,7 +38,9 @@ const std::string SQL_SHOW_WARNINGS = "warnings"; // s
const std::string SQL_SHOW_PROCESSLIST = "processlist"; // show processlist;
const std::string SQL_SHOW_COST = "cost"; // show cost switch;
const std::string SQL_SHOW_FULL_TABLES = "full_tables"; // show full tables;
const std::string SQL_SHOW_TABLES = "tables"; // show tables;
const std::string SQL_SHOW_FULL_COLUMNS = "full_columns"; // show full columns;
const std::string SQL_SHOW_COLUMNS = "columns"; // show columns;
const std::string SQL_SHOW_SCHEMA_CONF = "schema_conf"; // show schema_conf database_table;
const std::string SQL_SHOW_VIRTUAL_INDEX = "virtual"; // show virtual index;
const std::string SQL_SHOW_SESSION_VARIABLES = "show session variables";
Expand Down Expand Up @@ -102,8 +103,6 @@ class ShowHelper {
bool _show_abnormal_regions(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show databases;
bool _show_databases(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show tables;
bool _show_tables(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show create table tableName;
bool _show_create_table(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show collation
Expand All @@ -118,6 +117,8 @@ class ShowHelper {
bool _show_cost(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show full tables;
bool _show_full_tables(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show tables;
bool _show_tables(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show function status;
bool _show_function_status(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show procedure status;
Expand All @@ -128,6 +129,8 @@ class ShowHelper {
bool _show_events(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show full columns from tableName;
bool _show_full_columns(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show columns from tableName;
bool _show_columns(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show schema_conf database_table;
bool _show_schema_conf(const SmartSocket& client, const std::vector<std::string>& split_vec);
// sql: show all_tables ttl/binlog;
Expand Down
12 changes: 12 additions & 0 deletions include/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,16 @@ class RuntimeState {
int64_t get_cost_time() {
return time_cost.get_time();
}
bool is_timeout() {
return _sql_exec_timeout > 0 && time_cost.get_time() > _sql_exec_timeout * 1000L;
}

bool is_ddl_work() {
return _is_ddl_work;
}
void set_is_ddl_work(bool is_ddl_work) {
_is_ddl_work = is_ddl_work;
}

public:
uint64_t txn_id = 0;
Expand Down Expand Up @@ -446,6 +456,8 @@ class RuntimeState {
int keypoint_range = 100 * 10000;
int partition_threshold = 10000;
int range_count_limit = 0;
int64_t _sql_exec_timeout = -1;
bool _is_ddl_work = false;
private:
bool _is_inited = false;
bool _is_cancelled = false;
Expand Down
1 change: 1 addition & 0 deletions include/store/region.h
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,7 @@ friend class Backup;
std::string _rocksdb_end;
pb::PeerStatus _region_status = pb::STATUS_NORMAL;
BinlogAlarm _binlog_alarm;
TimeCost _binlog_update_ck_tc;

//learner
std::unique_ptr<braft::Learner> _learner;
Expand Down
2 changes: 2 additions & 0 deletions include/store/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class Store : public pb::StoreService {
//上报心跳
void heart_beat_thread();

void update_meta_list();

void send_heart_beat();

void start_db_statistics();
Expand Down
2 changes: 2 additions & 0 deletions proto/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ enum RaftControlOp {
ShutDown = 5; //leader and follower both can do, 只关闭,但并没有从raft group里去掉
Vote = 6; //调用node的vote接口
ResetVoteTime = 7; //调用node的reset_election_timeout_ms接口
GetPeerList = 8; //获取当前peer list, 只能发leader
};

// operation request/response
Expand All @@ -31,4 +32,5 @@ message RaftControlResponse {
required ErrCode errcode = 2;
optional string leader = 3;
optional string errmsg = 4;
repeated string peers = 5;
};
1 change: 1 addition & 0 deletions proto/store.interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ message StoreReq {
optional uint64 sql_sign = 28; // sql 签名
repeated RegionInfo multi_new_region_infos = 29;
optional ExtraReq extra_req = 30; // 非关键路径上的额外信息可以放在这里,避免该message过度膨胀
optional int64 sql_exec_timeout = 31;
};

message RowValue {
Expand Down
6 changes: 5 additions & 1 deletion src/common/information_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,11 @@ void InformationSchema::init_columns() {
record->set_string(record->get_field_by_name("COLUMN_NAME"), field.short_name);
record->set_int64(record->get_field_by_name("ORDINAL_POSITION"), ++i);
if (field.default_expr_value.type != pb::NULL_TYPE) {
record->set_string(record->get_field_by_name("COLUMN_DEFAULT"), field.default_value);
if (field.default_value == "(current_timestamp())") {
record->set_string(record->get_field_by_name("COLUMN_DEFAULT"), "CURRENT_TIMESTAMP");
} else {
record->set_string(record->get_field_by_name("COLUMN_DEFAULT"), field.default_value);
}
}
record->set_string(record->get_field_by_name("IS_NULLABLE"), field.can_null ? "YES" : "NO");
record->set_string(record->get_field_by_name("DATA_TYPE"), to_mysql_type_string(field.type));
Expand Down
31 changes: 30 additions & 1 deletion src/common/meta_server_interact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ DEFINE_int32(meta_connect_timeout, 5000,
DEFINE_string(meta_server_bns, "group.opera-qa-baikalMeta-000-yz.FENGCHAO.all", "meta server bns");
DEFINE_string(backup_meta_server_bns, "", "backup_meta_server_bns");
DEFINE_int64(time_between_meta_connect_error_ms, 0, "time_between_meta_connect_error_ms. default(0ms)");
DEFINE_bool(auto_update_meta_list, false, "auto_update_meta_list, default false");

int MetaServerInteract::init(bool is_backup) {
if (is_backup) {
Expand Down Expand Up @@ -51,13 +52,41 @@ int MetaServerInteract::init_internal(const std::string& meta_bns) {
} else {
meta_server_addr = std::string("list://") + meta_bns;
}
if (_bns_channel.Init(meta_server_addr.c_str(), "rr", &channel_opt) != 0) {
std::unique_lock<std::mutex> lck(_bns_channel_mutex);
if (_bns_channel == nullptr) {
_bns_channel = new brpc::Channel();
}
if (_bns_channel->Init(meta_server_addr.c_str(), "rr", &channel_opt) != 0) {
DB_FATAL("meta server bns pool init fail. bns_name:%s", meta_server_addr.c_str());
return -1;
}
_is_inited = true;
return 0;
}


int MetaServerInteract::reset_bns_channel(const std::string& meta_bns) {
brpc::ChannelOptions channel_opt;
channel_opt.timeout_ms = FLAGS_meta_request_timeout;
channel_opt.connect_timeout_ms = FLAGS_meta_connect_timeout;
std::string meta_server_addr = meta_bns;
//bns
if (meta_bns.find(":") == std::string::npos) {
meta_server_addr = std::string("bns://") + meta_bns;
} else {
meta_server_addr = std::string("list://") + meta_bns;
}
brpc::Channel *tmp = new brpc::Channel();
if (tmp->Init(meta_server_addr.c_str(), "rr", &channel_opt) != 0) {
delete tmp;
DB_FATAL("meta server bns pool init fail. bns_name:%s", meta_server_addr.c_str());
return -1;
}
std::unique_lock<std::mutex> lck(_bns_channel_mutex);
SAFE_DELETE(_bns_channel);
_bns_channel = tmp;
return 0;
}
}

/* vim: set ts=4 sw=4 sts=4 tw=100 */
9 changes: 5 additions & 4 deletions src/exec/delete_manager_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace baikaldb {
int DeleteManagerNode::open(RuntimeState* state) {
ExecNode* child_node = _children[0];
//如果没有全局二级索引,直接走逻辑
if (child_node->node_type() != pb::SELECT_MANAGER_NODE) {
if ((child_node->node_type() != pb::SELECT_MANAGER_NODE) &&
(child_node->node_type() != pb::LIMIT_NODE)) {
int ret = DmlManagerNode::open(state);
if (ret >= 0) {
if (process_binlog(state, true) < 0) {
Expand Down Expand Up @@ -62,8 +63,8 @@ int DeleteManagerNode::init_delete_info(const pb::UpdateNode& update_node) {
}

int DeleteManagerNode::open_global_delete(RuntimeState* state) {
ExecNode* select_manager_node = _children[_execute_child_idx++];
auto ret = select_manager_node->open(state);
ExecNode* select_manager_or_limit_node = _children[_execute_child_idx++];
auto ret = select_manager_or_limit_node->open(state);
if (ret < 0) {
DB_WARNING("select manager node fail");
return ret;
Expand All @@ -74,7 +75,7 @@ int DeleteManagerNode::open_global_delete(RuntimeState* state) {
std::vector<SmartRecord> scan_records;
do {
RowBatch batch;
ret = select_manager_node->get_next(state, &batch, &eos);
ret = select_manager_or_limit_node->get_next(state, &batch, &eos);
if (ret < 0) {
DB_WARNING("children:get_next fail:%d", ret);
return ret;
Expand Down
Loading

0 comments on commit 6d563e5

Please sign in to comment.