Skip to content

Commit

Permalink
Merge branch 'master' into 20231012_fix_table_sample
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Oct 24, 2023
2 parents 6ebfbfe + 08832d9 commit 411bd70
Show file tree
Hide file tree
Showing 1,069 changed files with 27,646 additions and 14,311 deletions.
19 changes: 7 additions & 12 deletions .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@ Checks: |
clang-analyzer-*,
-*,
bugprone-redundant-branch-condition,
modernize-use-override,
modernize-use-equals-default,
modernize-use-equals-delete,
modernize-use-nodiscard,
modernize-use-nullptr,
modernize-use-bool-literals,
modernize-use-using,
modernize-*,
-modernize-use-trailing-return-type,
-modernize-use-nodiscard,
misc-redundant-expression,
misc-unused-*,
-misc-unused-parameters,
readability-make-member-function-const,
readability-non-const-parameter,
readability-static-accessed-through-instance,
readability-redundant-*,
readability-braces-around-statements,
readability-*,
-readability-identifier-length,
-readability-implicit-bool-conversion,
-readability-function-cognitive-complexity,
portability-simd-intrinsics,
performance-type-promotion-in-math-fn,
performance-faster-string-find,
Expand Down
16 changes: 10 additions & 6 deletions .github/workflows/auto_trigger_teamcity.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ on:

env:
PULL_REQUEST_NUM: $(echo "${{ github.event.issue.pull_request.url }}" | awk -F/ '{print $NF}')
LATEST_COMMIT: $(curl -H "Authorization:Bearer ${{ secrets.GITHUB_TOKEN }}" "https://api.github.com/repos/${{ github.repository }}/pulls/$(echo '${{ github.event.issue.pull_request.url }}' | awk -F/ '{print $NF}')" > pr_details.json && jq -r '.head.sha' pr_details.json)
LATEST_COMMIT: $(curl -s -H "Authorization:Bearer ${{ secrets.GITHUB_TOKEN }}" "https://api.github.com/repos/${{ github.repository }}/pulls/$(echo '${{ github.event.issue.pull_request.url }}' | awk -F/ '{print $NF}')" > pr_details.json && jq -r '.head.sha' pr_details.json)
TEAMCITY_URL: '-H \"Content-Type:text/plain\" -H \"Accept: application/json\" -u OneMoreChance:OneMoreChance http://43.132.222.7:8111'

jobs:
run_teamcity_pipeline:
if: (contains(github.event.comment.body, 'buildall') || contains(github.event.comment.body, 'p0') || contains(github.event.comment.body, 'nereids_p0') || contains(github.event.comment.body, 'p1') || contains(github.event.comment.body, 'feut') || contains(github.event.comment.body, 'beut') || contains(github.event.comment.body, 'compile') || contains(github.event.comment.body, 'clickbench') || contains(github.event.comment.body, 'arm') || contains(github.event.comment.body, 'external') || contains(github.event.comment.body, 'just_for_test')) && contains(github.event.comment.body, 'run') && !contains(github.event.comment.body, 'Thanks for your contribution')
if: (contains(github.event.comment.body, 'buildall') || contains(github.event.comment.body, 'p0') || contains(github.event.comment.body, 'pipelinex_p0') || contains(github.event.comment.body, 'p1') || contains(github.event.comment.body, 'feut') || contains(github.event.comment.body, 'beut') || contains(github.event.comment.body, 'compile') || contains(github.event.comment.body, 'clickbench') || contains(github.event.comment.body, 'arm') || contains(github.event.comment.body, 'external') || contains(github.event.comment.body, 'just_for_test')) && contains(github.event.comment.body, 'run') && !contains(github.event.comment.body, 'Thanks for your contribution')

runs-on: ubuntu-latest

