From 38076036fb367b43891abbd8e07720dcb080f393 Mon Sep 17 00:00:00 2001 From: caijieming Date: Wed, 16 Dec 2015 13:00:39 +0800 Subject: [PATCH 1/3] LowLevelScan analyse, step 1, add limit function --- src/io/tablet_io.cc | 20 +++++++++++++++----- src/io/tablet_io.h | 8 +++++--- src/proto/tabletnode_rpc.proto | 1 + src/sdk/scan.cc | 8 ++++++++ src/sdk/scan_impl.cc | 9 +++++++++ src/sdk/scan_impl.h | 3 +++ src/sdk/tera.h | 4 ++++ 7 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc index f92282c9e..c4a416d48 100644 --- a/src/io/tablet_io.cc +++ b/src/io/tablet_io.cc @@ -648,6 +648,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, std::list row_buf; std::string last_key, last_col, last_qual; uint32_t buffer_size = 0; + int64_t number_limit = 0; uint32_t version_num = 1; uint64_t nr_scan_round = 0; value_list->clear_key_values(); @@ -804,7 +805,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, if (key.compare(last_key) != 0) { *read_row_count += 1; if (has_filter) { - ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size); + ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size, &number_limit); row_buf.clear(); } } @@ -844,6 +845,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, if (!has_filter) { if (!FilterCell(scan_options, col.ToString(), qual.ToString(), ts)) { value_list->add_key_values()->CopyFrom(kv); + number_limit++; buffer_size += key.size() + col.size() + qual.size() + sizeof(ts) + value.size(); VLOG(10) << "ll-scan Get kv " << key.ToString() << ", buffer_size " << buffer_size; } @@ -852,7 +854,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, } // check scan buffer - if (buffer_size >= scan_options.max_size) { + if (buffer_size >= scan_options.max_size || number_limit >= scan_options.number_limit) { VLOG(10) << "ll-scan, stream scan, break scan context, version_num " << version_num << ", key " << DebugString(key.ToString()) << ", col " << DebugString(col.ToString()) << ", qual " << DebugString(qual.ToString()); @@ -866,7 +868,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, // process the last row of tablet if (has_filter) { - ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size); + ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size, &number_limit); } leveldb::Status it_status; if (!it->Valid()) { @@ -884,7 +886,8 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, // check if scan finished SetStatusCode(kTableOk, status); - if (buffer_size < scan_options.max_size && now_time <= time_out) { + // if not timeout or buffer size or number limit overflow, then set complete flag + if ((buffer_size < scan_options.max_size && number_limit < scan_options.number_limit) && now_time <= time_out) { *is_complete = true; } else { if (now_time > time_out && next_start_point) { @@ -1526,6 +1529,11 @@ void TabletIO::SetupScanRowOptions(const ScanTabletRequest* request, if (request->has_buffer_limit()) { scan_options->max_size = request->buffer_limit(); } + if (request->has_number_limit()) { + if (request->number_limit() > 0) { + scan_options->number_limit = request->number_limit(); + } + } if (request->timeout()) { scan_options->timeout = request->timeout(); } @@ -1760,7 +1768,8 @@ static bool CheckCell(const KeyValuePair& kv, const Filter& filter) { void TabletIO::ProcessRowBuffer(std::list& row_buf, const ScanOptions& scan_options, RowResult* value_list, - uint32_t* buffer_size) { + uint32_t* buffer_size, + int64_t* number_limit) { if (row_buf.size() <= 0) { return; } @@ -1797,6 +1806,7 @@ void TabletIO::ProcessRowBuffer(std::list& row_buf, continue; } value_list->add_key_values()->CopyFrom(*it); + (*number_limit)++; *buffer_size += key.size() + col.size() + qual.size() + sizeof(ts) + value.size(); } diff --git a/src/io/tablet_io.h b/src/io/tablet_io.h index af691064d..28c82bba2 100644 --- a/src/io/tablet_io.h +++ b/src/io/tablet_io.h @@ -48,7 +48,8 @@ class TabletIO { struct ScanOptions { uint32_t max_versions; uint32_t version_num; // restore version_num for stream scan - uint32_t max_size; + uint32_t max_size; // kv size > max_size, return to user + int64_t number_limit; // kv number > number_limit, return to user int64_t ts_start; int64_t ts_end; QualifierRange qu_range;// {cf, } @@ -59,7 +60,7 @@ class TabletIO { int64_t timeout; ScanOptions() - : max_versions(UINT32_MAX), version_num(0), max_size(UINT32_MAX), + : max_versions(UINT32_MAX), version_num(0), max_size(UINT32_MAX), number_limit(INT64_MAX), ts_start(kOldestTs), ts_end(kLatestTs), snapshot_id(0), timeout(INT64_MAX / 2) {} }; @@ -190,7 +191,8 @@ class TabletIO { void ProcessRowBuffer(std::list& row_buf, const ScanOptions& scan_options, RowResult* value_list, - uint32_t* buffer_size); + uint32_t* buffer_size, + int64_t* number_limit); bool FilterCell(const ScanOptions& scan_options, const std::string& col, const std::string& qual, int64_t ts); diff --git a/src/proto/tabletnode_rpc.proto b/src/proto/tabletnode_rpc.proto index 405180eed..1bb58074b 100644 --- a/src/proto/tabletnode_rpc.proto +++ b/src/proto/tabletnode_rpc.proto @@ -239,6 +239,7 @@ message ScanTabletRequest { optional int64 timestamp = 18 [default = 0]; optional int64 timeout = 19; repeated ScanQualifierRange qu_range = 20; + optional int64 number_limit = 21; } message ScanTabletResponse { diff --git a/src/sdk/scan.cc b/src/sdk/scan.cc index 23f736535..18b1f824d 100644 --- a/src/sdk/scan.cc +++ b/src/sdk/scan.cc @@ -61,6 +61,14 @@ void ScanDescriptor::SetBufferSize(int64_t buf_size) { _impl->SetBufferSize(buf_size); } +void ScanDescriptor::SetNumberLimit(int64_t number_limit) { + _impl->SetNumberLimit(number_limit); +} + +int64_t ScanDescriptor::GetNumberLimit() { + return _impl->GetNumberLimit(); +} + void ScanDescriptor::SetAsync(bool async) { _impl->SetAsync(async); } diff --git a/src/sdk/scan_impl.cc b/src/sdk/scan_impl.cc index 510b941e1..ed46790a2 100644 --- a/src/sdk/scan_impl.cc +++ b/src/sdk/scan_impl.cc @@ -753,6 +753,7 @@ ScanDescImpl::ScanDescImpl(const string& rowkey) : _start_timestamp(0), _timer_range(NULL), _buf_size(65536), + _number_limit(std::numeric_limits::max()), _is_async(FLAGS_tera_sdk_scan_async_enabled), _max_version(1), _pack_interval(5000), @@ -894,6 +895,14 @@ void ScanDescImpl::SetBufferSize(int64_t buf_size) { _buf_size = buf_size; } +void ScanDescImpl::SetNumberLimit(int64_t number_limit) { + _number_limit = number_limit; +} + +int64_t ScanDescImpl::GetNumberLimit() { + return _number_limit; +} + void ScanDescImpl::SetAsync(bool async) { _is_async = async; } diff --git a/src/sdk/scan_impl.h b/src/sdk/scan_impl.h index 975a8937f..ccf5fca78 100644 --- a/src/sdk/scan_impl.h +++ b/src/sdk/scan_impl.h @@ -262,6 +262,8 @@ class ScanDescImpl { void SetSnapshot(uint64_t snapshot_id); void SetBufferSize(int64_t buf_size); + void SetNumberLimit(int64_t number_limit); + int64_t GetNumberLimit(); void SetAsync(bool async); @@ -323,6 +325,7 @@ class ScanDescImpl { tera::TimeRange* _timer_range; QualifierRange _qu_range; int64_t _buf_size; + int64_t _number_limit; bool _is_async; int32_t _max_version; int64_t _pack_interval; diff --git a/src/sdk/tera.h b/src/sdk/tera.h index d2d3dac98..92ec84983 100644 --- a/src/sdk/tera.h +++ b/src/sdk/tera.h @@ -277,6 +277,10 @@ class ScanDescriptor { /// 设置预读的buffer大小, 默认64K void SetBufferSize(int64_t buf_size); + /// set number limit for each buffer + void SetNumberLimit(int64_t number_limit); + int64_t GetNumberLimit(); + /// 设置async, 缺省true void SetAsync(bool async); From a57b75550bb7c402c41c7e107daf51dd0fadf7ba Mon Sep 17 00:00:00 2001 From: caijieming Date: Wed, 16 Dec 2015 15:09:53 +0800 Subject: [PATCH 2/3] LowLevelScan analyse, step 1, add limit function, bugfix --- src/sdk/table_impl.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc index 3177dfdb6..aba966768 100644 --- a/src/sdk/table_impl.cc +++ b/src/sdk/table_impl.cc @@ -346,6 +346,9 @@ void TableImpl::CommitScan(ScanTask* scan_task, if (impl->GetBufferSize() != 0) { request->set_buffer_limit(impl->GetBufferSize()); } + if (impl->GetNumberLimit() != 0) { + request->set_number_limit(impl->GetNumberLimit()); + } if (impl->GetTimerRange() != NULL) { TimeRange* time_range = request->mutable_timerange(); time_range->CopyFrom(*(impl->GetTimerRange())); From c12037433bbe4f45f722c1a9daff9ce0f366e1b4 Mon Sep 17 00:00:00 2001 From: caijieming-baidu Date: Wed, 16 Dec 2015 18:53:20 +0800 Subject: [PATCH 3/3] LowLevelScan analyse, step 1, add limit function, bugfixing... --- src/io/tablet_io.cc | 1 + src/sdk/scan_impl.cc | 5 ++++- src/sdk/table_impl.cc | 4 +++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc index c4a416d48..68e673bca 100644 --- a/src/io/tablet_io.cc +++ b/src/io/tablet_io.cc @@ -1534,6 +1534,7 @@ void TabletIO::SetupScanRowOptions(const ScanTabletRequest* request, scan_options->number_limit = request->number_limit(); } } + VLOG(10) << "setup scan_options, number_limit " << scan_options->number_limit; if (request->timeout()) { scan_options->timeout = request->timeout(); } diff --git a/src/sdk/scan_impl.cc b/src/sdk/scan_impl.cc index ed46790a2..11ec50385 100644 --- a/src/sdk/scan_impl.cc +++ b/src/sdk/scan_impl.cc @@ -194,7 +194,7 @@ void ResultStreamBatchImpl::ScanSessionReset() { ref_count_ += FLAGS_tera_sdk_max_batch_scan_req; _scan_desc_impl->SetStart(session_end_key_); VLOG(28) << "scan session reset, start key " << session_end_key_ - << ", ref_count " << ref_count_; + << ", ref_count " << ref_count_ << ", session id " << session_id_; mu_.Unlock(); // do io, release lock for (int32_t i = 0; i < FLAGS_tera_sdk_max_batch_scan_req; i++) { @@ -769,6 +769,7 @@ ScanDescImpl::ScanDescImpl(const ScanDescImpl& impl) _start_qualifier(impl._start_qualifier), _start_timestamp(impl._start_timestamp), _buf_size(impl._buf_size), + _number_limit(impl._number_limit), _is_async(impl._is_async), _max_version(impl._max_version), _pack_interval(impl._pack_interval), @@ -897,9 +898,11 @@ void ScanDescImpl::SetBufferSize(int64_t buf_size) { void ScanDescImpl::SetNumberLimit(int64_t number_limit) { _number_limit = number_limit; + VLOG(30) << "number_limit " << _number_limit; } int64_t ScanDescImpl::GetNumberLimit() { + VLOG(30) << "get number_limit " << _number_limit; return _number_limit; } diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc index aba966768..310e79cb8 100644 --- a/src/sdk/table_impl.cc +++ b/src/sdk/table_impl.cc @@ -346,9 +346,11 @@ void TableImpl::CommitScan(ScanTask* scan_task, if (impl->GetBufferSize() != 0) { request->set_buffer_limit(impl->GetBufferSize()); } - if (impl->GetNumberLimit() != 0) { + if (impl->GetNumberLimit() > 0) { request->set_number_limit(impl->GetNumberLimit()); } + VLOG(30) << "server " << server_addr << ", number_limit " << request->number_limit() + << ", seq id " << request->sequence_id() << ", session id " << request->session_id(); if (impl->GetTimerRange() != NULL) { TimeRange* time_range = request->mutable_timerange(); time_range->CopyFrom(*(impl->GetTimerRange()));