Skip to content

Commit

Permalink
Merge branch 'master' into 20241111_fix_mutl_sink
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Nov 18, 2024
2 parents 9a56d4f + c050f54 commit 89ad4b1
Show file tree
Hide file tree
Showing 209 changed files with 3,588 additions and 1,527 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/auto-cherry-pick.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ on:
pull_request_target:
types:
- closed
- labeled
branches:
- master
permissions:
Expand All @@ -30,7 +31,7 @@ permissions:
jobs:
auto_cherry_pick:
runs-on: ubuntu-latest
if: ${{ (contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') || contains(github.event.pull_request.labels.*.name, 'dev/2.1.x')) && github.event.pull_request.merged == true }}
if: ${{(contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') || contains(github.event.pull_request.labels.*.name, 'dev/2.1.x') ||github.event.label.name == 'dev/3.0.x' || github.event.label.name == 'dev/2.1.x') && github.event.pull_request.merged == true }}
steps:
- name: Checkout repository
uses: actions/checkout@v3
Expand All @@ -54,15 +55,15 @@ jobs:
echo "SHA matches: $calculated_sha"
fi
- name: Auto cherry-pick to branch-3.0
if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') }}
if: ${{ ((github.event.action == 'labeled' && github.event.label.name == 'dev/3.0.x'))|| ((github.event_name == 'pull_request_target' && github.event.action == 'closed') && contains(github.event.pull_request.labels.*.name, 'dev/3.0.x')) }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO_NAME: ${{ github.repository }}
CONFLICT_LABEL: cherry-pick-conflict-in-3.0
run: |
python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-3.0
- name: Auto cherry-pick to branch-2.1
if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/2.1.x') }}
if: ${{ ((github.event.action == 'labeled' && github.event.label.name == 'dev/2.1.x'))|| ((github.event_name == 'pull_request_target' && github.event.action == 'closed') && contains(github.event.pull_request.labels.*.name, 'dev/2.1.x')) }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO_NAME: ${{ github.repository }}
Expand Down
1 change: 1 addition & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ class [[nodiscard]] Status {
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR)
ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED)
ERROR_CTOR_NOSTACK(NetworkError, NETWORK_ERROR)
#undef ERROR_CTOR

template <int code>
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/lzo_decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in
ptr = get_uint32(ptr, &uncompressed_size);
left_input_len -= sizeof(uint32_t);
if (uncompressed_size == 0) {
*input_bytes_read += sizeof(uint32_t);
*stream_end = true;
return Status::OK();
}
Expand Down
24 changes: 24 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/tablet_info.h"

#include <butil/logging.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Partitions_types.h>
Expand Down Expand Up @@ -180,6 +181,17 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
auto it = slots_map.find(to_lower(pcolumn_desc.name()) + "+" + data_type_str +
is_null_str);
if (it == std::end(slots_map)) {
std::string keys {};
for (const auto& [key, _] : slots_map) {
keys += fmt::format("{},", key);
}
LOG_EVERY_SECOND(WARNING) << fmt::format(
"[OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema)]: "
"unknown index column, column={}, type={}, data_type_str={}, "
"is_null_str={}, slots_map.keys()=[{}], {}\npschema={}",
pcolumn_desc.name(), pcolumn_desc.type(), data_type_str, is_null_str,
keys, debug_string(), pschema.ShortDebugString());

return Status::InternalError("unknown index column, column={}, type={}",
pcolumn_desc.name(), pcolumn_desc.type());
}
Expand Down Expand Up @@ -286,6 +298,18 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
auto it = slots_map.find(to_lower(tcolumn_desc.column_name) + "+" + data_type_str +
is_null_str);
if (it == slots_map.end()) {
std::stringstream ss;
ss << tschema;
std::string keys {};
for (const auto& [key, _] : slots_map) {
keys += fmt::format("{},", key);
}
LOG_EVERY_SECOND(WARNING) << fmt::format(
"[OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema)]: "
"unknown index column, column={}, type={}, data_type_str={}, "
"is_null_str={}, slots_map.keys()=[{}], {}\ntschema={}",
tcolumn_desc.column_name, tcolumn_desc.column_type.type, data_type_str,
is_null_str, keys, debug_string(), ss.str());
return Status::InternalError("unknown index column, column={}, type={}",
tcolumn_desc.column_name,
tcolumn_desc.column_type.type);
Expand Down
38 changes: 0 additions & 38 deletions be/src/exprs/bitmapfilter_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ namespace doris {
// only used in Runtime Filter
class BitmapFilterFuncBase : public RuntimeFilterFuncBase {
public:
virtual void insert(const void* data) = 0;
virtual void insert_many(const std::vector<const BitmapValue*>& bitmaps) = 0;
virtual bool empty() = 0;
virtual Status assign(BitmapValue* bitmap_value) = 0;
virtual void light_copy(BitmapFilterFuncBase* other) { _not_in = other->_not_in; }
virtual uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap,
uint16_t* offsets, int number) = 0;
virtual void find_batch(const char* data, const uint8* nullmap, size_t number,
Expand All @@ -58,8 +54,6 @@ class BitmapFilterFunc : public BitmapFilterFuncBase {

~BitmapFilterFunc() override = default;

void insert(const void* data) override;

void insert_many(const std::vector<const BitmapValue*>& bitmaps) override;

uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, uint16_t* offsets,
Expand All @@ -68,45 +62,21 @@ class BitmapFilterFunc : public BitmapFilterFuncBase {
void find_batch(const char* data, const uint8* nullmap, size_t number,
uint8* results) const override;

bool empty() override { return _bitmap_value->empty(); }

Status assign(BitmapValue* bitmap_value) override {
*_bitmap_value = *bitmap_value;
return Status::OK();
}

void light_copy(BitmapFilterFuncBase* bitmapfilter_func) override;

size_t size() const override { return _bitmap_value->cardinality(); }

uint64_t max() { return _bitmap_value->max(nullptr); }

uint64_t min() { return _bitmap_value->min(nullptr); }

bool contains_any(CppType left, CppType right) {
if (right < 0) {
return false;
}
return _bitmap_value->contains_any(std::max(left, (CppType)0), right);
}

std::shared_ptr<BitmapValue> get_inner_bitmap() { return _bitmap_value; }

private:
std::shared_ptr<BitmapValue> _bitmap_value;

bool find(CppType data) const { return _not_in ^ (data >= 0 && _bitmap_value->contains(data)); }
};

// Merges one bitmap into this filter.
// `data` is interpreted as a pointer to a BitmapValue and OR-ed into the
// filter's own bitmap; a null pointer is ignored.
template <PrimitiveType type>
void BitmapFilterFunc<type>::insert(const void* data) {
    if (data != nullptr) {
        const auto* incoming = reinterpret_cast<const BitmapValue*>(data);
        *_bitmap_value |= *incoming;
    }
}

template <PrimitiveType type>
void BitmapFilterFunc<type>::insert_many(const std::vector<const BitmapValue*>& bitmaps) {
if (bitmaps.empty()) {
Expand Down Expand Up @@ -147,12 +117,4 @@ void BitmapFilterFunc<type>::find_batch(const char* data, const uint8* nullmap,
}
}

// Makes this filter a lightweight copy of `bitmapfilter_func`: the NOT-IN flag
// is copied by the base implementation, the bitmap is shared (the shared_ptr is
// copied, not the bitmap contents), and the filter id is taken from the source.
// NOTE(review): assumes `bitmapfilter_func` really points to a BitmapFilterFunc
// of the same PrimitiveType; nothing here checks that — confirm against callers.
template <PrimitiveType type>
void BitmapFilterFunc<type>::light_copy(BitmapFilterFuncBase* bitmapfilter_func) {
    BitmapFilterFuncBase::light_copy(bitmapfilter_func);
    // Base-to-derived downcast: static_cast is the correct named cast for a known
    // class hierarchy and applies any pointer adjustment, unlike reinterpret_cast.
    auto* other_func = static_cast<BitmapFilterFunc*>(bitmapfilter_func);
    _bitmap_value = other_func->_bitmap_value;
    set_filter_id(bitmapfilter_func->get_filter_id());
}

} // namespace doris
24 changes: 10 additions & 14 deletions be/src/exprs/create_predicate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include "common/exception.h"
#include "common/status.h"
#include "exprs/hybrid_set.h"
#include "exprs/minmax_predicate.h"
#include "function_filter.h"
Expand Down Expand Up @@ -244,12 +246,9 @@ ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
int be_exec_version, const TabletColumn*) {
if constexpr (PT == TYPE_TINYINT || PT == TYPE_SMALLINT || PT == TYPE_INT ||
PT == TYPE_BIGINT) {
std::shared_ptr<BitmapFilterFuncBase> filter_olap;
filter_olap.reset(create_bitmap_filter(PT));
filter_olap->light_copy(filter.get());
return new BitmapFilterColumnPredicate<PT>(column_id, filter, be_exec_version);
} else {
return nullptr;
throw Exception(ErrorCode::INTERNAL_ERROR, "bitmap filter do not support type {}", PT);
}
}

Expand All @@ -266,17 +265,14 @@ ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
const std::shared_ptr<FunctionFilter>& filter, int,
const TabletColumn* column = nullptr) {
// currently only the LIKE predicate is supported
if constexpr (PT == TYPE_CHAR || PT == TYPE_VARCHAR || PT == TYPE_STRING) {
if constexpr (PT == TYPE_CHAR) {
return new LikeColumnPredicate<TYPE_CHAR>(filter->_opposite, column_id, filter->_fn_ctx,
filter->_string_param);
} else {
return new LikeColumnPredicate<TYPE_STRING>(filter->_opposite, column_id,
filter->_fn_ctx, filter->_string_param);
}
} else {
return nullptr;
if constexpr (PT == TYPE_CHAR) {
return new LikeColumnPredicate<TYPE_CHAR>(filter->_opposite, column_id, filter->_fn_ctx,
filter->_string_param);
} else if constexpr (PT == TYPE_VARCHAR || PT == TYPE_STRING) {
return new LikeColumnPredicate<TYPE_STRING>(filter->_opposite, column_id, filter->_fn_ctx,
filter->_string_param);
}
throw Exception(ErrorCode::INTERNAL_ERROR, "function filter do not support type {}", PT);
}

template <typename T>
Expand Down
23 changes: 15 additions & 8 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,14 +990,14 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta
_wrapper->insert_batch(column, start);
}

Status IRuntimeFilter::publish(bool publish_local) {
Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) {
DCHECK(is_producer());

auto send_to_remote = [&](IRuntimeFilter* filter) {
TNetworkAddress addr;
DCHECK(_state != nullptr);
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
return filter->push_to_remote(&addr);
return filter->push_to_remote(state, &addr);
};
auto send_to_local = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper) {
std::vector<std::shared_ptr<IRuntimeFilter>> filters;
Expand Down Expand Up @@ -1088,8 +1088,10 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<pipeline::Dependency> dependency,
RuntimeFilterContextSPtr rf_context)
: Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {}
RuntimeFilterContextSPtr rf_context, std::weak_ptr<QueryContext> context)
: Base(req, callback, context),
_dependency(std::move(dependency)),
_rf_context(rf_context) {}
};

Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) {
Expand Down Expand Up @@ -1133,8 +1135,10 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
// IRuntimeFilter may be destructed before the RPC finishes, so the closure must not
// hold a raw pointer to it. It has to use the context's shared ptr instead.
auto closure =
SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context);
auto closure = SyncSizeClosure::create_unique(
request, callback, _dependency, _wrapper->_context,
state->query_options().ignore_runtime_filter_error ? std::weak_ptr<QueryContext> {}
: state->get_query_ctx_weak());
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
Expand All @@ -1157,7 +1161,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
return Status::OK();
}

Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) {
DCHECK(is_producer());
std::shared_ptr<PBackendService_Stub> stub(
_state->exec_env->brpc_internal_client_cache()->get_client(*addr));
Expand All @@ -1170,7 +1174,10 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
auto merge_filter_callback = DummyBrpcCallback<PMergeFilterResponse>::create_shared();
auto merge_filter_closure =
AutoReleaseClosure<PMergeFilterRequest, DummyBrpcCallback<PMergeFilterResponse>>::
create_unique(merge_filter_request, merge_filter_callback);
create_unique(merge_filter_request, merge_filter_callback,
state->query_options().ignore_runtime_filter_error
? std::weak_ptr<QueryContext> {}
: state->get_query_ctx_weak());
void* data = nullptr;
int len = 0;

Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class IRuntimeFilter {

// publish filter
// push the filter to the remote node, or push it down to the scan_node
Status publish(bool publish_local = false);
Status publish(RuntimeState* state, bool publish_local = false);

Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);

Expand Down Expand Up @@ -293,7 +293,7 @@ class IRuntimeFilter {
bool need_sync_filter_size();

// async push runtimefilter to remote node
Status push_to_remote(const TNetworkAddress* addr);
Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);

void init_profile(RuntimeProfile* parent_profile);

Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ class VRuntimeFilterSlots {
}

// publish runtime filter
Status publish(bool publish_local) {
Status publish(RuntimeState* state, bool publish_local) {
for (auto& pair : _runtime_filters_map) {
for (auto& filter : pair.second) {
RETURN_IF_ERROR(filter->publish(publish_local));
RETURN_IF_ERROR(filter->publish(state, publish_local));
}
}
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter_slots_cross.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ class VRuntimeFilterSlotsCross {
return Status::OK();
}

Status publish() {
Status publish(RuntimeState* state) {
for (auto filter : _runtime_filters) {
RETURN_IF_ERROR(filter->publish());
RETURN_IF_ERROR(filter->publish(state));
}
return Status::OK();
}
Expand Down
22 changes: 18 additions & 4 deletions be/src/http/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "http/http_headers.h"
#include "http/http_status.h"
#include "runtime/exec_env.h"
#include "util/security.h"
#include "util/stack_util.h"

namespace doris {
Expand Down Expand Up @@ -205,9 +206,11 @@ Status HttpClient::execute(const std::function<bool(const void* data, size_t len
_callback = &callback;
auto code = curl_easy_perform(_curl);
if (code != CURLE_OK) {
std::string url = mask_token(_get_url());
LOG(WARNING) << "fail to execute HTTP client, errmsg=" << _to_errmsg(code)
<< ", trace=" << get_stack_trace();
return Status::HttpError(_to_errmsg(code));
<< ", trace=" << get_stack_trace() << ", url=" << url;
std::string errmsg = fmt::format("{}, url={}", _to_errmsg(code), url);
return Status::HttpError(std::move(errmsg));
}
return Status::OK();
}
Expand Down Expand Up @@ -275,13 +278,22 @@ Status HttpClient::execute(std::string* response) {
return execute(callback);
}

const char* HttpClient::_to_errmsg(CURLcode code) {
// Picks the error text to report for a failed curl call.
// If `_error_buf` holds a non-empty string it is returned as-is; otherwise the
// generic description from curl_easy_strerror(code) is used.
// NOTE(review): `_error_buf` is presumably the buffer registered with
// CURLOPT_ERRORBUFFER — confirm where it is set up.
const char* HttpClient::_to_errmsg(CURLcode code) const {
    return _error_buf[0] != 0 ? _error_buf : curl_easy_strerror(code);
}

const char* HttpClient::_get_url() const {
const char* url = nullptr;
curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &url);
if (!url) {
url = "<unknown>";
}
return url;
}

Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
const std::function<Status(HttpClient*)>& callback) {
Status status;
Expand All @@ -293,7 +305,9 @@ Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
if (http_status == 200) {
return status;
} else {
auto error_msg = fmt::format("http status code is not 200, code={}", http_status);
std::string url = mask_token(client._get_url());
auto error_msg = fmt::format("http status code is not 200, code={}, url={}",
http_status, url);
LOG(WARNING) << error_msg;
return Status::HttpError(error_msg);
}
Expand Down
Loading

0 comments on commit 89ad4b1

Please sign in to comment.