Expand All @@ -56,14 +56,15 @@ jobs:
echo "encoded_string : ${encoded_string}"
echo "latest_commit_id : ${{ env.LATEST_COMMIT }}"
set -x
if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "buildall" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
trigger_pipelines="Doris_Doris_FeUt Doris_DorisBeUt_BeUt Doris_DorisCompile_Compile Doris_Performance_Clickbench_ClickbenchNew Doris_ArmPipeline_P0Regression ${trigger_pipelines}"
trigger_pipelines="Doris_DorisRegression_P0RegressionPipelineX Doris_Doris_FeUt Doris_DorisBeUt_BeUt Doris_DorisCompile_Compile Doris_Performance_Clickbench_ClickbenchNew Doris_ArmPipeline_P0Regression ${trigger_pipelines}"
fi
if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "p0" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ " p0" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
trigger_pipelines="Doris_DorisRegression_P0Regression ${trigger_pipelines}"
fi
if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "nereids_p0" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
trigger_pipelines="Doris_DorisRegression_NereidsP0Regression ${trigger_pipelines}"
if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ " pipelinex_p0" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
trigger_pipelines="Doris_DorisRegression_P0RegressionPipelineX ${trigger_pipelines}"
fi
if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "p1" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
trigger_pipelines="Doris_DorisRegression_P1Regression ${trigger_pipelines}"
Expand Down Expand Up @@ -93,6 +94,7 @@ jobs:
if [ -z "${trigger_pipelines}" ];then
echo "Just a general comment that doesn't match any pipeline rules"
fi
set +x
echo "need run pipelines: ${trigger_pipelines}"
Expand Down Expand Up @@ -124,6 +126,8 @@ jobs:
echo "the same pr_commit build already exist, this build ${build} is running or in queue!"
same_build_sign="true"
break
else
same_build_sign="false"
fi
done
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/be-ut-mac.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ concurrency:
jobs:
run-ut:
name: BE UT (macOS)
runs-on: macos-12
runs-on: macos-13
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down Expand Up @@ -81,7 +81,7 @@ jobs:
'node'
'llvm@16'
)
brew install "${cellars[@]}"
brew install "${cellars[@]}" || true
pushd thirdparty
branch="${{ github.base_ref }}"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build-extension.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ jobs:
run: |
git clone https://github.com/apache/doris-website.git website
cd website
echo "[\"current\"]" > versions.json
mkdir -p docs
cp -R ../docs/en/docs/* docs/
cp -R ../docs/sidebars.json sidebars.json
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ be/tags
be/test/olap/test_data/tablet_meta_test.hdr
be/.devcontainer/
be/src/apache-orc/
zoneinfo/

## tools
tools/ssb-tools/ssb-data/
Expand All @@ -121,3 +122,6 @@ lru_cache_test
/conf/log4j2-spring.xml
/fe/fe-core/src/test/resources/real-help-resource.zip
/ui/dist

# other
compile_commands.json
8 changes: 6 additions & 2 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ if (COMPILER_GCC)
add_compile_options(-fdiagnostics-color=always
-Wno-nonnull
-Wno-stringop-overread
-Wno-stringop-overflow)
-Wno-stringop-overflow
-Wno-array-bounds)
endif ()

if (COMPILER_CLANG)
Expand All @@ -278,6 +279,7 @@ if (COMPILER_CLANG)
-Wunused-member-function
-Wunused-macros)
add_compile_options(-Wno-vla-extension)
add_compile_options(-Wno-gnu-statement-expression)
if (USE_LIBCPP)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-stdlib=libc++>)
if (NOT OS_MACOSX)
Expand Down Expand Up @@ -370,7 +372,7 @@ set(CXX_FLAGS_LSAN "${CXX_GCC_FLAGS} -O0 -fsanitize=leak -DLEAK_SANITIZER")

# Set the flags to the undefined behavior sanitizer, also known as "ubsan"
# Turn on sanitizer and debug symbols to get stack traces:
set(CXX_FLAGS_UBSAN "${CXX_GCC_FLAGS} -O0 -fno-wrapv -mcmodel=large -fsanitize=undefined -DUNDEFINED_BEHAVIOR_SANITIZER")
set(CXX_FLAGS_UBSAN "${CXX_GCC_FLAGS} -O0 -fno-wrapv -mcmodel=medium -fsanitize=undefined -DUNDEFINED_BEHAVIOR_SANITIZER")

# Set the flags to the thread sanitizer, also known as "tsan"
# Turn on sanitizer and debug symbols to get stack traces:
Expand Down Expand Up @@ -468,6 +470,7 @@ set(DORIS_LINK_LIBS
GeoType
Vec
Pipeline
Cloud
${WL_END_GROUP}
)

Expand Down Expand Up @@ -717,6 +720,7 @@ add_subdirectory(${SRC_DIR}/olap)
add_subdirectory(${SRC_DIR}/runtime)
add_subdirectory(${SRC_DIR}/service)
add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/cloud)

option(BUILD_META_TOOL "Build meta tool" OFF)
if (BUILD_META_TOOL)
Expand Down
2 changes: 2 additions & 0 deletions be/cmake/thirdparty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ add_thirdparty(krb5)
add_thirdparty(com_err)
add_thirdparty(k5crypto)
add_thirdparty(gssapi_krb5)
add_thirdparty(dragonbox_to_chars LIB64)
target_include_directories(dragonbox_to_chars INTERFACE "${THIRDPARTY_DIR}/include/dragonbox-1.1.3")

if (OS_MACOSX)
add_thirdparty(bfd)
Expand Down
4 changes: 3 additions & 1 deletion be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class BeExecVersionManager {
* d. elt function return type change to nullable(string)
* e. add repeat_max_num in repeat function
* 3: start from doris 2.1
* a. aggregation function do not serialize bitmap to string
* a. aggregation function do not serialize bitmap to string.
* b. array contains/position/countequal function return nullable in less situations.
* c. cleared old version of Version 2.
*/
inline const int BeExecVersionManager::max_be_exec_version = 3;
inline const int BeExecVersionManager::min_be_exec_version = 0;
Expand Down
33 changes: 22 additions & 11 deletions be/src/agent/cgroup_cpu_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "agent/cgroup_cpu_ctl.h"

