Merge branch 'master' into 20230819_fix_stacktrace
xinyiZzz authored Sep 20, 2023
2 parents 36fb5e7 + cf4143f commit e48c9c4
Showing 300 changed files with 40,748 additions and 3,897 deletions.
1 change: 1 addition & 0 deletions be/src/agent/heartbeat_server.cpp
@@ -76,6 +76,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
heartbeat_result.backend_info.__set_http_port(config::webserver_port);
heartbeat_result.backend_info.__set_be_rpc_port(-1);
heartbeat_result.backend_info.__set_brpc_port(config::brpc_port);
heartbeat_result.backend_info.__set_arrow_flight_sql_port(config::arrow_flight_sql_port);
heartbeat_result.backend_info.__set_version(get_short_version());
heartbeat_result.backend_info.__set_be_start_time(_be_epoch);
heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
4 changes: 2 additions & 2 deletions be/src/common/config.cpp
@@ -59,7 +59,7 @@ DEFINE_Int32(be_port, "9060");
// port for brpc
DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_port, "-1");
DEFINE_Int32(arrow_flight_sql_port, "-1");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
@@ -581,7 +581,7 @@ DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");

// sync tablet_meta when modifying meta
DEFINE_mBool(sync_tablet_meta, "false");
DEFINE_mBool(sync_tablet_meta, "true");

// default thrift rpc timeout ms
DEFINE_mInt32(thrift_rpc_timeout_ms, "60000");
6 changes: 3 additions & 3 deletions be/src/common/config.h
@@ -96,9 +96,9 @@ DECLARE_Int32(be_port);
// port for brpc
DECLARE_Int32(brpc_port);

// port for arrow flight
// Default -1, do not start arrow flight server.
DECLARE_Int32(arrow_flight_port);
// port for arrow flight sql
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
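
The config.h hunk above keeps the "-1 means disabled" default for the renamed arrow_flight_sql_port. A minimal, self-contained sketch of how a caller might gate server startup on that config; the ArrowFlightSqlServer type and its start() method are assumptions for illustration, not Doris's actual API:

#include <iostream>
#include <memory>

namespace config {
int arrow_flight_sql_port = -1; // default -1: do not start the arrow flight sql server
} // namespace config

// Hypothetical stand-in for the real server class (assumption, not Doris code).
struct ArrowFlightSqlServer {
    explicit ArrowFlightSqlServer(int port) : port(port) {}
    void start() { std::cout << "arrow flight sql listening on " << port << std::endl; }
    int port;
};

int main() {
    std::unique_ptr<ArrowFlightSqlServer> server;
    if (config::arrow_flight_sql_port > 0) { // a negative port keeps the feature off
        server = std::make_unique<ArrowFlightSqlServer>(config::arrow_flight_sql_port);
        server->start();
    }
    return 0;
}
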
23 changes: 10 additions & 13 deletions be/src/exec/data_sink.cpp
@@ -31,7 +31,6 @@

#include "common/config.h"
#include "vec/sink/async_writer_sink.h"
#include "vec/sink/group_commit_vtablet_sink.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmemory_scratch_sink.h"
@@ -146,21 +145,20 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
Status status;
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node) {
sink->reset(new stream_load::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
} else {
sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
}
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
Status status;
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(
new stream_load::GroupCommitVOlapTableSink(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, true));
RETURN_IF_ERROR(status);
break;
}
@@ -294,12 +292,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
Status status;
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node) {
sink->reset(new stream_load::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
} else {
sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
}
RETURN_IF_ERROR(status);
break;
@@ -313,10 +311,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
Status status;
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(
new stream_load::GroupCommitVOlapTableSink(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, true));
RETURN_IF_ERROR(status);
break;
}
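
The data_sink.cpp hunks above drop the separate stream_load::GroupCommitVOlapTableSink and route both OLAP_TABLE_SINK and GROUP_COMMIT_OLAP_TABLE_SINK through vectorized::VOlapTableSink, with the final constructor argument apparently selecting group-commit behavior (that reading of the boolean is an assumption). A simplified, self-contained analogue of the pattern, not the real Doris classes:

#include <iostream>
#include <memory>

// Simplified analogue: one sink class with a flag replaces two sink classes.
class OlapTableSinkLike {
public:
    explicit OlapTableSinkLike(bool group_commit) : _group_commit(group_commit) {}
    void send_block() {
        if (_group_commit) {
            std::cout << "buffer rows for group commit" << std::endl;
        } else {
            std::cout << "write rows directly" << std::endl;
        }
    }
private:
    bool _group_commit;
};

int main() {
    // OLAP_TABLE_SINK case: flag = false
    auto plain_sink = std::make_unique<OlapTableSinkLike>(false);
    // GROUP_COMMIT_OLAP_TABLE_SINK case: same class, flag = true
    auto group_commit_sink = std::make_unique<OlapTableSinkLike>(true);
    plain_sink->send_block();
    group_commit_sink->send_block();
    return 0;
}
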
2 changes: 1 addition & 1 deletion be/src/exec/odbc_connector.cpp
@@ -58,7 +58,7 @@ ODBCConnector::ODBCConnector(const ODBCConnectorParam& param)
_dbc(nullptr),
_stmt(nullptr) {}

Status ODBCConnector::close() {
Status ODBCConnector::close(Status) {
// do not commit transaction, roll back
if (_is_in_transaction) {
abort_trans();
2 changes: 1 addition & 1 deletion be/src/exec/odbc_connector.h
@@ -97,7 +97,7 @@ class ODBCConnector : public TableConnector {
uint32_t big_column_size_buffer = config::big_column_size_buffer;
uint32_t small_column_size_buffer = config::small_column_size_buffer;

Status close() override;
Status close(Status) override;

private:
static Status error_status(const std::string& prefix, const std::string& error_msg);
2 changes: 1 addition & 1 deletion be/src/exec/table_connector.h
@@ -55,7 +55,7 @@ class TableConnector {
virtual Status abort_trans() = 0; // should be call after transaction abort
virtual Status finish_trans() = 0; // should be call after transaction commit

virtual Status close() = 0;
virtual Status close(Status) = 0;

virtual Status exec_stmt_write(vectorized::Block* block,
const vectorized::VExprContextSPtrs& _output_vexpr_ctxs,
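
The table_connector.h change threads a Status argument into close(), which lets an implementation react to how the query finished (the ODBC connector above still rolls back unconditionally). A minimal sketch of one plausible use of that parameter, with a stand-in Status type rather than doris::Status:

#include <iostream>

// Stand-in for doris::Status: only tracks ok / not ok.
struct Status {
    bool is_ok;
    static Status OK() { return {true}; }
    static Status Error() { return {false}; }
};

class TableConnectorLike {
public:
    // exec_status reports whether the query that used this connector succeeded.
    Status close(Status exec_status) {
        if (_is_in_transaction) {
            if (exec_status.is_ok) {
                std::cout << "commit transaction" << std::endl;
            } else {
                std::cout << "roll back transaction" << std::endl;
            }
            _is_in_transaction = false;
        }
        return Status::OK();
    }
private:
    bool _is_in_transaction = true;
};

int main() {
    TableConnectorLike connector;
    connector.close(Status::Error()); // failed query -> roll back
    return 0;
}
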
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_table_sink_operator.h
@@ -25,7 +25,7 @@ namespace doris {
namespace pipeline {

class OlapTableSinkOperatorBuilder final
: public DataSinkOperatorBuilder<stream_load::VOlapTableSink> {
: public DataSinkOperatorBuilder<vectorized::VOlapTableSink> {
public:
OlapTableSinkOperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "OlapTableSinkOperator", sink) {}
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -25,7 +25,7 @@ namespace doris {
namespace pipeline {

class OlapTableSinkV2OperatorBuilder final
: public DataSinkOperatorBuilder<stream_load::VOlapTableSinkV2> {
: public DataSinkOperatorBuilder<vectorized::VOlapTableSinkV2> {
public:
OlapTableSinkV2OperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "OlapTableSinkV2Operator", sink) {}
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline_task.cpp
@@ -122,6 +122,9 @@ void PipelineTask::_init_profile() {
_schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
_yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
_core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
_wait_bf_counts = ADD_COUNTER(_task_profile, "WaitBfTimes", TUnit::UNIT);
_wait_dependency_counts = ADD_COUNTER(_task_profile, "WaitDenpendencyTimes", TUnit::UNIT);
_pending_finish_counts = ADD_COUNTER(_task_profile, "PendingFinishTimes", TUnit::UNIT);

_begin_execute_timer = ADD_TIMER(_task_profile, "Task1BeginExecuteTime");
_eos_timer = ADD_TIMER(_task_profile, "Task2EosTime");
@@ -385,6 +388,11 @@ void PipelineTask::set_state(PipelineTaskState state) {
COUNTER_UPDATE(_block_by_sink_counts, 1);
} else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
_wait_bf_watcher.start();
COUNTER_UPDATE(_wait_bf_counts, 1);
} else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
COUNTER_UPDATE(_wait_dependency_counts, 1);
} else if (state == PipelineTaskState::PENDING_FINISH) {
COUNTER_UPDATE(_pending_finish_counts, 1);
}
}

3 changes: 3 additions & 0 deletions be/src/pipeline/pipeline_task.h
@@ -304,10 +304,13 @@ class PipelineTask {
RuntimeProfile::Counter* _wait_source_timer;
MonotonicStopWatch _wait_bf_watcher;
RuntimeProfile::Counter* _wait_bf_timer;
RuntimeProfile::Counter* _wait_bf_counts;
MonotonicStopWatch _wait_sink_watcher;
RuntimeProfile::Counter* _wait_sink_timer;
MonotonicStopWatch _wait_worker_watcher;
RuntimeProfile::Counter* _wait_worker_timer;
RuntimeProfile::Counter* _wait_dependency_counts;
RuntimeProfile::Counter* _pending_finish_counts;
// TODO we should calculate the time between when really runnable and runnable
RuntimeProfile::Counter* _yield_counts;
RuntimeProfile::Counter* _core_change_times;
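
The pipeline_task hunks above register three new counters in _init_profile() and bump them on the matching state transitions in set_state(). A self-contained analogue of that register-then-count pattern, with a simplified stand-in for RuntimeProfile rather than the real API:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Simplified stand-in for RuntimeProfile: a named-counter map.
class ProfileLike {
public:
    int64_t* add_counter(const std::string& name) { return &_counters[name]; }
    void dump() const {
        for (const auto& entry : _counters) {
            std::cout << entry.first << ": " << entry.second << std::endl;
        }
    }
private:
    std::map<std::string, int64_t> _counters;
};

enum class TaskState { BLOCKED_FOR_RF, BLOCKED_FOR_DEPENDENCY, PENDING_FINISH, RUNNABLE };

class TaskLike {
public:
    void init_profile() {
        _wait_bf_counts = _profile.add_counter("WaitBfTimes");
        _wait_dependency_counts = _profile.add_counter("WaitDependencyTimes");
        _pending_finish_counts = _profile.add_counter("PendingFinishTimes");
    }
    void set_state(TaskState state) {
        if (state == TaskState::BLOCKED_FOR_RF) {
            ++*_wait_bf_counts;
        } else if (state == TaskState::BLOCKED_FOR_DEPENDENCY) {
            ++*_wait_dependency_counts;
        } else if (state == TaskState::PENDING_FINISH) {
            ++*_pending_finish_counts;
        }
    }
    const ProfileLike& profile() const { return _profile; }
private:
    ProfileLike _profile;
    int64_t* _wait_bf_counts = nullptr;
    int64_t* _wait_dependency_counts = nullptr;
    int64_t* _pending_finish_counts = nullptr;
};

int main() {
    TaskLike task;
    task.init_profile();
    task.set_state(TaskState::BLOCKED_FOR_RF);
    task.set_state(TaskState::PENDING_FINISH);
    task.profile().dump();
    return 0;
}
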
4 changes: 4 additions & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -114,6 +114,10 @@ void PipelineXTask::_init_profile() {
_schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
_yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
_core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);

_wait_bf_counts = ADD_COUNTER(_task_profile, "WaitBfTimes", TUnit::UNIT);
_wait_dependency_counts = ADD_COUNTER(_task_profile, "WaitDenpendencyTimes", TUnit::UNIT);
_pending_finish_counts = ADD_COUNTER(_task_profile, "PendingFinishTimes", TUnit::UNIT);
}

void PipelineXTask::_fresh_profile_counter() {
1 change: 0 additions & 1 deletion be/src/runtime/buffer_control_block.cpp
@@ -144,7 +144,6 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
_fe_result_batch_queue.push_back(std::move(result));
}
_buffer_rows += num_rows;
_data_arrival.notify_one();
} else {
auto ctx = _waiting_rpc.front();
_waiting_rpc.pop_front();
7 changes: 4 additions & 3 deletions be/src/runtime/exec_env.h
@@ -42,6 +42,7 @@ namespace doris {
namespace vectorized {
class VDataStreamMgr;
class ScannerScheduler;
class DeltaWriterV2Pool;
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
} // namespace vectorized
namespace pipeline {
@@ -51,7 +52,6 @@ namespace taskgroup {
class TaskGroupManager;
}
namespace stream_load {
class DeltaWriterV2Pool;
class LoadStreamStubPool;
} // namespace stream_load
namespace io {
@@ -241,7 +241,8 @@ class ExecEnv {
stream_load::LoadStreamStubPool* load_stream_stub_pool() {
return _load_stream_stub_pool.get();
}
stream_load::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); }

vectorized::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); }

void wait_for_all_tasks_done();

@@ -344,7 +345,7 @@ class ExecEnv {
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
std::unique_ptr<stream_load::LoadStreamStubPool> _load_stream_stub_pool;
std::unique_ptr<stream_load::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;

std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
std::shared_mutex _zone_cache_rw_lock;
2 changes: 1 addition & 1 deletion be/src/runtime/exec_env_init.cpp
@@ -210,7 +210,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
_load_stream_stub_pool = std::make_unique<stream_load::LoadStreamStubPool>();
_delta_writer_v2_pool = std::make_unique<stream_load::DeltaWriterV2Pool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();

_backend_client_cache->init_metrics("backend");
_frontend_client_cache->init_metrics("frontend");
40 changes: 23 additions & 17 deletions be/src/runtime/result_buffer_mgr.cpp
@@ -21,6 +21,9 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <stdint.h>

#include <chrono>
#include <thread>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <memory>
@@ -33,6 +36,7 @@
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/thread.h"
#include "util/uid_util.h"

namespace doris {

@@ -42,7 +46,7 @@ ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) {
// Each BufferControlBlock has a limited queue size of 1024, it's not needed to count the
// actual size of all BufferControlBlock.
REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() {
// std::lock_guard<std::mutex> l(_lock);
// std::lock_guard<std::mutex> l(_buffer_map_lock);
return _buffer_map.size();
});
}
@@ -80,7 +84,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
}

{
std::lock_guard<std::mutex> l(_lock);
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
_buffer_map.insert(std::make_pair(query_id, control_block));
// BufferControlBlock should destroy after max_timeout
// for exceed max_timeout FE will return timeout to client
@@ -95,8 +99,7 @@
}

std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TUniqueId& query_id) {
// TODO(zhaochun): this lock can be bottleneck?
std::lock_guard<std::mutex> l(_lock);
std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
@@ -108,14 +111,12 @@ std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TU

void ResultBufferMgr::register_row_descriptor(const TUniqueId& query_id,
const RowDescriptor& row_desc) {
{
std::lock_guard<std::mutex> l(_lock);
_row_descriptor_map.insert(std::make_pair(query_id, row_desc));
}
std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
_row_descriptor_map.insert(std::make_pair(query_id, row_desc));
}

RowDescriptor ResultBufferMgr::find_row_descriptor(const TUniqueId& query_id) {
std::lock_guard<std::mutex> l(_lock);
std::shared_lock<std::shared_mutex> rlock(_row_descriptor_map_lock);
RowDescriptorMap::iterator iter = _row_descriptor_map.find(query_id);

if (_row_descriptor_map.end() != iter) {
@@ -150,18 +151,23 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
}

Status ResultBufferMgr::cancel(const TUniqueId& query_id) {
std::lock_guard<std::mutex> l(_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
iter->second->cancel();
_buffer_map.erase(iter);
if (_buffer_map.end() != iter) {
iter->second->cancel();
_buffer_map.erase(iter);
}
}

RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id);
{
std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id);

if (_row_descriptor_map.end() != row_desc_iter) {
_row_descriptor_map.erase(row_desc_iter);
if (_row_descriptor_map.end() != row_desc_iter) {
_row_descriptor_map.erase(row_desc_iter);
}
}

return Status::OK();
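
The ResultBufferMgr hunks split the single _lock mutex into two std::shared_mutex members, one per map, so lookups take a shared (reader) lock while create/cancel take an exclusive (writer) lock. A self-contained sketch of that locking pattern on a generic registry, not the Doris class itself:

#include <map>
#include <memory>
#include <shared_mutex>
#include <string>

// Registry guarded by a std::shared_mutex: many concurrent readers,
// exclusive writers, mirroring the _buffer_map_lock usage above.
class BufferRegistry {
public:
    void insert(const std::string& query_id, std::shared_ptr<int> block) {
        std::unique_lock<std::shared_mutex> wlock(_map_lock); // writer: exclusive
        _map[query_id] = std::move(block);
    }

    std::shared_ptr<int> find(const std::string& query_id) {
        std::shared_lock<std::shared_mutex> rlock(_map_lock); // reader: shared
        auto it = _map.find(query_id);
        return it == _map.end() ? nullptr : it->second;
    }

    void cancel(const std::string& query_id) {
        std::unique_lock<std::shared_mutex> wlock(_map_lock); // writer: exclusive
        _map.erase(query_id);
    }

private:
    std::shared_mutex _map_lock;
    std::map<std::string, std::shared_ptr<int>> _map;
};

int main() {
    BufferRegistry registry;
    registry.insert("query-1", std::make_shared<int>(42));
    auto block = registry.find("query-1"); // shared lock keeps hot lookups cheap
    registry.cancel("query-1");
    return 0;
}
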
5 changes: 4 additions & 1 deletion be/src/runtime/result_buffer_mgr.h
@@ -23,6 +23,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

@@ -86,9 +87,11 @@ class ResultBufferMgr {
void cancel_thread();

// lock for buffer map
std::mutex _lock;
std::shared_mutex _buffer_map_lock;
// buffer block map
BufferMap _buffer_map;
// lock for descriptor map
std::shared_mutex _row_descriptor_map_lock;
// for arrow flight
RowDescriptorMap _row_descriptor_map;

2 changes: 1 addition & 1 deletion be/src/runtime/result_writer.h
@@ -38,7 +38,7 @@ class ResultWriter {

virtual Status init(RuntimeState* state) = 0;

virtual Status close() = 0;
virtual Status close(Status s = Status::OK()) = 0;

[[nodiscard]] virtual int64_t get_written_rows() const { return _written_rows; }

1 change: 1 addition & 0 deletions be/src/runtime/runtime_state.h
@@ -94,6 +94,7 @@ class RuntimeState {
const TQueryGlobals& query_globals, ExecEnv* exec_env);

// for ut and non-query.
void set_exec_env(ExecEnv* exec_env) { _exec_env = exec_env; }
void init_mem_trackers(const TUniqueId& id = TUniqueId(), const std::string& name = "unknown");

const TQueryOptions& query_options() const { return _query_options; }