Merge branch 'master' into limit-localAgg
englefly authored Jun 28, 2024
2 parents 01b3824 + 70b816b commit ba8936b
Showing 1,741 changed files with 75,024 additions and 20,200 deletions.
1 change: 1 addition & 0 deletions .asf.yaml
@@ -69,6 +69,7 @@ github:

required_pull_request_reviews:
dismiss_stale_reviews: true
require_code_owner_reviews: true
required_approving_review_count: 1
branch-1.1-lts:
required_status_checks:
18 changes: 18 additions & 0 deletions .github/CODEOWNERS
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
be/src/io/* @platoneko @gavinchou @dataroaring
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @dataroaring @CalvinKirs @morningman
1 change: 1 addition & 0 deletions .licenserc.yaml
@@ -82,6 +82,7 @@ header:
- "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_orc.hql"
- "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_parquet.hql"
- "docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/"
- "docker/thirdparties/docker-compose/hive/scripts/data/**"
- "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
- "conf/mysql_ssl_default_certificate/*"
- "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
5 changes: 0 additions & 5 deletions be/CMakeLists.txt
@@ -75,7 +75,6 @@ option(USE_LIBCPP "Use libc++" OFF)
option(USE_MEM_TRACKER, "Use memory tracker" ON)
option(USE_UNWIND "Use libunwind" ON)
option(USE_JEMALLOC "Use jemalloc" ON)
option(USE_JEMALLOC_HOOK "Use jemalloc hook" ON)
if (OS_MACOSX)
set(GLIBC_COMPATIBILITY OFF)
set(USE_LIBCPP ON)
@@ -91,7 +90,6 @@ message(STATUS "GLIBC_COMPATIBILITY is ${GLIBC_COMPATIBILITY}")
message(STATUS "USE_LIBCPP is ${USE_LIBCPP}")
message(STATUS "USE_MEM_TRACKER is ${USE_MEM_TRACKER}")
message(STATUS "USE_JEMALLOC is ${USE_JEMALLOC}")
message(STATUS "USE_JEMALLOC_HOOK is ${USE_JEMALLOC_HOOK}")
message(STATUS "USE_UNWIND is ${USE_UNWIND}")
message(STATUS "ENABLE_PCH is ${ENABLE_PCH}")

@@ -350,9 +348,6 @@ endif()
if (USE_JEMALLOC)
add_definitions(-DUSE_JEMALLOC)
endif()
if (USE_JEMALLOC_HOOK)
add_definitions(-DUSE_JEMALLOC_HOOK)
endif()

# Compile with libunwind
if (USE_UNWIND)
7 changes: 5 additions & 2 deletions be/cmake/thirdparty.cmake
@@ -58,7 +58,6 @@ add_thirdparty(re2)
add_thirdparty(hyperscan LIBNAME "lib64/libhs.a")
add_thirdparty(odbc)
add_thirdparty(pprof WHOLELIBPATH ${GPERFTOOLS_HOME}/lib/libprofiler.a)
add_thirdparty(tcmalloc WHOLELIBPATH ${GPERFTOOLS_HOME}/lib/libtcmalloc.a NOTADD)
add_thirdparty(protobuf)
add_thirdparty(gtest)
add_thirdparty(gtest_main)
@@ -77,7 +76,11 @@ add_thirdparty(libz LIBNAME "lib/libz.a")
add_thirdparty(crypto)
add_thirdparty(openssl LIBNAME "lib/libssl.a")
add_thirdparty(leveldb)
add_thirdparty(jemalloc LIBNAME "lib/libjemalloc_doris.a")
if (USE_JEMALLOC)
add_thirdparty(jemalloc LIBNAME "lib/libjemalloc_doris.a")
else()
add_thirdparty(tcmalloc WHOLELIBPATH ${GPERFTOOLS_HOME}/lib/libtcmalloc.a NOTADD)
endif()
add_thirdparty(jemalloc_arrow LIBNAME "lib/libjemalloc_arrow.a")

if (WITH_MYSQL)
3 changes: 3 additions & 0 deletions be/src/agent/agent_server.cpp
@@ -180,6 +180,9 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
_workers[TTaskType::CLEAN_TRASH] = std::make_unique<TaskWorkerPool>(
"CLEAN_TRASH", 1, [&engine](auto&& task) {return clean_trash_callback(engine, task); });

_workers[TTaskType::CLEAN_UDF_CACHE] = std::make_unique<TaskWorkerPool>(
"CLEAN_UDF_CACHE", 1, [](auto&& task) {return clean_udf_cache_callback(task); });

_workers[TTaskType::UPDATE_VISIBLE_VERSION] = std::make_unique<TaskWorkerPool>(
"UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); });

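Note: the new CLEAN_UDF_CACHE worker uses the same registration pattern as the surrounding entries: each TTaskType maps to a small pool that invokes one callback per task. A minimal sketch of that dispatch shape (Task, WorkerPool, and TaskType below are simplified stand-ins, not the real Doris types):

#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Simplified stand-ins for the Doris agent types.
struct Task {
    std::string payload;
};

class WorkerPool {
public:
    WorkerPool(std::string name, std::function<void(const Task&)> cb)
            : _name(std::move(name)), _cb(std::move(cb)) {}
    // The real TaskWorkerPool queues the task to worker threads; this runs inline.
    void submit(const Task& t) { _cb(t); }

private:
    std::string _name;
    std::function<void(const Task&)> _cb;
};

enum class TaskType { CLEAN_TRASH, CLEAN_UDF_CACHE };

int main() {
    std::map<TaskType, std::unique_ptr<WorkerPool>> workers;
    workers[TaskType::CLEAN_UDF_CACHE] = std::make_unique<WorkerPool>(
            "CLEAN_UDF_CACHE", [](const Task& t) {
                std::cout << "clean udf cache: " << t.payload << "\n";
            });
    workers[TaskType::CLEAN_UDF_CACHE]->submit({"my_udf(INT)"});
}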
8 changes: 4 additions & 4 deletions be/src/agent/cgroup_cpu_ctl.cpp
@@ -30,14 +30,14 @@ Status CgroupCpuCtl::init() {
_doris_cgroup_cpu_path = config::doris_cgroup_cpu_path;
if (_doris_cgroup_cpu_path.empty()) {
LOG(INFO) << "doris cgroup cpu path is not specify, path=" << _doris_cgroup_cpu_path;
return Status::InternalError<false>("doris cgroup cpu path {} is not specify.",
_doris_cgroup_cpu_path);
return Status::InvalidArgument<false>("doris cgroup cpu path {} is not specify.",
_doris_cgroup_cpu_path);
}

if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) {
LOG(INFO) << "doris cgroup cpu path not exists, path=" << _doris_cgroup_cpu_path;
return Status::InternalError<false>("doris cgroup cpu path {} not exists.",
_doris_cgroup_cpu_path);
return Status::InvalidArgument<false>("doris cgroup cpu path {} not exists.",
_doris_cgroup_cpu_path);
}

if (_doris_cgroup_cpu_path.back() != '/') {
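Note: this hunk only reclassifies the error: an empty or missing cgroup path is a configuration problem, so callers now see InvalidArgument instead of InternalError (the <false> template argument appears to suppress stack-trace capture in Doris's Status, though that is inferred from usage, not stated in this diff). A rough sketch of the same validation with a simplified Status type:

#include <iostream>
#include <string>
#include <unistd.h>  // access()
#include <utility>

// A simplified stand-in for doris::Status, just enough to show the shape.
struct Status {
    enum Code { OK, INVALID_ARGUMENT };
    Code code = OK;
    std::string msg;
    static Status InvalidArgument(std::string m) {
        return {INVALID_ARGUMENT, std::move(m)};
    }
    bool ok() const { return code == OK; }
};

// Empty or missing cgroup paths are caller/configuration errors,
// which is what the switch to InvalidArgument expresses.
Status check_cgroup_path(const std::string& path) {
    if (path.empty()) {
        return Status::InvalidArgument("doris cgroup cpu path is not specified");
    }
    if (access(path.c_str(), F_OK) != 0) {
        return Status::InvalidArgument("doris cgroup cpu path " + path + " not exists");
    }
    return {};
}

int main() {
    Status st = check_cgroup_path("");
    std::cout << (st.ok() ? std::string("ok") : st.msg) << "\n";
}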
7 changes: 6 additions & 1 deletion be/src/agent/heartbeat_server.cpp
@@ -93,7 +93,12 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time();
LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time()
<< ", host:" << master_info.network_address.hostname
<< ", port:" << master_info.network_address.port
<< ", cluster id:" << master_info.cluster_id
<< ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos)
<< ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch;
}
}

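Note: the enriched warning only fires when a heartbeat takes longer than one second; the threshold is expressed in nanoseconds. A minimal sketch of that stopwatch-plus-threshold pattern using std::chrono (host value is hypothetical):

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

int main() {
    using clock = std::chrono::steady_clock;
    auto start = clock::now();

    // ... handle the heartbeat here ...
    std::this_thread::sleep_for(std::chrono::milliseconds(5));

    auto elapsed_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
            clock::now() - start).count();
    std::string host = "fe-host.example";  // hypothetical master hostname
    if (elapsed_ns > 1000L * 1000L * 1000L) {  // warn past one second
        std::cerr << "heartbeat consume too much time. time=" << elapsed_ns
                  << ", host:" << host << "\n";  // the patch also logs port, cluster id, FE infos
    }
}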
8 changes: 8 additions & 0 deletions be/src/agent/task_worker_pool.cpp
@@ -83,6 +83,7 @@
#include "service/backend_options.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/jni-util.h"
#include "util/mem_info.h"
#include "util/random.h"
#include "util/s3_util.h"
@@ -2060,4 +2061,11 @@ void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
LOG(INFO) << "clean trash finish";
}

void clean_udf_cache_callback(const TAgentTaskRequest& req) {
LOG(INFO) << "clean udf cache start: " << req.clean_udf_cache_req.function_signature;
static_cast<void>(
JniUtil::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature));
LOG(INFO) << "clean udf cache finish: " << req.clean_udf_cache_req.function_signature;
}

} // namespace doris
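Note: the flow appears to be that the FE issues a CLEAN_UDF_CACHE task carrying a UDF's function signature, and this callback asks JniUtil to drop the cached class loader for that signature so a re-created UDF loads fresh bytecode. A toy sketch of a signature-keyed cache with that eviction (UdfClassLoaderCache is hypothetical, standing in for the JNI-side cache):

#include <iostream>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for the JNI-side class-loader cache keyed by UDF signature.
class UdfClassLoaderCache {
public:
    void put(const std::string& signature, std::string loader) {
        _cache[signature] = std::move(loader);
    }
    // What clean_udf_cache_callback asks JniUtil to do: drop the entry so the
    // next call for this signature reloads the UDF's classes.
    void evict(const std::string& signature) { _cache.erase(signature); }
    bool cached(const std::string& signature) const { return _cache.count(signature) > 0; }

private:
    std::map<std::string, std::string> _cache;
};

int main() {
    UdfClassLoaderCache cache;
    cache.put("my_udf(INT)", "classloader@1");
    cache.evict("my_udf(INT)");  // a CLEAN_UDF_CACHE task arrives
    std::cout << std::boolalpha << cache.cached("my_udf(INT)") << "\n";  // false
}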
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.h
@@ -176,6 +176,8 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void clean_udf_cache_callback(const TAgentTaskRequest& req);

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void report_task_callback(const TMasterInfo& master_info);
18 changes: 10 additions & 8 deletions be/src/agent/utils.cpp
@@ -89,21 +89,21 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste
try {
try {
client->finishTask(*result, request);
} catch (TTransportException& e) {
} catch ([[maybe_unused]] TTransportException& e) {
#ifdef ADDRESS_SANITIZER
return Status::RpcError<false>("Master client finish task failed due to {}", e.what());
#else
LOG(WARNING) << "master client, retry finishTask: " << e.what();
#endif
client_status = client.reopen(config::thrift_rpc_timeout_ms);
if (!client_status.ok()) {
#ifdef ADDRESS_SANITIZER
LOG(WARNING) << "fail to get master client from cache. "
<< "host=" << _master_info.network_address.hostname
<< ", port=" << _master_info.network_address.port
<< ", code=" << client_status.code();
#endif
return Status::RpcError("Master client finish task failed");
}
client->finishTask(*result, request);
#endif
}
} catch (std::exception& e) {
RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
@@ -133,31 +133,33 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult*
try {
client->report(*result, request);
} catch (TTransportException& e) {
#ifdef ADDRESS_SANITIZER
return Status::RpcError<false>("Master client report failed due to {}", e.what());
#else
TTransportException::TTransportExceptionType type = e.getType();
if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
#ifdef ADDRESS_SANITIZER
// if not TIMED_OUT, retry
LOG(WARNING) << "master client, retry finishTask: " << e.what();
#endif

client_status = client.reopen(config::thrift_rpc_timeout_ms);
if (!client_status.ok()) {
#ifdef ADDRESS_SANITIZER
LOG(WARNING) << "fail to get master client from cache. "
<< "host=" << _master_info.network_address.hostname
<< ", port=" << _master_info.network_address.port
<< ", code=" << client_status.code();
#endif
return Status::InternalError("Fail to get master client from cache");
}

client->report(*result, request);
} else {
// TIMED_OUT exception. do not retry
// actually we don't care what FE returns.
#ifdef ADDRESS_SANITIZER
LOG(WARNING) << "fail to report to master: " << e.what();
#endif
return Status::InternalError("Fail to report to master");
}
#endif
}
} catch (std::exception& e) {
RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
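Note: both RPC wrappers share one shape: attempt the call, and on a thrift TTransportException reopen the connection and retry once — except in AddressSanitizer builds, where this change fails fast with an RpcError instead of retrying. A self-contained sketch of the retry-once pattern (rpc() and reopen() stand in for client->finishTask() and client.reopen()):

#include <iostream>
#include <stdexcept>

struct TransportError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

void rpc(bool& healthy) {
    if (!healthy) throw TransportError("broken pipe");
}

void reopen(bool& healthy) { healthy = true; }

bool call_with_one_retry(bool& healthy) {
    try {
        rpc(healthy);
        return true;
    } catch (const TransportError& e) {
#ifdef ADDRESS_SANITIZER
        // Sanitizer builds now fail fast instead of retrying.
        std::cerr << "rpc failed: " << e.what() << "\n";
        return false;
#else
        std::cerr << "retrying after: " << e.what() << "\n";
        reopen(healthy);
        rpc(healthy);  // a second failure would propagate to an outer handler
        return true;
#endif
    }
}

int main() {
    bool healthy = false;
    std::cout << std::boolalpha << call_with_one_retry(healthy) << "\n";  // true (retry succeeds)
}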
10 changes: 10 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp
@@ -163,6 +163,16 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() {
return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction");
}

int score = 0;
int rowset_cnt = 0;
while (rowset_cnt < _input_rowsets.size()) {
score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
if (score > config::base_compaction_max_compaction_score) {
break;
}
}
_input_rowsets.resize(rowset_cnt);

// 1. cumulative rowset must reach base_compaction_min_rowset_num threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
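Note: the new loop bounds how much work one base compaction picks up: it accumulates each input rowset's compaction score and truncates _input_rowsets once the running total passes base_compaction_max_compaction_score. Because the counter is incremented before the check, the rowset that crosses the threshold is still included. A standalone sketch with made-up scores:

#include <iostream>
#include <vector>

int main() {
    std::vector<int> scores = {40, 35, 30, 25};  // per-rowset compaction scores (illustrative)
    const int max_total = 100;                   // stands in for base_compaction_max_compaction_score

    int total = 0;
    size_t cnt = 0;
    while (cnt < scores.size()) {
        total += scores[cnt++];
        if (total > max_total) break;  // the crossing rowset is kept
    }
    scores.resize(cnt);
    std::cout << "picked " << scores.size() << " rowsets, total score " << total << "\n";
    // prints: picked 3 rowsets, total score 105
}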
13 changes: 11 additions & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp
@@ -354,11 +354,20 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
return st;
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
config::cumulative_compaction_max_deltas_factor,
config::cumulative_compaction_min_deltas + 1);
}

size_t compaction_score = 0;
auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy();
_engine.cumu_compaction_policy(compaction_policy)
->pick_input_rowsets(cloud_tablet(), candidate_rowsets,
config::cumulative_compaction_max_deltas,
->pick_input_rowsets(cloud_tablet(), candidate_rowsets, max_score,
config::cumulative_compaction_min_deltas, &_input_rowsets,
&_last_delete_version, &compaction_score);

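Note: under memory pressure (process usage above 80% of the soft memory limit, or a previous compaction that failed with MEM_LIMIT_EXCEEDED), the picker lowers the delta cap to max(cumulative_compaction_max_deltas / cumulative_compaction_max_deltas_factor, cumulative_compaction_min_deltas + 1). With illustrative values of 100, 10, and 5 (not necessarily the shipped defaults), the cap drops from 100 to max(10, 6) = 10:

#include <algorithm>
#include <cstdint>
#include <iostream>

int64_t effective_max_score(int64_t max_deltas, int64_t factor, int64_t min_deltas,
                            bool memory_pressure) {
    if (!memory_pressure) return max_deltas;
    return std::max(max_deltas / factor, min_deltas + 1);
}

int main() {
    // Config values here are illustrative, not the shipped defaults.
    std::cout << effective_max_score(100, 10, 5, /*memory_pressure=*/true) << "\n";   // 10
    std::cout << effective_max_score(100, 10, 5, /*memory_pressure=*/false) << "\n";  // 100
}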
15 changes: 12 additions & 3 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -154,10 +154,11 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
DeleteBitmapPtr delete_bitmap;
RowsetIdUnorderedSet rowset_ids;
std::shared_ptr<PartialUpdateInfo> partial_update_info;
std::shared_ptr<PublishStatus> publish_status;
int64_t txn_expiration;
Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
_transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration,
&partial_update_info);
&partial_update_info, &publish_status);
if (status != Status::OK()) {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << status;
@@ -172,8 +173,16 @@
txn_info.delete_bitmap = delete_bitmap;
txn_info.rowset_ids = rowset_ids;
txn_info.partial_update_info = partial_update_info;
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id, txn_expiration);
auto update_delete_bitmap_time_us = MonotonicMicros() - t3;
txn_info.publish_status = publish_status;
auto update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED)) {
LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
} else {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id,
txn_expiration);
update_delete_bitmap_time_us = MonotonicMicros() - t3;
}
if (status != Status::OK()) {
LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id()
<< ", tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id
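Note: the task now reads a PublishStatus out of the txn delete-bitmap cache; when a previous publish attempt already computed this transaction's delete bitmap (status SUCCEED), the recalculation via update_delete_bitmap() is skipped on retry. A sketch of that idempotency check:

#include <iostream>
#include <memory>

enum class PublishStatus { INIT, SUCCEED };  // values taken from the diff

// Mirrors the shape of the new check in CloudTabletCalcDeleteBitmapTask::handle().
bool needs_recalculation(const std::shared_ptr<PublishStatus>& status) {
    return !(status && *status == PublishStatus::SUCCEED);
}

int main() {
    auto done = std::make_shared<PublishStatus>(PublishStatus::SUCCEED);
    std::cout << std::boolalpha
              << needs_recalculation(done) << "\n"      // false: skip the recalculation
              << needs_recalculation(nullptr) << "\n";  // true: compute the bitmap
}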
41 changes: 41 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
@@ -35,6 +35,7 @@
#include <type_traits>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
@@ -50,6 +51,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
@@ -546,6 +548,36 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
}
}

bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
DeleteBitmap* delete_bitmap) {
std::set<int64_t> txn_processed;
for (auto& rs_meta : rs_metas) {
auto txn_id = rs_meta.txn_id();
if (txn_processed.find(txn_id) != txn_processed.end()) {
continue;
}
txn_processed.insert(txn_id);
DeleteBitmapPtr tmp_delete_bitmap;
RowsetIdUnorderedSet tmp_rowset_ids;
std::shared_ptr<PublishStatus> publish_status =
std::make_shared<PublishStatus>(PublishStatus::INIT);
CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap(
txn_id, tablet->tablet_id(), &tmp_delete_bitmap, &tmp_rowset_ids, &publish_status);
if (status.ok() && *(publish_status.get()) == PublishStatus::SUCCEED) {
delete_bitmap->merge(*tmp_delete_bitmap);
engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
tablet->tablet_id());
} else {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << tablet->tablet_id()
<< ", txn_id=" << txn_id << ", status=" << status;
return false;
}
}
return true;
}

Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
const TabletStatsPB& stats, const TabletIndexPB& idx,
@@ -554,6 +586,15 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
return Status::OK();
}

if (sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) {
return Status::OK();
} else {
LOG(WARNING) << "failed to sync delete bitmap by txn info. tablet_id="
<< tablet->tablet_id();
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
*delete_bitmap = *new_delete_bitmap;
}

std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));

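Note: sync_tablet_delete_bitmap() now tries a cache-first path: for every pending rowset's transaction it looks up the bitmap in the txn cache, merging it only if that transaction's publish SUCCEEDed; any miss abandons the fast path, the partially merged bitmap is reset, and the code falls through to the full fetch from the meta service. A toy all-or-nothing version of that merge (Bitmap and g_txn_cache are simplified stand-ins):

#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <set>
#include <string>
#include <vector>

// Toy delete bitmap: just a set of deleted row ids.
using Bitmap = std::set<int>;

// Hypothetical cache of per-transaction bitmaps whose publish already SUCCEEDed.
std::map<int64_t, Bitmap> g_txn_cache = {{1, {10, 11}}, {2, {42}}};

// Merge every transaction's cached bitmap; any miss means "fall back to the
// meta service" and the partial result is discarded, as in the real code.
std::optional<Bitmap> sync_by_cache(const std::vector<int64_t>& txn_ids) {
    Bitmap merged;
    for (int64_t txn : txn_ids) {
        auto it = g_txn_cache.find(txn);
        if (it == g_txn_cache.end()) return std::nullopt;
        merged.insert(it->second.begin(), it->second.end());
    }
    return merged;
}

int main() {
    auto hit = sync_by_cache({1, 2});
    std::cout << (hit ? "merged rows: " + std::to_string(hit->size()) : std::string("fallback"))
              << "\n";  // merged rows: 3
    auto miss = sync_by_cache({1, 3});
    std::cout << (miss ? std::string("cache hit") : std::string("fallback to full sync")) << "\n";
}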
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_meta_mgr.h
@@ -92,6 +92,10 @@ class CloudMetaMgr {
int64_t initiator);

private:
bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
DeleteBitmap* delete_bitmap);

Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas, const TabletStatsPB& stats,
const TabletIndexPB& idx, DeleteBitmap* delete_bitmap);
(Diffs for the remaining changed files are not shown.)
