
Commit 6cbb726

Merge branch 'master' into 20240625_arrow_flight_regressiontest

xinyiZzz authored Jul 8, 2024
2 parents bc2668a + 111886a
Showing 2,279 changed files with 89,506 additions and 51,487 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -60,6 +60,8 @@ docker/thirdparties/docker-compose/hive/scripts/paimon1
fe_plugins/output
fe_plugins/**/.factorypath

docker/thirdparties/docker-compose/hive/scripts/data/*/*/data

fs_brokers/apache_hdfs_broker/src/main/resources/
fs_brokers/apache_hdfs_broker/src/main/thrift/

@@ -100,7 +102,6 @@ be/tags
be/test/olap/test_data/tablet_meta_test.hdr
be/.devcontainer/
be/src/apache-orc/
zoneinfo/

# Cloud
cloud/build*/
7 changes: 1 addition & 6 deletions .licenserc.yaml
@@ -77,12 +77,7 @@ header:
- "docs/package-lock.json"
- "regression-test/script/README"
- "regression-test/suites/load_p0/stream_load/data"
- "docker/thirdparties/docker-compose/hive/scripts/README"
- "docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql"
- "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_orc.hql"
- "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_parquet.hql"
- "docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/"
- "docker/thirdparties/docker-compose/hive/scripts/suites/**"
- "docker/thirdparties/docker-compose/hive/scripts/**"
- "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
- "conf/mysql_ssl_default_certificate/*"
- "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
19 changes: 13 additions & 6 deletions README.md
@@ -24,6 +24,7 @@ under the License.
# Apache Doris
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
[![GitHub release](https://img.shields.io/github/release/apache/doris.svg)](https://github.com/apache/doris/releases)
[![OSSRank](https://shields.io/endpoint?url=https://ossrank.com/shield/516)](https://ossrank.com/p/516)
[![Jenkins Vec](https://img.shields.io/jenkins/tests?compact_message&jobUrl=https://ci-builds.apache.org/job/Doris/job/doris_daily_enable_vectorized&label=VectorizedEngine)](https://ci-builds.apache.org/job/Doris/job/doris_daily_enable_vectorized)
[![Total Lines](https://tokei.rs/b1/github/apache/doris?category=lines)](https://github.com/apache/doris)
[![Join the Doris Community on Slack](https://join.slack.com/t/apachedoriscommunity/shared_invite/zt-2kl08hzc0-SPJe4VWmL_qzrFd2u2XYQA)
@@ -32,7 +33,7 @@ under the License.

Apache Doris is an MPP-based real-time data warehouse known for its high query speed. For queries on large datasets, it returns results in sub-seconds. It supports both high-concurrency point queries and high-throughput complex analysis. It can be used for report analysis, ad-hoc queries, unified data warehouse building, and data lake query acceleration. Based on Apache Doris, users can build applications for user behavior analysis, A/B testing platform, log analysis, and e-commerce order analysis.

Please visit our [official download page](https://doris.apache.org/download/) to get the latest release version.
Please visit our 🔗[official download page](https://doris.apache.org/download/) to get the latest release version.

The current stable version is the 2.0.x series, and the latest version is the 2.1.x series. For production, it is recommended to use the latest version of the 2.0.x series. And if used for POC or testing, it is recommended to use the latest version of the 2.1.x series.

@@ -110,10 +111,14 @@ Apache Doris uses Adaptive Query Execution technology to dynamically adjust the

### 🚅 Query Optimizer

In terms of optimizers, Doris uses a combination of CBO and RBO. RBO supports constant folding, subquery rewriting, predicate pushdown and CBO supports Join Reorder. The Doris CBO is under continuous optimization for more accurate statistical information collection and derivation, and more accurate cost model prediction.
In terms of optimizers, Doris uses a combination of CBO and RBO. RBO supports constant folding, subquery rewriting, predicate pushdown. The Doris CBO is under continuous optimization for more accurate statistical information collection and derivation, and more accurate cost model prediction.

The query optimizer in V2.0 has a richer statistical base and adopts the Cascades framework. It is capable of self-tuning in most query scenarios and supports all 99 SQLs in TPC-DS, so users can expect high performance without any fine-tuning or SQL rewriting.

**Technical Overview**: 🔗[Introduction to Apache Doris](https://doris.apache.org/docs/dev/summary/basic-summary)
The query optimizer in V2.1 comes with enhanced statistics-based inference and enumeration framework. We have upgraded the cost model and expanded the optimization rules to serve the needs of more use cases.


**Technical Overview**: 🔗[Introduction to Apache Doris](https://doris.apache.org/docs/get-starting/what-is-apache-doris)

## 🎆 Why choose Apache Doris?

@@ -133,7 +138,7 @@

**Apache Doris has graduated from Apache incubator successfully and become a Top-Level Project in June 2022**.

Currently, the Apache Doris community has gathered more than 600 contributors from over 200 companies in different industries, and the number of monthly active contributors exceeds 100.
Currently, the Apache Doris community has gathered more than 600 contributors from over 200 companies in different industries, and the number of monthly active contributors exceeds 120.


[![Monthly Active Contributors](https://contributor-overtime-api.apiseven.com/contributors-svg?chart=contributorMonthlyActivity&repo=apache/doris)](https://www.apiseven.com/en/contributor-graph?chart=contributorMonthlyActivity&repo=apache/doris)
@@ -156,17 +161,19 @@ Add your company logo at Apache Doris Website: 🔗[Add Your Company](https://gi

All Documentation 🔗[Docs](https://doris.apache.org/docs/get-starting/quick-start)

Documentation Repo 🔗[Docs Repo](https://github.com/apache/doris-website)

### ⬇️ Download

All release and binary version 🔗[Download](https://doris.apache.org/download)

### 🗄️ Compile

See how to compile 🔗[Compilation](https://doris.apache.org/docs/dev/install/source-install/compilation-general)
See how to compile 🔗[Compilation](https://doris.apache.org/docs/install/source-install/compilation-with-docker)

### 📮 Install

See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/dev/install/standard-deployment)
See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/install/cluster-deployment/standard-deployment)

## 🧩 Components

7 changes: 4 additions & 3 deletions be/src/agent/cgroup_cpu_ctl.cpp
@@ -130,7 +130,7 @@ Status CgroupV1CpuCtl::init() {
return Status::InternalError<false>("invalid cgroup path, not find cpu quota file");
}

if (_tg_id == -1) {
if (_wg_id == -1) {
// means current cgroup cpu ctl is just used to clear dir,
// it does not contains workload group.
// todo(wb) rethinking whether need to refactor cgroup_cpu_ctl
@@ -140,7 +140,7 @@ Status CgroupV1CpuCtl::init() {
}

// workload group path
_cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id);
_cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_wg_id);
if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
if (ret != 0) {
@@ -186,7 +186,8 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
return Status::OK();
#else
int tid = static_cast<int>(syscall(SYS_gettid));
std::string msg = "add thread " + std::to_string(tid) + " to group";
std::string msg =
"add thread " + std::to_string(tid) + " to group" + " " + std::to_string(_wg_id);
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
#endif
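The hunks above rename `_tg_id` to `_wg_id` and add the workload group id to the log message; the underlying cgroup v1 mechanics are creating a per-group directory under the query path and attaching threads by writing their tid into the group's `tasks` file. A minimal standalone sketch of that mechanism follows — the mount point and function name are illustrative assumptions, not Doris's configured values:

```cpp
// Minimal sketch of the cgroup v1 mechanics used above: create a
// per-workload-group directory and attach the calling thread by appending
// its tid to the group's `tasks` file. Linux-only.
#include <sys/stat.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <cerrno>
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>

static bool attach_current_thread(const std::string& query_path, uint64_t wg_id) {
    std::string group_dir = query_path + "/" + std::to_string(wg_id);
    if (access(group_dir.c_str(), F_OK) != 0 &&
        mkdir(group_dir.c_str(), S_IRWXU) != 0) {
        std::cerr << "mkdir failed, errno=" << errno << "\n";
        return false;
    }
    int tid = static_cast<int>(syscall(SYS_gettid));
    std::ofstream tasks(group_dir + "/tasks", std::ios::app);
    tasks << tid << "\n";  // the kernel moves the thread into the cgroup
    return static_cast<bool>(tasks);
}

int main() {
    // Assumes a cgroup v1 cpu hierarchy mounted here; adjust to your system.
    attach_current_thread("/sys/fs/cgroup/cpu/doris/query", 12345);
}
```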
4 changes: 2 additions & 2 deletions be/src/agent/cgroup_cpu_ctl.h
@@ -35,7 +35,7 @@ class CgroupCpuCtl {
public:
virtual ~CgroupCpuCtl() = default;
CgroupCpuCtl() = default;
CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }
CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; }

virtual Status init();

@@ -63,7 +63,7 @@ class CgroupCpuCtl {
int _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
uint64_t _tg_id = -1; // workload group id
uint64_t _wg_id = -1; // workload group id
uint64_t _cpu_shares = 0;
};

7 changes: 6 additions & 1 deletion be/src/agent/heartbeat_server.cpp
@@ -93,7 +93,12 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time();
LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time()
<< ", host:" << master_info.network_address.hostname
<< ", port:" << master_info.network_address.port
<< ", cluster id:" << master_info.cluster_id
<< ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos)
<< ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch;
}
}

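The change above enriches a slow-heartbeat warning with the master's address, cluster id, and the BE start time. The pattern — time the handler and emit a detailed warning only past a threshold — can be sketched with the standard library; `MonotonicStopWatch` and glog are Doris's tools, so the names here are stand-ins:

```cpp
// Sketch of the threshold-gated slow-operation logging pattern from the hunk
// above, using std::chrono and iostream instead of Doris's stopwatch and glog.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <string>
#include <thread>

constexpr int64_t kWarnThresholdNs = 1000L * 1000L * 1000L;  // 1 second

void handle_heartbeat(const std::string& master_host, int master_port) {
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(1200));  // simulated work
    auto elapsed_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                              std::chrono::steady_clock::now() - start)
                              .count();
    if (elapsed_ns > kWarnThresholdNs) {
        // Include enough context to identify which master the slow round involved.
        std::cerr << "heartbeat consume too much time. time=" << elapsed_ns
                  << ", host:" << master_host << ", port:" << master_port << "\n";
    }
}

int main() { handle_heartbeat("fe-master.example.com", 9020); }
```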
38 changes: 15 additions & 23 deletions be/src/agent/task_worker_pool.cpp
@@ -540,26 +540,20 @@ Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
}

PriorTaskWorkerPool::PriorTaskWorkerPool(
std::string_view name, int normal_worker_count, int high_prior_worker_count,
const std::string& name, int normal_worker_count, int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback)
: _callback(std::move(callback)) {
auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name))
.set_min_threads(normal_worker_count)
.set_max_threads(normal_worker_count)
.build(&_normal_pool);
CHECK(st.ok()) << name << ": " << st;

st = _normal_pool->submit_func([this] { normal_loop(); });
CHECK(st.ok()) << name << ": " << st;

st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name))
.set_min_threads(high_prior_worker_count)
.set_max_threads(high_prior_worker_count)
.build(&_high_prior_pool);
CHECK(st.ok()) << name << ": " << st;
for (int i = 0; i < normal_worker_count; ++i) {
auto st = Thread::create(
"Normal", name, [this] { normal_loop(); }, &_workers.emplace_back());
CHECK(st.ok()) << name << ": " << st;
}

st = _high_prior_pool->submit_func([this] { high_prior_loop(); });
CHECK(st.ok()) << name << ": " << st;
for (int i = 0; i < high_prior_worker_count; ++i) {
auto st = Thread::create(
"HighPrior", name, [this] { high_prior_loop(); }, &_workers.emplace_back());
CHECK(st.ok()) << name << ": " << st;
}
}

PriorTaskWorkerPool::~PriorTaskWorkerPool() {
@@ -578,12 +572,10 @@ void PriorTaskWorkerPool::stop() {
_normal_condv.notify_all();
_high_prior_condv.notify_all();

if (_normal_pool) {
_normal_pool->shutdown();
}

if (_high_prior_pool) {
_high_prior_pool->shutdown();
for (auto&& w : _workers) {
if (w) {
w->join();
}
}
}

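This refactor replaces the two `ThreadPool` instances with explicitly created worker threads collected in one `_workers` vector and joined in `stop()`. A self-contained sketch of that worker model follows; the queue and loop body are assumptions modeled on the surrounding code, and it uses `std::thread` rather than Doris's `Thread`/`scoped_refptr`:

```cpp
// Sketch of the worker model the hunk switches to: N dedicated threads
// draining a mutex/condvar-protected queue, woken and joined on stop().
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

class WorkerPool {
public:
    explicit WorkerPool(int worker_count) {
        for (int i = 0; i < worker_count; ++i) {
            _workers.emplace_back([this] { loop(); });
        }
    }

    void submit(std::string task) {
        {
            std::lock_guard<std::mutex> lk(_mtx);
            _queue.push_back(std::move(task));
        }
        _condv.notify_one();
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lk(_mtx);
            _stopped = true;
        }
        _condv.notify_all();             // wake idle workers so they can exit
        for (auto& w : _workers) {
            if (w.joinable()) w.join();  // mirrors the join loop in stop() above
        }
    }

private:
    void loop() {
        while (true) {
            std::string task;
            {
                std::unique_lock<std::mutex> lk(_mtx);
                _condv.wait(lk, [this] { return _stopped || !_queue.empty(); });
                if (_stopped && _queue.empty()) return;
                task = std::move(_queue.front());
                _queue.pop_front();
            }
            std::cout << "processing " << task << "\n";  // stand-in for _callback(task)
        }
    }

    std::mutex _mtx;
    std::condition_variable _condv;
    std::deque<std::string> _queue;
    bool _stopped = false;
    std::vector<std::thread> _workers;
};

int main() {
    WorkerPool pool(2);
    pool.submit("task-a");
    pool.submit("task-b");
    pool.stop();
}
```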
6 changes: 3 additions & 3 deletions be/src/agent/task_worker_pool.h
@@ -79,7 +79,8 @@ class PublishVersionWorkerPool final : public TaskWorkerPool {

class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
public:
PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int high_prior_worker_count,
PriorTaskWorkerPool(const std::string& name, int normal_worker_count,
int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback);

~PriorTaskWorkerPool() override;
@@ -101,8 +102,7 @@ class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
std::condition_variable _high_prior_condv;
std::deque<std::unique_ptr<TAgentTaskRequest>> _high_prior_queue;

std::unique_ptr<ThreadPool> _normal_pool;
std::unique_ptr<ThreadPool> _high_prior_pool;
std::vector<scoped_refptr<Thread>> _workers;

std::function<void(const TAgentTaskRequest&)> _callback;
};
26 changes: 13 additions & 13 deletions be/src/agent/workload_group_listener.cpp
@@ -35,39 +35,39 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
is_set_workload_group_info = true;

// 1 parse topic info to group info
WorkloadGroupInfo workload_group_info;
Status ret = WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info,
&workload_group_info);
WorkloadGroupInfo workload_group_info =
WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info);
// it means FE has this wg, but may parse failed, so we should not delete it.
if (workload_group_info.id != 0) {
current_wg_ids.insert(workload_group_info.id);
}
if (!ret.ok()) {
LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id="
<< workload_group_info.id << ", reason:" << ret.to_string();
if (!workload_group_info.valid) {
LOG(INFO) << "[topic_publish_wg]parse topic info failed, wg_id="
<< workload_group_info.id << ", reason: [tworkload_group_info.__isset.id: "
<< topic_info.workload_group_info.__isset.id
<< ", tworkload_group_info.__isset.version: "
<< topic_info.workload_group_info.__isset.version << "]";
continue;
}

// 2 update workload group
auto tg =
auto wg =
_exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info);

// 3 set cpu soft hard limit switch
_exec_env->workload_group_mgr()->_enable_cpu_hard_limit.store(
workload_group_info.enable_cpu_hard_limit);

// 4 create and update task scheduler
tg->upsert_task_scheduler(&workload_group_info, _exec_env);
wg->upsert_task_scheduler(&workload_group_info, _exec_env);

LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info="
<< tg->debug_string() << ", enable_cpu_hard_limit="
LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info="
<< wg->debug_string() << ", enable_cpu_hard_limit="
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
<< ", enable_cgroup_cpu_soft_limit="
<< (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
<< ", cgroup home path=" << config::doris_cgroup_cpu_path
<< ", list size=" << list_size;
<< ", list size=" << list_size << ", thread info=" << wg->thread_debug_info();
}

// NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list
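The hunk above changes `parse_topic_info` from a `Status` return with an output parameter to returning a `WorkloadGroupInfo` that carries its own `valid` flag, so a partially parsed id can still be reported. A simplified sketch of that API shape — the field set here is an assumption, not Doris's real struct:

```cpp
// Sketch contrasting the parse API styles in the hunk above: instead of
// Status + out-param, return a value whose `valid` flag reports parse success
// while still exposing a usable id.
#include <cstdint>
#include <iostream>

struct TopicInfo {
    bool has_id = false;       // stands in for thrift's __isset.id
    bool has_version = false;  // stands in for thrift's __isset.version
    uint64_t id = 0;
    int64_t version = 0;
};

struct WorkloadGroupInfo {
    uint64_t id = 0;
    int64_t version = 0;
    bool valid = false;  // replaces the separate Status return
};

WorkloadGroupInfo parse_topic_info(const TopicInfo& t) {
    WorkloadGroupInfo wg;
    if (t.has_id) wg.id = t.id;                   // id may be usable even on failure
    if (!t.has_id || !t.has_version) return wg;   // valid stays false
    wg.version = t.version;
    wg.valid = true;
    return wg;
}

int main() {
    auto wg = parse_topic_info(TopicInfo{true, false, 42, 0});
    if (!wg.valid) {
        // The id is still reported on failure, matching the listener above,
        // which keeps the id so the group is not mistakenly deleted.
        std::cout << "parse failed, wg_id=" << wg.id << "\n";
    }
}
```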
10 changes: 10 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp
@@ -163,6 +163,16 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() {
return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction");
}

int score = 0;
int rowset_cnt = 0;
while (rowset_cnt < _input_rowsets.size()) {
score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
if (score > config::base_compaction_max_compaction_score) {
break;
}
}
_input_rowsets.resize(rowset_cnt);

// 1. cumulative rowset must reach base_compaction_min_rowset_num threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
Expand Down
13 changes: 11 additions & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp
@@ -354,11 +354,20 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
return st;
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
config::cumulative_compaction_max_deltas_factor,
config::cumulative_compaction_min_deltas + 1);
}

size_t compaction_score = 0;
auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy();
_engine.cumu_compaction_policy(compaction_policy)
->pick_input_rowsets(cloud_tablet(), candidate_rowsets,
config::cumulative_compaction_max_deltas,
->pick_input_rowsets(cloud_tablet(), candidate_rowsets, max_score,
config::cumulative_compaction_min_deltas, &_input_rowsets,
&_last_delete_version, &compaction_score);

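Here cumulative compaction throttles itself under memory pressure: if the last compaction hit `MEM_LIMIT_EXCEEDED` or process memory exceeds 80% of the soft limit, the maximum score is divided by a factor, floored at `min_deltas + 1`. A sketch of that adaptive cap, with made-up constants standing in for the `config::` entries:

```cpp
// Sketch of the adaptive cap computed in the hunk above: shrink the maximum
// compaction score when memory is tight, but never below min_deltas + 1.
#include <algorithm>
#include <cstdint>
#include <iostream>

constexpr int64_t kMaxDeltas = 100;       // cumulative_compaction_max_deltas
constexpr int64_t kMaxDeltasFactor = 10;  // cumulative_compaction_max_deltas_factor
constexpr int64_t kMinDeltas = 5;         // cumulative_compaction_min_deltas

int64_t pick_max_score(int64_t process_mem, int64_t soft_limit, bool last_compaction_oom) {
    int64_t max_score = kMaxDeltas;
    bool memory_usage_high = process_mem > soft_limit * 8 / 10;  // above 80%
    if (last_compaction_oom || memory_usage_high) {
        max_score = std::max(kMaxDeltas / kMaxDeltasFactor, kMinDeltas + 1);
    }
    return max_score;
}

int main() {
    std::cout << pick_max_score(90, 100, false) << "\n";  // memory high -> 10
    std::cout << pick_max_score(50, 100, false) << "\n";  // normal -> 100
}
```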
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
@@ -839,8 +839,12 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
if (ctx.db_id > 0 && !ctx.label.empty()) {
req.set_db_id(ctx.db_id);
req.set_label(ctx.label);
} else {
} else if (ctx.txn_id > 0) {
req.set_txn_id(ctx.txn_id);
} else {
LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << ctx.db_id
<< " txn_id=" << ctx.txn_id << " label=" << ctx.label;
return Status::InternalError<false>("failed to abort txn");
}
return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn);
}
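The final hunk adds a guard so an abort request is only sent when the transaction is identifiable, either by `(db_id, label)` or by `txn_id`; otherwise it logs and fails fast instead of issuing an empty RPC. A simplified sketch of that guard-clause branching, with hypothetical types in place of the protobuf request and `Status`:

```cpp
// Sketch of the request-identification guard added above: prefer
// (db_id, label), fall back to txn_id, and reject the call when neither
// identifies the transaction.
#include <cstdint>
#include <iostream>
#include <string>

struct AbortTxnRequest {
    int64_t db_id = -1;
    int64_t txn_id = -1;
    std::string label;
};

bool build_abort_request(int64_t db_id, int64_t txn_id, const std::string& label,
                         AbortTxnRequest* req) {
    if (db_id > 0 && !label.empty()) {
        req->db_id = db_id;    // identify the txn by (db_id, label)
        req->label = label;
    } else if (txn_id > 0) {
        req->txn_id = txn_id;  // identify the txn by id alone
    } else {
        std::cerr << "failed abort txn, with illegal input, db_id=" << db_id
                  << " txn_id=" << txn_id << " label=" << label << "\n";
        return false;          // fail fast; nothing identifies the txn
    }
    return true;
}

int main() {
    AbortTxnRequest req;
    std::cout << build_abort_request(-1, -1, "", &req) << "\n";  // 0: rejected
}
```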