diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc index f92282c9e..68e673bca 100644 --- a/src/io/tablet_io.cc +++ b/src/io/tablet_io.cc @@ -648,6 +648,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, std::list row_buf; std::string last_key, last_col, last_qual; uint32_t buffer_size = 0; + int64_t number_limit = 0; uint32_t version_num = 1; uint64_t nr_scan_round = 0; value_list->clear_key_values(); @@ -804,7 +805,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, if (key.compare(last_key) != 0) { *read_row_count += 1; if (has_filter) { - ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size); + ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size, &number_limit); row_buf.clear(); } } @@ -844,6 +845,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, if (!has_filter) { if (!FilterCell(scan_options, col.ToString(), qual.ToString(), ts)) { value_list->add_key_values()->CopyFrom(kv); + number_limit++; buffer_size += key.size() + col.size() + qual.size() + sizeof(ts) + value.size(); VLOG(10) << "ll-scan Get kv " << key.ToString() << ", buffer_size " << buffer_size; } @@ -852,7 +854,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, } // check scan buffer - if (buffer_size >= scan_options.max_size) { + if (buffer_size >= scan_options.max_size || number_limit >= scan_options.number_limit) { VLOG(10) << "ll-scan, stream scan, break scan context, version_num " << version_num << ", key " << DebugString(key.ToString()) << ", col " << DebugString(col.ToString()) << ", qual " << DebugString(qual.ToString()); @@ -866,7 +868,7 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, // process the last row of tablet if (has_filter) { - ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size); + ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size, &number_limit); } leveldb::Status it_status; if (!it->Valid()) { @@ -884,7 +886,8 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key, // check if scan finished SetStatusCode(kTableOk, status); - if (buffer_size < scan_options.max_size && now_time <= time_out) { + // if not timeout or buffer size or number limit overflow, then set complete flag + if ((buffer_size < scan_options.max_size && number_limit < scan_options.number_limit) && now_time <= time_out) { *is_complete = true; } else { if (now_time > time_out && next_start_point) { @@ -1526,6 +1529,12 @@ void TabletIO::SetupScanRowOptions(const ScanTabletRequest* request, if (request->has_buffer_limit()) { scan_options->max_size = request->buffer_limit(); } + if (request->has_number_limit()) { + if (request->number_limit() > 0) { + scan_options->number_limit = request->number_limit(); + } + } + VLOG(10) << "setup scan_options, number_limit " << scan_options->number_limit; if (request->timeout()) { scan_options->timeout = request->timeout(); } @@ -1760,7 +1769,8 @@ static bool CheckCell(const KeyValuePair& kv, const Filter& filter) { void TabletIO::ProcessRowBuffer(std::list& row_buf, const ScanOptions& scan_options, RowResult* value_list, - uint32_t* buffer_size) { + uint32_t* buffer_size, + int64_t* number_limit) { if (row_buf.size() <= 0) { return; } @@ -1797,6 +1807,7 @@ void TabletIO::ProcessRowBuffer(std::list& row_buf, continue; } value_list->add_key_values()->CopyFrom(*it); + (*number_limit)++; *buffer_size += key.size() + col.size() + qual.size() + sizeof(ts) + value.size(); } diff --git a/src/io/tablet_io.h b/src/io/tablet_io.h index af691064d..28c82bba2 100644 --- a/src/io/tablet_io.h +++ b/src/io/tablet_io.h @@ -48,7 +48,8 @@ class TabletIO { struct ScanOptions { uint32_t max_versions; uint32_t version_num; // restore version_num for stream scan - uint32_t max_size; + uint32_t max_size; // kv size > max_size, return to user + int64_t number_limit; // kv number > number_limit, return to user int64_t ts_start; int64_t ts_end; QualifierRange qu_range;// {cf, } @@ -59,7 +60,7 @@ class TabletIO { int64_t timeout; ScanOptions() - : max_versions(UINT32_MAX), version_num(0), max_size(UINT32_MAX), + : max_versions(UINT32_MAX), version_num(0), max_size(UINT32_MAX), number_limit(INT64_MAX), ts_start(kOldestTs), ts_end(kLatestTs), snapshot_id(0), timeout(INT64_MAX / 2) {} }; @@ -190,7 +191,8 @@ class TabletIO { void ProcessRowBuffer(std::list& row_buf, const ScanOptions& scan_options, RowResult* value_list, - uint32_t* buffer_size); + uint32_t* buffer_size, + int64_t* number_limit); bool FilterCell(const ScanOptions& scan_options, const std::string& col, const std::string& qual, int64_t ts); diff --git a/src/proto/tabletnode_rpc.proto b/src/proto/tabletnode_rpc.proto index 405180eed..1bb58074b 100644 --- a/src/proto/tabletnode_rpc.proto +++ b/src/proto/tabletnode_rpc.proto @@ -239,6 +239,7 @@ message ScanTabletRequest { optional int64 timestamp = 18 [default = 0]; optional int64 timeout = 19; repeated ScanQualifierRange qu_range = 20; + optional int64 number_limit = 21; } message ScanTabletResponse { diff --git a/src/sdk/scan.cc b/src/sdk/scan.cc index 23f736535..18b1f824d 100644 --- a/src/sdk/scan.cc +++ b/src/sdk/scan.cc @@ -61,6 +61,14 @@ void ScanDescriptor::SetBufferSize(int64_t buf_size) { _impl->SetBufferSize(buf_size); } +void ScanDescriptor::SetNumberLimit(int64_t number_limit) { + _impl->SetNumberLimit(number_limit); +} + +int64_t ScanDescriptor::GetNumberLimit() { + return _impl->GetNumberLimit(); +} + void ScanDescriptor::SetAsync(bool async) { _impl->SetAsync(async); } diff --git a/src/sdk/scan_impl.cc b/src/sdk/scan_impl.cc index 510b941e1..11ec50385 100644 --- a/src/sdk/scan_impl.cc +++ b/src/sdk/scan_impl.cc @@ -194,7 +194,7 @@ void ResultStreamBatchImpl::ScanSessionReset() { ref_count_ += FLAGS_tera_sdk_max_batch_scan_req; _scan_desc_impl->SetStart(session_end_key_); VLOG(28) << "scan session reset, start key " << session_end_key_ - << ", ref_count " << ref_count_; + << ", ref_count " << ref_count_ << ", session id " << session_id_; mu_.Unlock(); // do io, release lock for (int32_t i = 0; i < FLAGS_tera_sdk_max_batch_scan_req; i++) { @@ -753,6 +753,7 @@ ScanDescImpl::ScanDescImpl(const string& rowkey) : _start_timestamp(0), _timer_range(NULL), _buf_size(65536), + _number_limit(std::numeric_limits::max()), _is_async(FLAGS_tera_sdk_scan_async_enabled), _max_version(1), _pack_interval(5000), @@ -768,6 +769,7 @@ ScanDescImpl::ScanDescImpl(const ScanDescImpl& impl) _start_qualifier(impl._start_qualifier), _start_timestamp(impl._start_timestamp), _buf_size(impl._buf_size), + _number_limit(impl._number_limit), _is_async(impl._is_async), _max_version(impl._max_version), _pack_interval(impl._pack_interval), @@ -894,6 +896,16 @@ void ScanDescImpl::SetBufferSize(int64_t buf_size) { _buf_size = buf_size; } +void ScanDescImpl::SetNumberLimit(int64_t number_limit) { + _number_limit = number_limit; + VLOG(30) << "number_limit " << _number_limit; +} + +int64_t ScanDescImpl::GetNumberLimit() { + VLOG(30) << "get number_limit " << _number_limit; + return _number_limit; +} + void ScanDescImpl::SetAsync(bool async) { _is_async = async; } diff --git a/src/sdk/scan_impl.h b/src/sdk/scan_impl.h index 975a8937f..ccf5fca78 100644 --- a/src/sdk/scan_impl.h +++ b/src/sdk/scan_impl.h @@ -262,6 +262,8 @@ class ScanDescImpl { void SetSnapshot(uint64_t snapshot_id); void SetBufferSize(int64_t buf_size); + void SetNumberLimit(int64_t number_limit); + int64_t GetNumberLimit(); void SetAsync(bool async); @@ -323,6 +325,7 @@ class ScanDescImpl { tera::TimeRange* _timer_range; QualifierRange _qu_range; int64_t _buf_size; + int64_t _number_limit; bool _is_async; int32_t _max_version; int64_t _pack_interval; diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc index 3177dfdb6..310e79cb8 100644 --- a/src/sdk/table_impl.cc +++ b/src/sdk/table_impl.cc @@ -346,6 +346,11 @@ void TableImpl::CommitScan(ScanTask* scan_task, if (impl->GetBufferSize() != 0) { request->set_buffer_limit(impl->GetBufferSize()); } + if (impl->GetNumberLimit() > 0) { + request->set_number_limit(impl->GetNumberLimit()); + } + VLOG(30) << "server " << server_addr << ", number_limit " << request->number_limit() + << ", seq id " << request->sequence_id() << ", session id " << request->session_id(); if (impl->GetTimerRange() != NULL) { TimeRange* time_range = request->mutable_timerange(); time_range->CopyFrom(*(impl->GetTimerRange())); diff --git a/src/sdk/tera.h b/src/sdk/tera.h index d2d3dac98..92ec84983 100644 --- a/src/sdk/tera.h +++ b/src/sdk/tera.h @@ -277,6 +277,10 @@ class ScanDescriptor { /// 设置预读的buffer大小, 默认64K void SetBufferSize(int64_t buf_size); + /// set number limit for each buffer + void SetNumberLimit(int64_t number_limit); + int64_t GetNumberLimit(); + /// 设置async, 缺省true void SetAsync(bool async);