Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jul 14, 2024
1 parent 2f2ccc6 commit 25d97ba
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions be/src/runtime/result_buffer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Status ResultBufferMgr::init() {
Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size,
std::shared_ptr<BufferControlBlock>* sender, int exec_timout,
int batch_size) {
LOG(INFO) << "ResultBufferMgr::create_sender " << print_id(query_id);
*sender = find_control_block(query_id);
if (*sender != nullptr) {
LOG(WARNING) << "already have buffer control block for this instance " << query_id;
Expand All @@ -81,6 +82,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size

{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
LOG(INFO) << "ResultBufferMgr::create_sender insert " << print_id(query_id);
_buffer_map.insert(std::make_pair(query_id, control_block));
// BufferControlBlock should destroy after max_timeout
// for exceed max_timeout FE will return timeout to client
Expand Down Expand Up @@ -136,6 +138,7 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c

Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
std::shared_ptr<arrow::RecordBatch>* result) {
LOG(INFO) << "ResultBufferMgr::fetch_arrow_data " << print_id(finst_id);
std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
if (cb == nullptr) {
return Status::InternalError("no result for this query, finst_id={}", print_id(finst_id));
Expand All @@ -145,12 +148,14 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
}

void ResultBufferMgr::cancel(const TUniqueId& query_id) {
LOG(INFO) << "ResultBufferMgr::cancel " << print_id(query_id);
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
auto iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
iter->second->cancel();
LOG(INFO) << "ResultBufferMgr::cancel _buffer_map " << print_id(query_id);
_buffer_map.erase(iter);
}
}
Expand All @@ -160,6 +165,7 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id) {
auto arrow_schema_iter = _arrow_schema_map.find(query_id);

if (_arrow_schema_map.end() != arrow_schema_iter) {
LOG(INFO) << "ResultBufferMgr::cancel _arrow_schema_map " << print_id(query_id);
_arrow_schema_map.erase(arrow_schema_iter);
}
}
Expand Down

0 comments on commit 25d97ba

Please sign in to comment.