Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use tera scan number_limit feature #5

Open
wants to merge 3 commits into
base: kepler
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/sdk/mdt_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@ DEFINE_int64(request_queue_flush_internal, 10, "the number of ms wait before flu
DEFINE_int64(max_timestamp_table_num, 10, "num of timestamp index table");

// read ops param
DEFINE_int64(read_file_thread_num, 100, "num of read file threads");
DEFINE_int64(read_file_thread_num, 50, "num of read file threads");
DEFINE_int64(cleaner_thread_num, 5, "num of cleaner threads");
// read row with timestamp
DEFINE_bool(enable_multi_version_read, false, "enable search specify time version primary key row");
DEFINE_bool(read_by_index_filter, true, "read by index filter instead of index merger");
DEFINE_bool(enable_scan_control, false, "if true, batch scan in limit number");
DEFINE_int64(batch_scan_buffer_size, 1048576, "scan buffer size");
DEFINE_bool(enable_qu_range, true, "use qu to filter");
DEFINE_int64(tera_scan_pack_interval, 50000000, "scan timeout in one round");
DEFINE_bool(enable_number_limit, true, "use scan number limit");
DEFINE_int64(scan_number_limit, 30, "number of scan limit");

DEFINE_bool(enable_async_read, true, "use multithread for async filesystem read");
DEFINE_int64(async_read_thread_num, 50, "num of async read threads");

118 changes: 104 additions & 14 deletions src/sdk/table_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@ DECLARE_int64(write_batch_queue_size);
DECLARE_int64(request_queue_flush_internal);
DECLARE_int64(max_timestamp_table_num);
DECLARE_int64(read_file_thread_num);
DECLARE_int64(cleaner_thread_num);
DECLARE_bool(enable_multi_version_read);
DECLARE_bool(read_by_index_filter);
DECLARE_bool(enable_scan_control);
DECLARE_int64(batch_scan_buffer_size);
DECLARE_bool(enable_qu_range);
DECLARE_int64(tera_scan_pack_interval);
DECLARE_bool(enable_number_limit);
DECLARE_int64(scan_number_limit);
DECLARE_bool(enable_async_read);
DECLARE_int64(async_read_thread_num);

namespace mdt {

Expand Down Expand Up @@ -144,10 +149,13 @@ TableImpl::TableImpl(const TableDescription& table_desc,
tera_(tera_adapter),
fs_(fs_adapter),
thread_pool_(FLAGS_read_file_thread_num),
cleaner_thread_(FLAGS_cleaner_thread_num),
async_read_thread_(FLAGS_async_read_thread_num),
queue_timer_stop_(false),
queue_timer_cv_(&queue_timer_mu_) {
// create timer
pthread_create(&timer_tid_, NULL, &TableImpl::TimerThreadWrapper, this);

}

TableImpl::~TableImpl() {
Expand All @@ -157,6 +165,9 @@ TableImpl::~TableImpl() {
thread_pool_.Stop(false);
FreeTeraTable();

cleaner_thread_.Stop(false);
async_read_thread_.Stop(false);

// stop timer, flush request
queue_timer_mu_.Lock();
queue_timer_stop_ = true;
Expand All @@ -174,6 +185,11 @@ TableImpl::~TableImpl() {
return;
}

void TableImpl::CleanerThread(tera::ResultStream* stream) {
delete stream;
VLOG(30) << "do something to clean up";
}

///////// batch write /////////////
struct DefaultBatchWriteCallbackParam {
CondVar* cond_;
Expand Down Expand Up @@ -953,6 +969,9 @@ Status TableImpl::GetByExtendIndex(const std::vector<IndexConditionExtend>& inde
std::string end_qu(ebuf, sizeof(ebuf));
scan_desc->AddQualifierRange(kIndexTableColumnFamily, start_qu, end_qu);
}
if (FLAGS_enable_number_limit) {
scan_desc->SetNumberLimit(FLAGS_scan_number_limit);
}
scan_desc->SetBufferSize(FLAGS_batch_scan_buffer_size);
scan_desc->SetPackInterval(FLAGS_tera_scan_pack_interval);

Expand Down Expand Up @@ -1126,7 +1145,9 @@ void TableImpl::GetByFilterIndex(tera::Table* index_table,

stream->Next();
}
delete stream;
// batch scan must wait other rpc callback
//delete stream;
cleaner_thread_.AddTask(boost::bind(&TableImpl::CleanerThread, this, stream));

// stop other index table scan streams
{
Expand Down Expand Up @@ -1403,6 +1424,7 @@ bool BreakOrPushData(ReadPrimaryTableContext* param, ResultStream* result, Statu
data, primary_key);
} else {
// enqueue data in lock
MutexLock l(param->mutex);
if (s.ok() && (data.size() > 0)) {
result->result_data_list.push_back(data);
}
Expand All @@ -1421,6 +1443,14 @@ void TableImpl::ReadPrimaryTableCallback(tera::RowReader* reader) {
param->table->thread_pool_.AddTask(boost::bind(&TableImpl::ReadData, param->table, reader));
}

struct AsyncReadParam {
ReadPrimaryTableContext* param;
FileLocation* location;
Mutex* lock;
CondVar* cond;
int* nr_record;
Counter* nr_reader;
};
void TableImpl::ReadData(tera::RowReader* reader) {
ReadPrimaryTableContext* param = (ReadPrimaryTableContext*)reader->GetContext();
const std::string& primary_key = reader->RowName();
Expand Down Expand Up @@ -1450,19 +1480,46 @@ void TableImpl::ReadData(tera::RowReader* reader) {
VLOG(12) << "test indexes of primary key: " << primary_key;
if (!should_break && (TestIndexCondition(param->index_cond_list, indexes))) {
VLOG(12) << "read data of primary key: " << primary_key;
for (size_t i = 0; i < locations.size(); ++i) {
FileLocation& location = locations[i];
VLOG(12) << "begin to read data from " << location;
std::string data;
Status s = param->table->ReadDataFromFile(location, &data);
if (s.ok()) {
VLOG(12) << "finish read data from " << location;
// check break
should_break = BreakOrPushData(param, param->result, Status::OK(), data, "");
if (should_break) { break; }
nr_record++;
} else {
LOG(WARNING) << "fail to read data from " << location << " error: " << s.ToString();
// sync filesystem read
if (!FLAGS_enable_async_read) {
for (size_t i = 0; i < locations.size(); ++i) {
FileLocation& location = locations[i];
VLOG(12) << "begin to read data from " << location;
std::string data;
Status s = param->table->ReadDataFromFile(location, &data);
if (s.ok()) {
VLOG(12) << "finish read data from " << location;
// check break
should_break = BreakOrPushData(param, param->result, Status::OK(), data, "");
if (should_break) { break; }
nr_record++;
} else {
LOG(WARNING) << "fail to read data from " << location << " error: " << s.ToString();
}
}
} else {
// use async filesystem read
Mutex lock;
CondVar cond(&lock);
Counter nr_reader;
nr_reader.Set((uint64_t)(locations.size()));

for (size_t i = 0; i < locations.size(); ++i) {
AsyncReadParam* async_read_param = new AsyncReadParam;
async_read_param->param = param;
async_read_param->lock = &lock;
async_read_param->cond = &cond;
async_read_param->nr_record = &nr_record;
async_read_param->nr_reader = &nr_reader;
async_read_param->location = &(locations[i]);
async_read_thread_.AddTask(boost::bind(&TableImpl::AsyncRead, this, async_read_param));
}
{
MutexLock l(&lock);
VLOG(30) << "async read, wait file read finish";
while (nr_reader.Get()) {
cond.Wait();
}
}
}
}
Expand All @@ -1486,6 +1543,39 @@ void TableImpl::ReadData(tera::RowReader* reader) {
ReleaseReadPrimaryTableContext(param, param->result, s);
}

void TableImpl::AsyncRead(void* read_param) {
AsyncReadParam* async_read_param = (AsyncReadParam*)read_param;
ReadPrimaryTableContext* param = async_read_param->param;
FileLocation* location = async_read_param->location;
Mutex* lock = async_read_param->lock;
CondVar* cond = async_read_param->cond;
int* nr_record = async_read_param->nr_record;
Counter* nr_reader = async_read_param->nr_reader;
delete async_read_param;

// read from file system
VLOG(12) << "begin to async read data from " << *location;
std::string data;
Status s = param->table->ReadDataFromFile(*location, &data);
if (s.ok()) {
VLOG(12) << "finish async read data from " << location;
// check break
bool should_break = BreakOrPushData(param, param->result, Status::OK(), data, "");
if (!should_break) {
MutexLock l(lock);
(*nr_record)++;
}
} else {
LOG(WARNING) << "fail to async read data from " << location << " error: " << s.ToString();
}
{
MutexLock l(lock);
if (nr_reader->Dec() == 0) {
cond->Signal();
}
}
}

void TableImpl::ParseIndexesFromString(const std::string& index_buffer,
std::multimap<std::string, std::string>* indexes) {
const char* buf = index_buffer.data();
Expand Down
9 changes: 9 additions & 0 deletions src/sdk/table_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ class TableImpl : public Table {
private:
void FreeTeraTable();
Status Init();
void CleanerThread(tera::ResultStream* stream);

// filesystem async reader
void AsyncRead(void* async_read_param);

// write op
int InternalBatchWrite(WriteContext* context, std::vector<WriteContext*>& ctx_queue);
static void* TimerThreadWrapper(void* arg);
Expand Down Expand Up @@ -262,11 +267,15 @@ class TableImpl : public Table {
TeraAdapter tera_;
FilesystemAdapter fs_;
ThreadPool thread_pool_;
// async cleaner
ThreadPool cleaner_thread_;
ThreadPool async_read_thread_;

// file handle cache relative
mutable Mutex file_mutex_;
std::map<std::string, RandomAccessFile*> file_map_;


// use for put
mutable Mutex write_mutex_;
std::vector<WriteHandle> write_handle_list_;
Expand Down