#include <fmt/format.h>

namespace doris {

Status CgroupCpuCtl::init() {
Expand Down Expand Up @@ -60,9 +62,7 @@ Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::st
return Status::InternalError("open path failed, path={}", file_path);
}

std::stringstream ss;
ss << value << std::endl;
const std::string& str = ss.str();
auto str = fmt::format("{}\n", value);
int ret = write(fd, str.c_str(), str.size());
if (ret == -1) {
LOG(ERROR) << msg << " write sys file failed";
Expand All @@ -86,14 +86,25 @@ Status CgroupV1CpuCtl::init() {
}
}

// workload group path
_cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id);
if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path;
return Status::InternalError("cgroup v1 mkdir workload group failed, path=",
_cgroup_v1_cpu_tg_path);
}
}

// quota path
_cgroup_v1_cpu_query_quota_path = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us";
_cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + "/cpu.cfs_quota_us";
// task path
_cgroup_v1_cpu_query_task_path = _cgroup_v1_cpu_query_path + "/tasks";
_cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks";
LOG(INFO) << "cgroup v1 cpu path init success"
<< ", query path=" << _cgroup_v1_cpu_query_path
<< ", query quota path=" << _cgroup_v1_cpu_query_quota_path
<< ", query tasks path=" << _cgroup_v1_cpu_query_task_path
<< ", query tg path=" << _cgroup_v1_cpu_tg_path
<< ", query tg quota file path=" << _cgroup_v1_cpu_tg_quota_file
<< ", query tg tasks file path=" << _cgroup_v1_cpu_tg_task_file
<< ", core num=" << _cpu_core_num;
_init_succ = true;
return Status::OK();
Expand All @@ -102,7 +113,7 @@ Status CgroupV1CpuCtl::init() {
Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
std::string msg = "modify cpu quota value to " + std::to_string(val);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_quota_path, val, msg, false);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, msg, false);
}

