Skip to content

Commit

Permalink
User specified split key #1227 (#1228)
Browse files Browse the repository at this point in the history
* issue=#1227 User specified split key #1227
  • Loading branch information
lylei authored and ajie committed May 9, 2017
1 parent 4d3fa57 commit b65f535
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 58 deletions.
45 changes: 23 additions & 22 deletions src/io/tablet_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,29 +460,30 @@ bool TabletIO::Split(std::string* split_key, StatusCode* status) {
db_ref_count_++;
}

std::string raw_split_key;
split_key->clear();
if (db_->FindSplitKey(0.5, &raw_split_key)) {
ParseRowKey(raw_split_key, split_key);
}
if (split_key->empty()) {
std::string raw_split_key;
if (db_->FindSplitKey(0.5, &raw_split_key)) {
ParseRowKey(raw_split_key, split_key);
}

if (split_key->empty() || *split_key == end_key_) {
// could not find a usable split key; fall back to the average of the smallest and largest row keys
std::string smallest_key, largest_key;
CHECK(db_->FindKeyRange(&smallest_key, &largest_key));
if (split_key->empty() || *split_key == end_key_) {
// could not find a usable split key; fall back to the average of the smallest and largest row keys
std::string smallest_key, largest_key;
CHECK(db_->FindKeyRange(&smallest_key, &largest_key));

std::string srow_key, lrow_key;
if (!smallest_key.empty()) {
ParseRowKey(smallest_key, &srow_key);
} else {
srow_key = start_key_;
}
if (!largest_key.empty()) {
ParseRowKey(largest_key, &lrow_key);
} else {
lrow_key = end_key_;
std::string srow_key, lrow_key;
if (!smallest_key.empty()) {
ParseRowKey(smallest_key, &srow_key);
} else {
srow_key = start_key_;
}
if (!largest_key.empty()) {
ParseRowKey(largest_key, &lrow_key);
} else {
lrow_key = end_key_;
}
FindAverageKey(srow_key, lrow_key, split_key);
}
FindAverageKey(srow_key, lrow_key, split_key);
}

VLOG(5) << "start: [" << DebugString(start_key_)
Expand All @@ -492,8 +493,8 @@ bool TabletIO::Split(std::string* split_key, StatusCode* status) {
MutexLock lock(&mutex_);
db_ref_count_--;
if (*split_key != ""
&& *split_key != start_key_
&& *split_key != end_key_) {
&& *split_key > start_key_
&& (end_key_ == "" || *split_key < end_key_)) {
status_ = kSplited;
return true;
} else {
Expand Down
31 changes: 17 additions & 14 deletions src/master/master_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1495,9 +1495,9 @@ void MasterImpl::TabletCmdCtrl(const CmdCtrlRequest* request,
std::string split_key;
if (request->arg_list_size() == 3) {
split_key = request->arg_list(2);
LOG(INFO) << "ignore user specified split key: not support";
LOG(INFO) << "User specified split key: " << split_key;
}
TrySplitTablet(tablet);
TrySplitTablet(tablet, split_key);
response->set_status(kMasterOk);
} else if (request->arg_list(0) == "merge") {
if (request->arg_list_size() > 3) {
Expand Down Expand Up @@ -2704,15 +2704,16 @@ void MasterImpl::UnloadTabletCallback(TabletPtr tablet, int32_t retry,
TabletNodePtr node;
if (tabletnode_manager_->FindTabletNode(server_addr, &node)
&& node->uuid_ == tablet->GetServerId()) {
node->FinishSplit(tablet);
node->FinishSplit();
TabletPtr next_tablet;
while (node->SplitNextWaitTablet(&next_tablet)) {
std::string split_key;
while (node->SplitNextWaitTablet(&next_tablet, &split_key)) {
if (next_tablet->SetStatusIf(kTableOnSplit, kTableReady)) {
next_tablet->SetServerId(node->uuid_);
SplitTabletAsync(next_tablet);
SplitTabletAsync(next_tablet, split_key);
break;
}
node->FinishSplit(next_tablet);
node->FinishSplit();
}
}
}
Expand Down Expand Up @@ -3647,7 +3648,7 @@ void MasterImpl::RetryCollectTabletInfo(std::string addr,
QueryTabletNodeAsync(addr, FLAGS_tera_master_collect_info_timeout, false, done);
}

void MasterImpl::SplitTabletAsync(TabletPtr tablet) {
void MasterImpl::SplitTabletAsync(TabletPtr tablet, const std::string& split_key) {
const std::string& table_name = tablet->GetTableName();
const std::string& server_addr = tablet->GetServerAddr();
const std::string& key_start = tablet->GetKeyStart();
Expand All @@ -3664,6 +3665,7 @@ void MasterImpl::SplitTabletAsync(TabletPtr tablet) {
request->mutable_key_range()->set_key_end(key_end);
request->add_child_tablets(tablet->GetTable()->GetNextTabletNo());
request->add_child_tablets(tablet->GetTable()->GetNextTabletNo());
request->set_split_key(split_key);

tablet->ToMeta(request->mutable_tablet_meta());
std::vector<uint64_t> snapshots;
Expand Down Expand Up @@ -3724,17 +3726,18 @@ void MasterImpl::SplitTabletCallback(TabletPtr tablet,
TabletNodePtr node;
if (tabletnode_manager_->FindTabletNode(server_addr, &node)
&& node->uuid_ == tablet->GetServerId()) {
node->FinishSplit(tablet);
node->FinishSplit();

// schedule next split task
TabletPtr next_tablet;
while (node->SplitNextWaitTablet(&next_tablet)) {
std::string split_key;
while (node->SplitNextWaitTablet(&next_tablet, &split_key)) {
if (next_tablet->SetStatusIf(kTableOnSplit, kTableReady)) {
next_tablet->SetServerId(node->uuid_);
SplitTabletAsync(next_tablet);
SplitTabletAsync(next_tablet, split_key);
break;
}
node->FinishSplit(next_tablet);
node->FinishSplit();
}
} else { // server down or restart
if (tablet->SetStatusIf(kTableOffLine, kTableReady)) {
Expand Down Expand Up @@ -3940,7 +3943,7 @@ void MasterImpl::RetryUnloadTablet(TabletPtr tablet, int32_t retry_times) {
UnloadTabletAsync(tablet, done);
}

bool MasterImpl::TrySplitTablet(TabletPtr tablet) {
bool MasterImpl::TrySplitTablet(TabletPtr tablet, const std::string& split_key) {
const std::string& server_addr = tablet->GetServerAddr();

// abort if the server is down
Expand All @@ -3951,7 +3954,7 @@ bool MasterImpl::TrySplitTablet(TabletPtr tablet) {
}

// delay the split: too many tablets are already splitting on this server
if (!node->TrySplit(tablet)) {
if (!node->TrySplit(tablet, split_key)) {
LOG(INFO) << "delay split table " << tablet->GetPath()
<< ", too many tablets are splitting on server: " << server_addr;
return false;
Expand All @@ -3965,7 +3968,7 @@ bool MasterImpl::TrySplitTablet(TabletPtr tablet) {
// if the server goes down here, let the split callback take care of the status switch
LOG(INFO) << "begin split table " << tablet->GetPath();
tablet->SetServerId(node->uuid_);
SplitTabletAsync(tablet);
SplitTabletAsync(tablet, split_key);
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions src/master/master_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class MasterImpl {

void RetryLoadTablet(TabletPtr tablet, int32_t retry_times);
void RetryUnloadTablet(TabletPtr tablet, int32_t retry_times);
bool TrySplitTablet(TabletPtr tablet);
bool TrySplitTablet(TabletPtr tablet, const std::string& split_key = "");
bool TryMergeTablet(TabletPtr tablet);
void TryMoveTablet(TabletPtr tablet, const std::string& server_addr = "", bool in_place = false);

Expand Down Expand Up @@ -359,7 +359,7 @@ class MasterImpl {
std::vector<TabletMeta>* tablet_list,
sem_t* finish_counter, Mutex* mutex);

void SplitTabletAsync(TabletPtr tablet);
void SplitTabletAsync(TabletPtr tablet, const std::string& split_key = "");
void SplitTabletCallback(TabletPtr tablet, SplitTabletRequest* request,
SplitTabletResponse* response, bool failed,
int error_code);
Expand Down
23 changes: 15 additions & 8 deletions src/master/tabletnode_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ bool TabletNode::LoadNextWaitTablet(TabletPtr* tablet) {
return true;
}

bool TabletNode::TrySplit(TabletPtr tablet) {
bool TabletNode::TrySplit(TabletPtr tablet, const std::string& split_key) {
MutexLock lock(&mutex_);
data_size_ -= tablet->GetDataSize();
// VLOG(5) << "split on: " << addr_ << ", size: " << tablet->GetDataSize()
Expand All @@ -226,29 +226,36 @@ bool TabletNode::TrySplit(TabletPtr tablet) {
++onsplit_count_;
return true;
}
if (std::find(wait_split_list_.begin(), wait_split_list_.end(), tablet) ==
wait_split_list_.end()) {
wait_split_list_.push_back(tablet);
std::list<std::pair<TabletPtr, std::string> >::iterator it;
for (it = wait_split_list_.begin(); it != wait_split_list_.end(); ++it) {
if (it->first == tablet) {
return false;
}
}
if (it == wait_split_list_.end()) {
wait_split_list_.push_back(std::make_pair(tablet, split_key));
}

return false;
}

bool TabletNode::FinishSplit(TabletPtr tablet) {
bool TabletNode::FinishSplit() {
MutexLock lock(&mutex_);
--onsplit_count_;
return true;
}

bool TabletNode::SplitNextWaitTablet(TabletPtr* tablet) {
bool TabletNode::SplitNextWaitTablet(TabletPtr* tablet, std::string* split_key) {
MutexLock lock(&mutex_);
if (onsplit_count_ >= static_cast<uint32_t>(FLAGS_tera_master_max_split_concurrency)) {
return false;
}
std::list<TabletPtr>::iterator it = wait_split_list_.begin();
std::list<std::pair<TabletPtr, std::string> >::iterator it = wait_split_list_.begin();
if (it == wait_split_list_.end()) {
return false;
}
*tablet = *it;
*tablet = it->first;
*split_key = it->second;
wait_split_list_.pop_front();
++onsplit_count_;
return true;
Expand Down
8 changes: 4 additions & 4 deletions src/master/tabletnode_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct TabletNode {
uint32_t onsplit_count_;
uint32_t plan_move_in_count_;
std::list<TabletPtr> wait_load_list_;
std::list<TabletPtr> wait_split_list_;
std::list<std::pair<TabletPtr, std::string> > wait_split_list_; // (tablet, split_key)

// The start time of the most recent load operation.
// Used to tell whether the node has loaded too many tablets within a short time.
Expand Down Expand Up @@ -100,9 +100,9 @@ struct TabletNode {
bool FinishLoad(TabletPtr tablet);
bool LoadNextWaitTablet(TabletPtr* tablet);

bool TrySplit(TabletPtr tablet);
bool FinishSplit(TabletPtr tablet);
bool SplitNextWaitTablet(TabletPtr* tablet);
bool TrySplit(TabletPtr tablet, const std::string& split_key = "");
bool FinishSplit();
bool SplitNextWaitTablet(TabletPtr* tablet, std::string* split_key);

NodeState GetState();
bool SetState(NodeState new_state, NodeState* old_state);
Expand Down
13 changes: 7 additions & 6 deletions src/proto/tabletnode_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ message QueryResponse {
}

enum UpdateType {
kUpdateSchema = 0;
kUpdateSchema = 0;
}

message UpdateRequest {
optional uint64 sequence_id = 1;
optional UpdateType type = 2;
optional TableSchema schema = 3;
optional string tablet_name = 4;
optional KeyRange key_range = 5;
optional KeyRange key_range = 5;
}

message UpdateResponse {
Expand All @@ -81,7 +81,7 @@ message UpdateResponse {
message LoadTabletRequest {
required uint64 sequence_id = 1;
required string tablet_name = 2;
optional KeyRange key_range = 3;
optional KeyRange key_range = 3;
optional string path = 4;
optional string session_id = 6;
optional TableSchema schema = 8;
Expand Down Expand Up @@ -163,7 +163,7 @@ message RowMutationSequence {

message WriteTabletRequest {
optional uint64 sequence_id = 1;
required string tablet_name = 2;
required string tablet_name = 2;
//repeated KeyValuePair pair_list = 3; // for compatibility
optional bool is_sync = 4 [default = false];
optional bool is_instant = 5 [default = false];
Expand Down Expand Up @@ -241,7 +241,7 @@ message ResultCell {
message ScanTabletRequest {
optional uint64 sequence_id = 1;
optional string table_name = 2;
//optional KeyRange key_range = 3;
//optional KeyRange key_range = 3;
optional uint32 max_version = 4;
optional uint32 buffer_limit = 5 [default = 102400];
optional bytes start = 6;
Expand Down Expand Up @@ -283,7 +283,7 @@ message RowReaderInfo {
message ReadTabletRequest {
optional uint64 sequence_id = 1;
required string tablet_name = 2;
//repeated bytes key_list = 3;
//repeated bytes key_list = 3;
repeated RowReaderInfo row_info_list = 4;
//repeated KeyValuePair key_values = 5;
optional uint64 snapshot_id = 6;
Expand All @@ -304,6 +304,7 @@ message SplitTabletRequest {
required KeyRange key_range = 3;
optional TabletMeta tablet_meta = 4;
repeated uint64 child_tablets = 5;
optional bytes split_key = 6;
}

message SplitTabletResponse {
Expand Down
5 changes: 3 additions & 2 deletions src/tabletnode/tabletnode_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,8 @@ void TabletNodeImpl::SplitTablet(const SplitTabletRequest* request,
google::protobuf::Closure* done) {
response->set_sequence_id(request->sequence_id());

std::string split_key, path;
std::string split_key = request->split_key();
std::string path;
StatusCode status = kTabletNodeOk;
io::TabletIO* tablet_io = tablet_manager_->GetTablet(request->tablet_name(),
request->key_range().key_start(),
Expand All @@ -823,7 +824,7 @@ void TabletNodeImpl::SplitTablet(const SplitTabletRequest* request,
LOG(ERROR) << "fail to split tablet: " << tablet_io->GetTablePath()
<< " [" << DebugString(tablet_io->GetStartKey())
<< ", " << DebugString(tablet_io->GetEndKey())
<< "], status: " << StatusCodeToString(status);
<< "], split_key: " << DebugString(split_key) << ". status: " << StatusCodeToString(status);
if (status == kTableNotSupport) {
response->set_status(kTableNotSupport);
} else {
Expand Down

0 comments on commit b65f535

Please sign in to comment.