Commit b9da63b
Merge branch 'master' into master
koarz authored Nov 20, 2024
2 parents 29255ac + 6898b77 commit b9da63b
Showing 288 changed files with 17,348 additions and 1,951 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/auto-cherry-pick.yml
@@ -59,14 +59,14 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO_NAME: ${{ github.repository }}
CONFLICT_LABEL: cherry-pick-conflict-in-3.0
CONFLICT_LABEL: dev/3.0.x-conflict
run: |
python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-3.0
- name: Auto cherry-pick to branch-2.1
if: ${{ ((github.event.action == 'labeled' && github.event.label.name == 'dev/2.1.x'))|| ((github.event_name == 'pull_request_target' && github.event.action == 'closed') && contains(github.event.pull_request.labels.*.name, 'dev/2.1.x')) }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO_NAME: ${{ github.repository }}
CONFLICT_LABEL: cherry-pick-conflict-in-2.1.x
CONFLICT_LABEL: dev/2.1.x-conflict
run: |
python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-2.1
61 changes: 38 additions & 23 deletions be/CMakeLists.txt
@@ -130,6 +130,8 @@ message(STATUS "THIRDPARTY_DIR is ${THIRDPARTY_DIR}")

option(MAKE_TEST "ON for make unit test or OFF for not" OFF)
message(STATUS "make test: ${MAKE_TEST}")
option(BUILD_BENCHMARK "ON for make google benchmark or OFF for not" OFF)
message(STATUS "make benchmark: ${BUILD_BENCHMARK}")

option(WITH_MYSQL "Support access MySQL" ON)

@@ -568,7 +570,7 @@ if (OS_MACOSX)
)
endif()

if (MAKE_TEST)
if (BUILD_BENCHMARK)
set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
benchmark
@@ -708,6 +710,11 @@ if (MAKE_TEST)
endif()
endif ()

# use this to avoid some runtime tracker. reuse BE_TEST symbol, no need another.
if (BUILD_BENCHMARK)
add_definitions(-DBE_TEST)
endif()

get_directory_property(COMPILER_FLAGS COMPILE_OPTIONS)
get_directory_property(COMPILER_DEFINES COMPILE_DEFINITIONS)
message(STATUS "Compiler: ${CMAKE_CXX_COMPILER_ID}-${CMAKE_CXX_COMPILER_VERSION}")
@@ -754,7 +761,7 @@ add_subdirectory(${SRC_DIR}/http)
add_subdirectory(${SRC_DIR}/io)
add_subdirectory(${SRC_DIR}/olap)
add_subdirectory(${SRC_DIR}/runtime)
add_subdirectory(${SRC_DIR}/service)
add_subdirectory(${SRC_DIR}/service) # this include doris_be
add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/cloud)

@@ -772,36 +779,44 @@ add_subdirectory(${SRC_DIR}/util)
add_subdirectory(${SRC_DIR}/vec)
add_subdirectory(${SRC_DIR}/pipeline)

# this include doris_be_test
if (MAKE_TEST)
add_subdirectory(${TEST_DIR})
endif ()

add_subdirectory(${COMMON_SRC_DIR}/cpp ${BUILD_DIR}/src/common_cpp)

# Install be
install(DIRECTORY DESTINATION ${OUTPUT_DIR})
install(DIRECTORY DESTINATION ${OUTPUT_DIR}/bin)
install(DIRECTORY DESTINATION ${OUTPUT_DIR}/conf)

install(FILES
${BASE_DIR}/../bin/start_be.sh
${BASE_DIR}/../bin/stop_be.sh
${BASE_DIR}/../tools/jeprof
PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE
GROUP_READ GROUP_WRITE GROUP_EXECUTE
WORLD_READ WORLD_EXECUTE
DESTINATION ${OUTPUT_DIR}/bin)

install(FILES
${BASE_DIR}/../conf/be.conf
${BASE_DIR}/../conf/odbcinst.ini
${BASE_DIR}/../conf/asan_suppr.conf
${BASE_DIR}/../conf/lsan_suppr.conf
DESTINATION ${OUTPUT_DIR}/conf)
if(NOT BUILD_BENCHMARK)
# Install be
install(DIRECTORY DESTINATION ${OUTPUT_DIR})
install(DIRECTORY DESTINATION ${OUTPUT_DIR}/bin)
install(DIRECTORY DESTINATION ${OUTPUT_DIR}/conf)

install(FILES
${BASE_DIR}/../bin/start_be.sh
${BASE_DIR}/../bin/stop_be.sh
${BASE_DIR}/../tools/jeprof
PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE
GROUP_READ GROUP_WRITE GROUP_EXECUTE
WORLD_READ WORLD_EXECUTE
DESTINATION ${OUTPUT_DIR}/bin)

install(FILES
${BASE_DIR}/../conf/be.conf
${BASE_DIR}/../conf/odbcinst.ini
${BASE_DIR}/../conf/asan_suppr.conf
${BASE_DIR}/../conf/lsan_suppr.conf
DESTINATION ${OUTPUT_DIR}/conf)
endif()

get_property(dirs DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES)
foreach(dir ${dirs})
message(STATUS "dir='${dir}'")
endforeach()


if (BUILD_BENCHMARK)
add_executable(benchmark_test ${BASE_DIR}/benchmark/benchmark_main.cpp)
target_link_libraries(benchmark_test ${DORIS_LINK_LIBS})
message(STATUS "Add benchmark to build")
install(TARGETS benchmark_test DESTINATION ${OUTPUT_DIR}/lib)
endif()
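
Note on the BUILD_BENCHMARK hunks above: rather than introducing a new symbol, the commit reuses the existing BE_TEST define for benchmark builds so that runtime trackers are compiled out. Purely as an illustrative sketch of the kind of guard that relies on — the type and function names below are invented for this note and are not part of the commit:

// Hypothetical illustration of a BE_TEST-guarded code path; names are invented
// for this sketch and do not appear in the commit.
#include <memory>

struct QueryMemTracker {
    // stand-in for a real runtime tracker type
};

std::shared_ptr<QueryMemTracker> maybe_create_tracker() {
#ifdef BE_TEST
    // Unit-test and benchmark builds define BE_TEST, so the tracker is skipped
    // and benchmark timings are not skewed by tracker bookkeeping.
    return nullptr;
#else
    return std::make_shared<QueryMemTracker>();
#endif
}

With -DBUILD_BENCHMARK=ON, the add_definitions(-DBE_TEST) added above makes branches of this general shape take the test path throughout the BE sources.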
52 changes: 52 additions & 0 deletions be/benchmark/benchmark_main.cpp
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <benchmark/benchmark.h>

#include <string>

#include "vec/columns/column_string.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_string.h"

namespace doris::vectorized { // change if need

static void Example1(benchmark::State& state) {
// init. dont time it.
state.PauseTiming();
Block block;
DataTypePtr str_type = std::make_shared<DataTypeString>();
std::vector<std::string> vals {100, "content"};
state.ResumeTiming();

// do test
for (auto _ : state) {
auto str_col = ColumnString::create();
for (auto& v : vals) {
str_col->insert_data(v.data(), v.size());
}
block.insert({std::move(str_col), str_type, "col"});
benchmark::DoNotOptimize(block); // mark the watched target
}
}
// could BENCHMARK many functions to compare them together.
BENCHMARK(Example1);

} // namespace doris::vectorized

BENCHMARK_MAIN();
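
As the trailing comment in benchmark_main.cpp notes, several functions can be registered with BENCHMARK and compared in a single run. A minimal sketch of a companion case — Example2 is hypothetical and not part of this commit — that fills the same column without inserting it into a Block, so the two timings can be contrasted:

// Would sit next to Example1 inside namespace doris::vectorized in benchmark_main.cpp.
// Hypothetical companion benchmark, not part of this commit.
static void Example2(benchmark::State& state) {
    state.PauseTiming();
    std::vector<std::string> vals(100, "content"); // same data as Example1
    state.ResumeTiming();

    for (auto _ : state) {
        auto str_col = ColumnString::create();
        for (auto& v : vals) {
            str_col->insert_data(v.data(), v.size());
        }
        benchmark::DoNotOptimize(str_col); // keep the column live so the loop is not optimized away
    }
}
BENCHMARK(Example2);

Running the resulting benchmark_test binary (installed under lib/ when BUILD_BENCHMARK is ON, per the CMake hunk above) reports one row per registered benchmark, which makes the relative cost of the Block insertion in Example1 visible.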
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
@@ -363,7 +363,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
// If there are historical versions of rowsets, we need to recalculate their delete
// bitmaps, otherwise we will miss the delete bitmaps of incremental rowsets
int64_t start_calc_delete_bitmap_version =
already_exist_any_version ? 0 : sc_job->alter_version() + 1;
// [0-1] is a placeholder rowset, start from 2.
already_exist_any_version ? 2 : sc_job->alter_version() + 1;
RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(),
start_calc_delete_bitmap_version, initiator));
sc_job->set_delete_bitmap_lock_initiator(initiator);
25 changes: 23 additions & 2 deletions be/src/common/config.cpp
@@ -63,8 +63,29 @@ DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_sql_port, "-1");

DEFINE_mString(public_access_ip, "");
DEFINE_Int32(public_access_port, "-1");
// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
// 1. in production environment, it is often inconvenient to expose Doris BE nodes to the external network.
// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be
// randomly routed to a Doris BE node when connecting to Nginx. set public_host to the host of Nginx.
// 2. if priority_networks is an internal network IP, and BE node has its own independent external IP,
// but Doris currently does not support modifying priority_networks, setting public_host to the real external IP.
DEFINE_mString(public_host, "");

// If the BE node is connected to the external network through a reverse proxy like Nginx
// and need to use Arrow Flight SQL, should add a server in Nginx to reverse proxy
// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example:
// upstream arrowflight {
// server 10.16.10.8:8069;
// server 10.16.10.8:8068;
//}
// server {
// listen 8167 http2;
// listen [::]:8167 http2;
// server_name doris.arrowflight.com;
// }
DEFINE_Int32(arrow_flight_sql_proxy_port, "-1");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
28 changes: 23 additions & 5 deletions be/src/common/config.h
@@ -100,11 +100,29 @@ DECLARE_Int32(brpc_port);
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// If priority_networks is incorrect but cannot be modified, set public_access_ip as BE’s real IP.
// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result.
// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip.
DECLARE_mString(public_access_ip);
DECLARE_Int32(public_access_port);
// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
// 1. in production environment, it is often inconvenient to expose Doris BE nodes to the external network.
// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be
// randomly routed to a Doris BE node when connecting to Nginx. set public_host to the host of Nginx.
// 2. if priority_networks is an internal network IP, and BE node has its own independent external IP,
// but Doris currently does not support modifying priority_networks, setting public_host to the real external IP.
DECLARE_mString(public_host);

// If the BE node is connected to the external network through a reverse proxy like Nginx
// and need to use Arrow Flight SQL, should add a server in Nginx to reverse proxy
// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example:
// upstream arrowflight {
// server 10.16.10.8:8069;
// server 10.16.10.8:8068;
//}
// server {
// listen 8167 http2;
// listen [::]:8167 http2;
// server_name doris.arrowflight.com;
// }
DECLARE_Int32(arrow_flight_sql_proxy_port);

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
4 changes: 2 additions & 2 deletions be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
@@ -39,8 +39,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadGroupsScanner::_s_tbls_colu
{"SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"MAX_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"MEMORY_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"MEMORY_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"TAG", TYPE_VARCHAR, sizeof(StringRef), true},
{"READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
{"REMOTE_READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
25 changes: 13 additions & 12 deletions be/src/exprs/runtime_filter.cpp
@@ -975,8 +975,8 @@ class RuntimePredicateWrapper {
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly, bool need_local_merge) {
*res = std::make_shared<IRuntimeFilter>(state, desc, need_local_merge);
bool build_bf_exactly) {
*res = std::make_shared<IRuntimeFilter>(state, desc);
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
}
@@ -1326,10 +1326,10 @@ bool IRuntimeFilter::get_ignored() {

std::string IRuntimeFilter::formatted_state() const {
return fmt::format(
"[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
"[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}, Ignored = {}]",
_is_push_down, _get_explain_state_string(), _has_remote_target, _has_local_target,
_wrapper->_context->ignored);
_filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target,
_has_local_target, _wrapper->_context->ignored);
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
@@ -1355,18 +1355,19 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
params.runtime_bloom_filter_max_size = options->__isset.runtime_bloom_filter_max_size
? options->runtime_bloom_filter_max_size
: 0;
// We build runtime filter by exact distinct count iff three conditions are met:
auto sync_filter_size = desc->__isset.sync_filter_size && desc->sync_filter_size;
// We build runtime filter by exact distinct count if all of 3 conditions are met:
// 1. Only 1 join key
// 2. Do not have remote target (e.g. do not need to merge), or broadcast join
// 3. Bloom filter
// 2. Bloom filter
// 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join).
params.build_bf_exactly =
build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER ||
_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);

params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv;

if (!desc->__isset.sync_filter_size || !desc->sync_filter_size) {
params.build_bf_exactly &= (!_has_remote_target || _is_broadcast_join);
if (!sync_filter_size) {
params.build_bf_exactly &= !_is_broadcast_join;
}

if (desc->__isset.bloom_filter_size_bytes) {
@@ -1523,9 +1524,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {

std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, "
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, "
"build_bf_cardinality: {}, error_msg: {}",
_filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join,
_filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
_wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg);
}

15 changes: 5 additions & 10 deletions be/src/exprs/runtime_filter.h
@@ -192,8 +192,7 @@ enum RuntimeFilterState {
/// that can be pushed down to node based on the results of the right table.
class IRuntimeFilter {
public:
IRuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
bool need_local_merge = false)
IRuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc)
: _state(state),
_filter_id(desc->filter_id),
_is_broadcast_join(true),
@@ -206,17 +205,16 @@ class IRuntimeFilter {
_wait_infinitely(_state->get_query_ctx()->runtime_filter_wait_infinitely()),
_rf_wait_time_ms(_state->get_query_ctx()->runtime_filter_wait_time_ms()),
_runtime_filter_type(get_runtime_filter_type(desc)),
_profile(
new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})",
_filter_id, to_string(_runtime_filter_type)))),
_need_local_merge(need_local_merge) {}
_profile(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})",
_filter_id,
to_string(_runtime_filter_type)))) {}

~IRuntimeFilter() = default;

static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly = false, bool need_local_merge = false);
bool build_bf_exactly = false);

RuntimeFilterContextSPtr& get_shared_context_ref();

@@ -417,9 +415,6 @@ class IRuntimeFilter {
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
// `_need_local_merge` indicates whether this runtime filter is global on this BE.
// All runtime filters should be merged on each BE before push_to_remote or publish.
bool _need_local_merge = false;

std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;

4 changes: 4 additions & 0 deletions be/src/olap/base_tablet.cpp
@@ -1566,6 +1566,10 @@ Status BaseTablet::check_rowid_conversion(
VLOG_DEBUG << "check_rowid_conversion, location_map is empty";
return Status::OK();
}
if (!tablet_schema()->cluster_key_idxes().empty()) {
VLOG_DEBUG << "skip check_rowid_conversion for mow tables with cluster keys";
return Status::OK();
}
std::vector<segment_v2::SegmentSharedPtr> dst_segments;

RETURN_IF_ERROR(
(Remaining changed files of this commit are not shown here.)
