Skip to content

Commit

Permalink
bdtbd#33 : high concurrency put
Browse files Browse the repository at this point in the history
  • Loading branch information
caijieming committed Oct 12, 2015
1 parent 502ba4d commit 1685f9b
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 8 deletions.
2 changes: 1 addition & 1 deletion src/sdk/mdt_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ DEFINE_string(database_root_dir, "/disk/tera/DatabaseDir", "database's data file
// write ops param
DEFINE_int64(concurrent_write_handle_num, 10, "num of fs writer");
DEFINE_int64(max_write_handle_seq, 10, "max num of req can schedule to current write_handle");
DEFINE_int64(data_size_per_sync, 262144, "num of data per Sync()");
DEFINE_int64(data_size_per_sync, 0, "num of data per Sync()");
7 changes: 0 additions & 7 deletions src/sdk/table_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,14 @@ int TableImpl::Put(const StoreRequest* req, StoreResponse* resp,
context.callback_param_ = callback_param;
context.sync_ = true;
context.done_ = false;
VLOG(30) << "ctx " << (uint64_t)(&context) << ", req " << (uint64_t)req;

// select write handle
WriteHandle* write_handle = GetWriteHandle();

// wait and lock
//MutexLock l(&write_mutex_);
write_mutex_.Lock();
write_handle->write_queue_.push_back(&context);
while (!context.done_ && (&context != write_handle->write_queue_.front())) {
VLOG(30) << "===== waitlock put";
context.cv_.Wait();
}
if (context.done_) {
Expand All @@ -220,7 +217,6 @@ int TableImpl::Put(const StoreRequest* req, StoreResponse* resp,
WriteBatch wb;
std::deque<WriteContext*>::iterator iter = write_handle->write_queue_.begin();
for (; iter != write_handle->write_queue_.end(); ++iter) {
VLOG(30) << "ctx " << (uint64_t)((WriteContext*)*iter);
wb.Append(*iter);
}

Expand All @@ -229,7 +225,6 @@ int TableImpl::Put(const StoreRequest* req, StoreResponse* resp,
VLOG(30) << ">>>>> lock put, ctx " << (uint64_t)(&context);

// batch write file system
VLOG(10) << "write file";
FileLocation location;
DataWriter* writer = GetDataWriter(write_handle);
writer->AddRecord(wb.rep_, &location);
Expand All @@ -238,7 +233,6 @@ int TableImpl::Put(const StoreRequest* req, StoreResponse* resp,
std::vector<WriteContext*>::iterator it = wb.context_list_.begin();
for (; it != wb.context_list_.end(); ++it) {
WriteContext* ctx = *it;
VLOG(30) << "write idx, ctx " << (uint64_t)ctx;
FileLocation data_location = location;
data_location.size_ = ctx->req_->data.size();
data_location.offset_ += ctx->offset_;
Expand All @@ -252,7 +246,6 @@ int TableImpl::Put(const StoreRequest* req, StoreResponse* resp,
WriteContext* last_writer = wb.context_list_.back();
while (true) {
WriteContext* ready = write_handle->write_queue_.front();
VLOG(30) << "finish ctx " << (uint64_t)ready;
write_handle->write_queue_.pop_front();
if (ready != &context) {
ready->done_ = true;
Expand Down

0 comments on commit 1685f9b

Please sign in to comment.