Status CgroupV1CpuCtl::add_thread_to_cgroup() {
Expand All @@ -112,6 +123,6 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
int tid = static_cast<int>(syscall(SYS_gettid));
std::string msg = "add thread " + std::to_string(tid) + " to group";
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_task_path, tid, msg, true);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
}
} // namespace doris
} // namespace doris
23 changes: 14 additions & 9 deletions be/src/agent/cgroup_cpu_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ namespace doris {

class CgroupCpuCtl {
public:
CgroupCpuCtl() {}
virtual ~CgroupCpuCtl() {}
virtual ~CgroupCpuCtl() = default;
CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }

virtual Status init();

Expand All @@ -50,6 +50,7 @@ class CgroupCpuCtl {
uint64_t _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
uint64_t _tg_id; // workload group id
};

/*
Expand All @@ -66,23 +67,27 @@ class CgroupCpuCtl {
4 doris query path
/sys/fs/cgroup/cpu/{doris_home}/query
5 doris query quota file:
/sys/fs/cgroup/cpu/{doris_home}/query/cpu.cfs_quota_us
5 workload group path
/sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}
6 doris query tasks file:
/sys/fs/cgroup/cpu/{doris_home}/query/tasks
6 workload group quota file:
/sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.cfs_quota_us
7 workload group tasks file:
/sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/tasks
*/
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status add_thread_to_cgroup() override;

private:
// todo(wb) support load/compaction path
std::string _cgroup_v1_cpu_query_path;
std::string _cgroup_v1_cpu_query_quota_path;
std::string _cgroup_v1_cpu_query_task_path;
std::string _cgroup_v1_cpu_tg_path; // workload group path
std::string _cgroup_v1_cpu_tg_quota_file;
std::string _cgroup_v1_cpu_tg_task_file;
};

} // namespace doris
37 changes: 31 additions & 6 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/Status_types.h>
Expand All @@ -29,6 +30,7 @@
// IWYU pragma: no_include <bits/chrono.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <atomic>
#include <chrono> // IWYU pragma: keep
#include <ctime>
#include <functional>
Expand All @@ -48,6 +50,7 @@
#include "gutil/strings/numbers.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_system.h"
Expand Down Expand Up @@ -1202,6 +1205,22 @@ void TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
.tag("s3_conf", s3_conf.to_string());
put_storage_resource(resource.id, {std::move(fs), resource.version});
}
} else if (resource.__isset.hdfs_storage_param) {
Status st;
std::shared_ptr<io::HdfsFileSystem> fs;
if (existed_resource.fs == nullptr) {
st = io::HdfsFileSystem::create(resource.hdfs_storage_param, "", nullptr, &fs);
} else {
fs = std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
}
if (!st.ok()) {
LOG(WARNING) << "update hdfs resource failed: " << st;
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", resource.id)
.tag("resource_name", resource.name);
put_storage_resource(resource.id, {std::move(fs), resource.version});
}
} else {
LOG(WARNING) << "unknown resource=" << resource;
}
Expand Down Expand Up @@ -1597,12 +1616,18 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (tablet != nullptr) {
tablet->publised_count++;
if (tablet->publised_count % 10 == 0) {
static_cast<void>(StorageEngine::instance()->submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true));
LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id
<< ", publised:" << tablet->publised_count;
int64_t published_count =
tablet->published_count.fetch_add(1, std::memory_order_relaxed);
if (published_count % 10 == 0) {
auto st = StorageEngine::instance()->submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
<< ", published=" << published_count << " : " << st;
} else {
LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id
<< ", published:" << published_count;
}
}
} else {
LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id;
Expand Down
Loading

0 comments on commit 411bd70

Please sign in to comment.