From 5221d542a6144c9f945443a7f851699553ac2b77 Mon Sep 17 00:00:00 2001
From: caijieming-baidu
Date: Wed, 16 Dec 2015 13:17:00 +0800
Subject: [PATCH 1/3] use tera scan number_limit feature

---
 src/sdk/mdt_flags.cc  | 2 ++
 src/sdk/table_impl.cc | 5 +++++
 2 files changed, 7 insertions(+)

diff --git a/src/sdk/mdt_flags.cc b/src/sdk/mdt_flags.cc
index 3fcdd01..1e1d5a7 100644
--- a/src/sdk/mdt_flags.cc
+++ b/src/sdk/mdt_flags.cc
@@ -31,4 +31,6 @@ DEFINE_bool(enable_scan_control, false, "if true, batch scan in limit number");
 DEFINE_int64(batch_scan_buffer_size, 1048576, "scan buffer size");
 DEFINE_bool(enable_qu_range, true, "use qu to filter");
 DEFINE_int64(tera_scan_pack_interval, 50000000, "scan timeout in one round");
+DEFINE_bool(enable_number_limit, true, "use scan number limit");
+DEFINE_int64(scan_number_limit, 30, "number of scan limit");
 
diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc
index 10ef37b..1b593ec 100644
--- a/src/sdk/table_impl.cc
+++ b/src/sdk/table_impl.cc
@@ -28,6 +28,8 @@ DECLARE_bool(enable_scan_control);
 DECLARE_int64(batch_scan_buffer_size);
 DECLARE_bool(enable_qu_range);
 DECLARE_int64(tera_scan_pack_interval);
+DECLARE_bool(enable_number_limit);
+DECLARE_int64(scan_number_limit);
 
 namespace mdt {
 
@@ -953,6 +955,9 @@ Status TableImpl::GetByExtendIndex(const std::vector& inde
         std::string end_qu(ebuf, sizeof(ebuf));
         scan_desc->AddQualifierRange(kIndexTableColumnFamily, start_qu, end_qu);
     }
+    if (FLAGS_enable_number_limit) {
+        scan_desc->SetNumberLimit(FLAGS_scan_number_limit);
+    }
     scan_desc->SetBufferSize(FLAGS_batch_scan_buffer_size);
     scan_desc->SetPackInterval(FLAGS_tera_scan_pack_interval);
 

From be6a52c7e4a8de0c208a82c5f3b34861411072eb Mon Sep 17 00:00:00 2001
From: caijieming
Date: Wed, 16 Dec 2015 23:09:08 +0800
Subject: [PATCH 2/3] use tera scan number_limit feature, async delete
 scanstream

---
 src/sdk/mdt_flags.cc  |  1 +
 src/sdk/table_impl.cc | 14 +++++++++++++-
 src/sdk/table_impl.h  |  5 +++++
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/src/sdk/mdt_flags.cc b/src/sdk/mdt_flags.cc
index 1e1d5a7..529f862 100644
--- a/src/sdk/mdt_flags.cc
+++ b/src/sdk/mdt_flags.cc
@@ -24,6 +24,7 @@ DEFINE_int64(max_timestamp_table_num, 10, "num of timestamp index table");
 
 // read ops param
 DEFINE_int64(read_file_thread_num, 100, "num of read file threads");
+DEFINE_int64(cleaner_thread_num, 5, "num of cleaner threads");
 // read row with timestamp
 DEFINE_bool(enable_multi_version_read, false, "enable search specify time version primary key row");
 DEFINE_bool(read_by_index_filter, true, "read by index filter instead of index merger");
diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc
index 1b593ec..98d52c0 100644
--- a/src/sdk/table_impl.cc
+++ b/src/sdk/table_impl.cc
@@ -22,6 +22,7 @@ DECLARE_int64(write_batch_queue_size);
 DECLARE_int64(request_queue_flush_internal);
 DECLARE_int64(max_timestamp_table_num);
 DECLARE_int64(read_file_thread_num);
+DECLARE_int64(cleaner_thread_num);
 DECLARE_bool(enable_multi_version_read);
 DECLARE_bool(read_by_index_filter);
 DECLARE_bool(enable_scan_control);
@@ -146,10 +147,12 @@ TableImpl::TableImpl(const TableDescription& table_desc,
     tera_(tera_adapter),
     fs_(fs_adapter),
     thread_pool_(FLAGS_read_file_thread_num),
+    cleaner_thread_(FLAGS_cleaner_thread_num),
     queue_timer_stop_(false),
     queue_timer_cv_(&queue_timer_mu_) {
     // create timer
     pthread_create(&timer_tid_, NULL, &TableImpl::TimerThreadWrapper, this);
+
 }
 
 TableImpl::~TableImpl() {
@@ -159,6 +162,8 @@ TableImpl::~TableImpl() {
     thread_pool_.Stop(false);
     FreeTeraTable();
+    cleaner_thread_.Stop(false);
+
 
     // stop timer, flush request
     queue_timer_mu_.Lock();
     queue_timer_stop_ = true;
@@ -176,6 +181,11 @@ TableImpl::~TableImpl() {
     return;
 }
 
+void TableImpl::CleanerThread(tera::ResultStream* stream) {
+    delete stream;
+    VLOG(30) << "do something to clean up";
+}
+
 ///////// batch write /////////////
 struct DefaultBatchWriteCallbackParam {
     CondVar* cond_;
@@ -1131,7 +1141,9 @@ void TableImpl::GetByFilterIndex(tera::Table* index_table,
         stream->Next();
     }
 
-    delete stream;
+    // batch scan must wait other rpc callback
+    //delete stream;
+    cleaner_thread_.AddTask(boost::bind(&TableImpl::CleanerThread, this, stream));
 
     // stop other index table scan streams
     {
diff --git a/src/sdk/table_impl.h b/src/sdk/table_impl.h
index eb90e57..63637da 100644
--- a/src/sdk/table_impl.h
+++ b/src/sdk/table_impl.h
@@ -165,6 +165,8 @@ class TableImpl : public Table {
 private:
     void FreeTeraTable();
     Status Init();
+    void CleanerThread(tera::ResultStream* stream);
+
     // write op
     int InternalBatchWrite(WriteContext* context, std::vector& ctx_queue);
     static void* TimerThreadWrapper(void* arg);
@@ -262,11 +264,14 @@ class TableImpl : public Table {
     TeraAdapter tera_;
     FilesystemAdapter fs_;
     ThreadPool thread_pool_;
+    // async cleaner
+    ThreadPool cleaner_thread_;
 
     // file handle cache relative
     mutable Mutex file_mutex_;
     std::map file_map_;
 
+    // use for put
     mutable Mutex write_mutex_;
     std::vector write_handle_list_;
 

From dffd1b26a7c24a954e873e013eb31f13dd6cf035 Mon Sep 17 00:00:00 2001
From: caijieming
Date: Thu, 17 Dec 2015 00:49:54 +0800
Subject: [PATCH 3/3] use tera scan number_limit feature, async delete
 scanstream, async filesystem read

---
 src/sdk/mdt_flags.cc  |  5 ++-
 src/sdk/table_impl.cc | 99 +++++++++++++++++++++++++++++++++++++------
 src/sdk/table_impl.h  |  4 ++
 3 files changed, 94 insertions(+), 14 deletions(-)

diff --git a/src/sdk/mdt_flags.cc b/src/sdk/mdt_flags.cc
index 529f862..a4791a9 100644
--- a/src/sdk/mdt_flags.cc
+++ b/src/sdk/mdt_flags.cc
@@ -23,7 +23,7 @@ DEFINE_int64(request_queue_flush_internal, 10, "the number of ms wait before flu
 DEFINE_int64(max_timestamp_table_num, 10, "num of timestamp index table");
 
 // read ops param
-DEFINE_int64(read_file_thread_num, 100, "num of read file threads");
+DEFINE_int64(read_file_thread_num, 50, "num of read file threads");
 DEFINE_int64(cleaner_thread_num, 5, "num of cleaner threads");
 // read row with timestamp
 DEFINE_bool(enable_multi_version_read, false, "enable search specify time version primary key row");
 DEFINE_bool(read_by_index_filter, true, "read by index filter instead of index merger");
@@ -35,3 +35,6 @@ DEFINE_int64(tera_scan_pack_interval, 50000000, "scan timeout in one round");
 DEFINE_bool(enable_number_limit, true, "use scan number limit");
 DEFINE_int64(scan_number_limit, 30, "number of scan limit");
+DEFINE_bool(enable_async_read, true, "use multithread for async filesystem read");
+DEFINE_int64(async_read_thread_num, 50, "num of async read threads");
+
 
diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc
index 98d52c0..861fb5c 100644
--- a/src/sdk/table_impl.cc
+++ b/src/sdk/table_impl.cc
@@ -31,6 +31,8 @@ DECLARE_bool(enable_qu_range);
 DECLARE_int64(tera_scan_pack_interval);
 DECLARE_bool(enable_number_limit);
 DECLARE_int64(scan_number_limit);
+DECLARE_bool(enable_async_read);
+DECLARE_int64(async_read_thread_num);
 
 namespace mdt {
 
@@ -148,6 +150,7 @@ TableImpl::TableImpl(const TableDescription& table_desc,
     fs_(fs_adapter),
     thread_pool_(FLAGS_read_file_thread_num),
     cleaner_thread_(FLAGS_cleaner_thread_num),
+    async_read_thread_(FLAGS_async_read_thread_num),
     queue_timer_stop_(false),
     queue_timer_cv_(&queue_timer_mu_) {
     // create timer
@@ -163,6 +166,7 @@ TableImpl::~TableImpl() {
     FreeTeraTable();
     cleaner_thread_.Stop(false);
+    async_read_thread_.Stop(false);
 
     // stop timer, flush request
     queue_timer_mu_.Lock();
@@ -1420,6 +1424,7 @@ bool BreakOrPushData(ReadPrimaryTableContext* param, ResultStream* result, Statu
                                     data, primary_key);
     } else {
         // enqueue data in lock
+        MutexLock l(param->mutex);
         if (s.ok() && (data.size() > 0)) {
             result->result_data_list.push_back(data);
         }
@@ -1438,6 +1443,14 @@ void TableImpl::ReadPrimaryTableCallback(tera::RowReader* reader) {
     param->table->thread_pool_.AddTask(boost::bind(&TableImpl::ReadData, param->table, reader));
 }
 
+struct AsyncReadParam {
+    ReadPrimaryTableContext* param;
+    FileLocation* location;
+    Mutex* lock;
+    CondVar* cond;
+    int* nr_record;
+    Counter* nr_reader;
+};
 void TableImpl::ReadData(tera::RowReader* reader) {
     ReadPrimaryTableContext* param = (ReadPrimaryTableContext*)reader->GetContext();
     const std::string& primary_key = reader->RowName();
@@ -1467,19 +1480,46 @@ void TableImpl::ReadData(tera::RowReader* reader) {
     VLOG(12) << "test indexes of primary key: " << primary_key;
     if (!should_break && (TestIndexCondition(param->index_cond_list, indexes))) {
         VLOG(12) << "read data of primary key: " << primary_key;
-        for (size_t i = 0; i < locations.size(); ++i) {
-            FileLocation& location = locations[i];
-            VLOG(12) << "begin to read data from " << location;
-            std::string data;
-            Status s = param->table->ReadDataFromFile(location, &data);
-            if (s.ok()) {
-                VLOG(12) << "finish read data from " << location;
-                // check break
-                should_break = BreakOrPushData(param, param->result, Status::OK(), data, "");
-                if (should_break) { break; }
-                nr_record++;
-            } else {
-                LOG(WARNING) << "fail to read data from " << location << " error: " << s.ToString();
+        // sync filesystem read
+        if (!FLAGS_enable_async_read) {
+            for (size_t i = 0; i < locations.size(); ++i) {
+                FileLocation& location = locations[i];
+                VLOG(12) << "begin to read data from " << location;
+                std::string data;
+                Status s = param->table->ReadDataFromFile(location, &data);
+                if (s.ok()) {
+                    VLOG(12) << "finish read data from " << location;
+                    // check break
+                    should_break = BreakOrPushData(param, param->result, Status::OK(), data, "");
+                    if (should_break) { break; }
+                    nr_record++;
+                } else {
+                    LOG(WARNING) << "fail to read data from " << location << " error: " << s.ToString();
+                }
+            }
+        } else {
+            // use async filesystem read
+            Mutex lock;
+            CondVar cond(&lock);
+            Counter nr_reader;
+            nr_reader.Set((uint64_t)(locations.size()));
+
+            for (size_t i = 0; i < locations.size(); ++i) {
+                AsyncReadParam* async_read_param = new AsyncReadParam;
+                async_read_param->param = param;
+                async_read_param->lock = &lock;
+                async_read_param->cond = &cond;
+                async_read_param->nr_record = &nr_record;
+                async_read_param->nr_reader = &nr_reader;
+                async_read_param->location = &(locations[i]);
+                async_read_thread_.AddTask(boost::bind(&TableImpl::AsyncRead, this, async_read_param));
+            }
+            {
+                MutexLock l(&lock);
+                VLOG(30) << "async read, wait file read finish";
+                while (nr_reader.Get()) {
+                    cond.Wait();
+                }
             }
         }
     }
@@ -1503,6 +1543,39 @@ void TableImpl::ReadData(tera::RowReader* reader) {
     ReleaseReadPrimaryTableContext(param, param->result, s);
 }
 
+void TableImpl::AsyncRead(void* read_param) {
+    AsyncReadParam* async_read_param = (AsyncReadParam*)read_param;
+    ReadPrimaryTableContext* param = async_read_param->param;
+    FileLocation* location = async_read_param->location;
+    Mutex* lock = async_read_param->lock;
+    CondVar* cond = async_read_param->cond;
+    int* nr_record = async_read_param->nr_record;
+    Counter* nr_reader = async_read_param->nr_reader;
+    delete async_read_param;
+
+    // read from file system
+    VLOG(12) << "begin to async read data from " << *location;
+    std::string data;
+    Status s = param->table->ReadDataFromFile(*location, &data);
+    if (s.ok()) {
+        VLOG(12) << "finish async read data from " << *location;
+        // check break
+        bool should_break = BreakOrPushData(param, param->result, Status::OK(), data, "");
+        if (!should_break) {
+            MutexLock l(lock);
+            (*nr_record)++;
+        }
+    } else {
+        LOG(WARNING) << "fail to async read data from " << *location << " error: " << s.ToString();
+    }
+    {
+        MutexLock l(lock);
+        if (nr_reader->Dec() == 0) {
+            cond->Signal();
+        }
+    }
+}
+
 void TableImpl::ParseIndexesFromString(const std::string& index_buffer, std::multimap* indexes) {
     const char* buf = index_buffer.data();
diff --git a/src/sdk/table_impl.h b/src/sdk/table_impl.h
index 63637da..00a1d25 100644
--- a/src/sdk/table_impl.h
+++ b/src/sdk/table_impl.h
@@ -167,6 +167,9 @@ class TableImpl : public Table {
     Status Init();
     void CleanerThread(tera::ResultStream* stream);
 
+    // filesystem async reader
+    void AsyncRead(void* async_read_param);
+
     // write op
     int InternalBatchWrite(WriteContext* context, std::vector& ctx_queue);
     static void* TimerThreadWrapper(void* arg);
@@ -266,6 +269,7 @@ class TableImpl : public Table {
     ThreadPool thread_pool_;
     // async cleaner
     ThreadPool cleaner_thread_;
+    ThreadPool async_read_thread_;
 
     // file handle cache relative
     mutable Mutex file_mutex_;
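
Note on the async-read pattern (not part of the patches above): PATCH 3 fans each FileLocation out to the async_read_thread_ pool, counts finished readers under a mutex, and lets the last reader signal a condition variable so ReadData() can stop waiting. The following is a minimal standalone sketch of that same fan-out/join pattern using only standard C++11 primitives; read_one_location, the raw std::thread workers, and the integer "locations" are illustrative placeholders and not mdt or tera APIs.

    // Standalone illustration of the fan-out/join pattern used in PATCH 3:
    // every read task decrements a shared counter under a lock, and the last
    // one to finish signals the waiting caller.
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <vector>

    // Placeholder standing in for TableImpl::ReadDataFromFile().
    static std::string read_one_location(int location_id) {
        return "data-" + std::to_string(location_id);
    }

    int main() {
        std::vector<int> locations = {1, 2, 3, 4, 5};   // stand-in for the FileLocation list

        std::mutex lock;                                // like Mutex lock
        std::condition_variable cond;                   // like CondVar cond
        size_t nr_reader = locations.size();            // like Counter nr_reader
        int nr_record = 0;                              // like int nr_record
        std::vector<std::string> results;               // like result_data_list

        std::vector<std::thread> readers;               // stand-in for async_read_thread_
        for (int loc : locations) {
            readers.emplace_back([&, loc] {
                std::string data = read_one_location(loc);   // like TableImpl::AsyncRead()
                std::lock_guard<std::mutex> l(lock);
                results.push_back(data);
                ++nr_record;
                if (--nr_reader == 0) {                      // like nr_reader->Dec() == 0
                    cond.notify_one();                       // like cond->Signal()
                }
            });
        }

        {
            // Caller side: equivalent of the "wait file read finish" block in ReadData().
            std::unique_lock<std::mutex> l(lock);
            cond.wait(l, [&] { return nr_reader == 0; });
        }
        for (std::thread& t : readers) t.join();

        std::cout << "read " << nr_record << " records" << std::endl;
        return 0;
    }

In the patches themselves the work goes through the ThreadPool member async_read_thread_ and completion is tracked with mdt's Counter and CondVar, but the ordering of lock, decrement, and signal is the same as in this sketch.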