Skip to content

Commit

Permalink
Merge branch 'master' into 20240730_fix_arrow
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Aug 15, 2024
2 parents 73ce62d + a55b61c commit 8a448e6
Show file tree
Hide file tree
Showing 63 changed files with 1,112 additions and 188 deletions.
33 changes: 31 additions & 2 deletions .github/workflows/comment-to-trigger-teamcity.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ jobs:
check-comment-if-need-to-trigger-teamcity:

# This job only runs for pull request comments whose body contains 'run' or 'skip buildall'
if: ${{ github.event.issue.pull_request && contains(github.event.comment.body, 'run') }}
if: ${{ github.event.issue.pull_request && (contains(github.event.comment.body, 'run') || contains(github.event.comment.body, 'skip buildall')) }}

runs-on: ubuntu-latest
env:
COMMENT_BODY: ${{ github.event.comment.body }}
COMMENT_USER_ID: ${{ github.event.comment.user.id }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

steps:
Expand All @@ -50,8 +51,20 @@ jobs:
"${COMMENT_BODY}" == *'run arm'* ||
"${COMMENT_BODY}" == *'run performance'* ]]; then
echo "comment_trigger=true" | tee -a "$GITHUB_OUTPUT"
echo "comment_skip=false" | tee -a "$GITHUB_OUTPUT"
elif [[ "${COMMENT_BODY}" == *'skip buildall'* ]]; then
# Only the allow-listed GitHub user ids below may skip the buildall pipelines.
if [[ "${COMMENT_USER_ID}" == '27881198' ||
      "${COMMENT_USER_ID}" == '37901441' ]]; then
    echo "comment_trigger=false" | tee -a "$GITHUB_OUTPUT"
    echo "comment_skip=true" | tee -a "$GITHUB_OUTPUT"
    echo "COMMENT_USER_ID ${COMMENT_USER_ID} is allowed to skip buildall."
else
    # Publish both outputs before leaving, exactly like the "no keyword" branch does:
    # later steps evaluate fromJSON(steps.parse.outputs.comment_trigger) and
    # fromJSON(steps.parse.outputs.comment_skip), which fail on an unset (empty) output.
    echo "comment_trigger=false" | tee -a "$GITHUB_OUTPUT"
    echo "comment_skip=false" | tee -a "$GITHUB_OUTPUT"
    echo "COMMENT_USER_ID ${COMMENT_USER_ID} is not allowed to skip buildall."
    exit
fi
else
echo "comment_trigger=false" | tee -a "$GITHUB_OUTPUT"
echo "comment_skip=false" | tee -a "$GITHUB_OUTPUT"
echo "find no keyword in comment body, skip this action."
exit
fi
Expand All @@ -71,7 +84,7 @@ jobs:
echo "COMMENT_REPEAT_TIMES=${COMMENT_REPEAT_TIMES}" | tee -a "$GITHUB_OUTPUT"
- name: "Checkout master"
if: ${{ fromJSON(steps.parse.outputs.comment_trigger) }}
if: ${{ fromJSON(steps.parse.outputs.comment_trigger) || fromJSON(steps.parse.outputs.comment_skip) }}
uses: actions/checkout@v4

- name: "Check if pr need run build"
Expand Down Expand Up @@ -364,3 +377,19 @@ jobs:
"performance" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
fi
- name: "Skip buildall"
if: ${{ fromJSON(steps.parse.outputs.comment_skip) }}
run: |
source ./regression-test/pipeline/common/teamcity-utils.sh
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" feut
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" beut
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" compile
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" p0
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" p1
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" external
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" performance
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" arm
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" cloud_p0
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" cloud_p1
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" cloudut
4 changes: 3 additions & 1 deletion .github/workflows/labeler/scope-label-conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ meta-change:
- gensrc/proto/*

doing:
- '**'
- base-branch: 'master'
- changed-files:
- any-glob-to-any-file: '**'
12 changes: 11 additions & 1 deletion be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ CloudWarmUpManager::~CloudWarmUpManager() {
}
}

// Collects the rowset metas of `tablet` into a map<rowset_id_str, rowset_meta_sptr>.
// The rowsets are obtained through tablet->traverse_rowsets() with include_stale = false.
//
// Declared static because this helper is private to this translation unit; a helper with
// the same name and signature is also defined in block_file_cache_downloader.cpp, and
// internal linkage keeps the two definitions from ever clashing at link time.
static std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) {
    std::unordered_map<std::string, RowsetMetaSharedPtr> id_to_rowset_meta_map;
    auto visitor = [&id_to_rowset_meta_map](const RowsetSharedPtr& r) {
        // Fetch the meta once and reuse it for both the key and the mapped value.
        const auto& rs_meta = r->rowset_meta();
        id_to_rowset_meta_map.emplace(rs_meta->rowset_id().to_string(), rs_meta);
    };
    constexpr bool include_stale = false;
    tablet->traverse_rowsets(visitor, include_stale);
    return id_to_rowset_meta_map;
}

void CloudWarmUpManager::handle_jobs() {
#ifndef BE_TEST
constexpr int WAIT_TIME_SECONDS = 600;
Expand Down Expand Up @@ -78,7 +88,7 @@ void CloudWarmUpManager::handle_jobs() {
std::shared_ptr<bthread::CountdownEvent> wait =
std::make_shared<bthread::CountdownEvent>(0);
auto tablet_meta = tablet->tablet_meta();
auto rs_metas = tablet_meta->snapshot_rs_metas();
auto rs_metas = snapshot_rs_metas(tablet.get());
for (auto& [_, rs] : rs_metas) {
for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
auto storage_resource = rs->remote_storage_resource();
Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
}

_bloom_filter_alloced = data_size;
_inited = true;
return _bloom_filter->init(data, data_size);
}

Expand Down
12 changes: 11 additions & 1 deletion be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ void FileCacheBlockDownloader::check_download_task(const std::vector<int64_t>& t
}
}

// Builds a map<rowset_id_str, rowset_meta_sptr> for `tablet` by visiting its rowsets via
// tablet->traverse_rowsets(), passing include_stale = false.
//
// Declared static: the helper is only meant for this .cpp file, and cloud_warm_up_manager.cpp
// defines a helper with the same name and signature. Internal linkage guarantees the two
// definitions cannot collide at link time.
static std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) {
    std::unordered_map<std::string, RowsetMetaSharedPtr> id_to_rowset_meta_map;
    auto visitor = [&id_to_rowset_meta_map](const RowsetSharedPtr& r) {
        // Look the meta up once; it supplies both the key and the stored pointer.
        const auto& rs_meta = r->rowset_meta();
        id_to_rowset_meta_map.emplace(rs_meta->rowset_id().to_string(), rs_meta);
    };
    constexpr bool include_stale = false;
    tablet->traverse_rowsets(visitor, include_stale);
    return id_to_rowset_meta_map;
}

void FileCacheBlockDownloader::download_file_cache_block(
const DownloadTask::FileCacheBlockMetaVec& metas) {
std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
Expand All @@ -141,7 +151,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
tablet = std::move(res).value();
}

auto id_to_rowset_meta_map = tablet->tablet_meta()->snapshot_rs_metas();
auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
if (find_it == id_to_rowset_meta_map.end()) {
return;
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,11 @@ Status BloomFilterIndexWriter::create(const BloomFilterOptions& bf_options,
M(FieldType::OLAP_FIELD_TYPE_DECIMAL64)
M(FieldType::OLAP_FIELD_TYPE_DECIMAL128I)
M(FieldType::OLAP_FIELD_TYPE_DECIMAL256)
M(FieldType::OLAP_FIELD_TYPE_IPV4)
M(FieldType::OLAP_FIELD_TYPE_IPV6)
#undef M
default:
return Status::NotSupported("unsupported type for bitmap index: {}",
return Status::NotSupported("unsupported type for bloom filter index: {}",
std::to_string(int(type)));
}
return Status::OK();
Expand Down
12 changes: 0 additions & 12 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,6 @@ class TabletMeta {
void revise_delete_bitmap_unlocked(const DeleteBitmap& delete_bitmap);

const std::vector<RowsetMetaSharedPtr>& all_stale_rs_metas() const;
// return the snapshot of rowset_meta
// the return value is map<rowset_id_str, rowset_meta_sptr>
std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas() const;
RowsetMetaSharedPtr acquire_rs_meta_by_version(const Version& version) const;
void delete_stale_rs_meta_by_version(const Version& version);
RowsetMetaSharedPtr acquire_stale_rs_meta_by_version(const Version& version) const;
Expand Down Expand Up @@ -698,15 +695,6 @@ inline bool TabletMeta::all_beta() const {
return true;
}

inline std::unordered_map<std::string, RowsetMetaSharedPtr> TabletMeta::snapshot_rs_metas() const {
std::unordered_map<std::string, RowsetMetaSharedPtr> id_to_rowset_meta_map;
std::shared_lock rlock(_meta_lock);
std::for_each(_rs_metas.cbegin(), _rs_metas.cend(), [&](const auto& rowset_meta) {
id_to_rowset_meta_map.emplace(rowset_meta->rowset_id().to_string(), rowset_meta);
});
return id_to_rowset_meta_map;
}

std::string tablet_state_name(TabletState state);

// Only for unit test now.
Expand Down
5 changes: 3 additions & 2 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2063,10 +2063,11 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont
st = Status::Error(ErrorCode::INTERNAL_ERROR,
"_exec_plan_fragment_impl meet unknown error");
}
closure_guard.release();
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
LOG(WARNING) << "exec plan fragment failed, load_id=" << print_id(load_id)
<< ", errmsg=" << st;
} else {
closure_guard.release();
for (int i = 0; i < request->data().size(); ++i) {
std::unique_ptr<PDataRow> row(new PDataRow());
row->CopyFrom(request->data(i));
Expand Down
18 changes: 12 additions & 6 deletions be/src/vec/functions/function_regexp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ struct RegexpReplaceOneImpl {
}
};

template <bool ReturnNull>
struct RegexpExtractImpl {
static constexpr auto name = "regexp_extract";
static constexpr auto name = ReturnNull ? "regexp_extract_or_null" : "regexp_extract";
// 3 args
static void execute_impl(FunctionContext* context, ColumnPtr argument_columns[],
size_t input_rows_count, ColumnString::Chars& result_data,
Expand All @@ -201,7 +202,8 @@ struct RegexpExtractImpl {
}
const auto& index_data = index_col->get_int(i);
if (index_data < 0) {
StringOP::push_empty_string(i, result_data, result_offset);
ReturnNull ? StringOP::push_null_string(i, result_data, result_offset, null_map)
: StringOP::push_empty_string(i, result_data, result_offset);
continue;
}
_execute_inner_loop<false>(context, str_col, pattern_col, index_data, result_data,
Expand All @@ -220,7 +222,8 @@ struct RegexpExtractImpl {
const auto& index_data = index_col->get_int(0);
if (index_data < 0) {
for (size_t i = 0; i < input_rows_count; ++i) {
StringOP::push_empty_string(i, result_data, result_offset);
ReturnNull ? StringOP::push_null_string(i, result_data, result_offset, null_map)
: StringOP::push_empty_string(i, result_data, result_offset);
}
return;
}
Expand Down Expand Up @@ -260,15 +263,17 @@ struct RegexpExtractImpl {

int max_matches = 1 + re->NumberOfCapturingGroups();
if (index_data >= max_matches) {
StringOP::push_empty_string(index_now, result_data, result_offset);
ReturnNull ? StringOP::push_null_string(index_now, result_data, result_offset, null_map)
: StringOP::push_empty_string(index_now, result_data, result_offset);
return;
}

std::vector<re2::StringPiece> matches(max_matches);
bool success =
re->Match(str_sp, 0, str.size, re2::RE2::UNANCHORED, &matches[0], max_matches);
if (!success) {
StringOP::push_empty_string(index_now, result_data, result_offset);
ReturnNull ? StringOP::push_null_string(index_now, result_data, result_offset, null_map)
: StringOP::push_empty_string(index_now, result_data, result_offset);
return;
}
const re2::StringPiece& match = matches[index_data];
Expand Down Expand Up @@ -486,7 +491,8 @@ class FunctionRegexp : public IFunction {

// Registers the regexp-based string functions with the function factory.
//
// RegexpExtractImpl is a class template parameterized on ReturnNull, so it must always be
// named with an explicit argument; it is registered once per instantiation:
//   RegexpExtractImpl<true>  -> "regexp_extract_or_null"
//   RegexpExtractImpl<false> -> "regexp_extract"
void register_function_regexp_extract(SimpleFunctionFactory& factory) {
    factory.register_function<FunctionRegexp<RegexpReplaceImpl>>();
    factory.register_function<FunctionRegexp<RegexpExtractImpl<true>>>();
    factory.register_function<FunctionRegexp<RegexpExtractImpl<false>>>();
    factory.register_function<FunctionRegexp<RegexpReplaceOneImpl>>();
    factory.register_function<FunctionRegexp<RegexpExtractAllImpl>>();
}
Expand Down
39 changes: 39 additions & 0 deletions be/test/vec/function/function_like_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,45 @@ TEST(FunctionLikeTest, regexp_extract) {
}
}

// Verifies regexp_extract_or_null: successful extractions return the requested group,
// while a failed match, an out-of-range group index, a negative index, or a NULL argument
// all yield NULL (this is what distinguishes it from regexp_extract, which returns "").
TEST(FunctionLikeTest, regexp_extract_or_null) {
    std::string func_name = "regexp_extract_or_null";

    DataSet data_set = {{{std::string("x=a3&x=18abc&x=2&y=3&x=4"),
                          std::string("x=([0-9]+)([a-z]+)"), (int64_t)0},
                         std::string("x=18abc")},
                        {{std::string("x=a3&x=18abc&x=2&y=3&x=4"),
                          std::string("^x=([a-z]+)([0-9]+)"), (int64_t)0},
                         std::string("x=a3")},
                        {{std::string("x=a3&x=18abc&x=2&y=3&x=4"),
                          std::string("^x=([a-z]+)([0-9]+)"), (int64_t)1},
                         std::string("a")},
                        {{std::string("http://a.m.baidu.com/i41915173660.htm"),
                          std::string("i([0-9]+)"), (int64_t)0},
                         std::string("i41915173660")},
                        {{std::string("http://a.m.baidu.com/i41915173660.htm"),
                          std::string("i([0-9]+)"), (int64_t)1},
                         std::string("41915173660")},

                        {{std::string("hitdecisiondlist"), std::string("(i)(.*?)(e)"), (int64_t)0},
                         std::string("itde")},
                        {{std::string("hitdecisiondlist"), std::string("(i)(.*?)(e)"), (int64_t)1},
                         std::string("i")},
                        {{std::string("hitdecisiondlist"), std::string("(i)(.*?)(e)"), (int64_t)2},
                         std::string("td")},
                        // pattern does not match the input -> NULL
                        {{std::string("abc"), std::string("x=([0-9]+)"), (int64_t)0}, Null()},
                        // group index beyond the number of capturing groups -> NULL
                        {{std::string("http://a.m.baidu.com/i41915173660.htm"),
                          std::string("i([0-9]+)"), (int64_t)2},
                         Null()},
                        // negative group index -> NULL
                        {{std::string("http://a.m.baidu.com/i41915173660.htm"),
                          std::string("i([0-9]+)"), (int64_t)-1},
                         Null()},
                        // NULL arguments -> NULL
                        {{std::string("abc"), Null(), (int64_t)0}, Null()},
                        {{Null(), std::string("i([0-9]+)"), (int64_t)0}, Null()}};

    // pattern is constant value
    InputTypeSet const_pattern_input_types = {TypeIndex::String, Consted {TypeIndex::String},
                                              TypeIndex::Int64};
    // Each row is checked on its own so the constant pattern column holds a single value.
    for (const auto& line : data_set) {
        DataSet const_pattern_dataset = {line};
        static_cast<void>(check_function<DataTypeString, true>(func_name, const_pattern_input_types,
                                                               const_pattern_dataset));
    }
}

TEST(FunctionLikeTest, regexp_extract_all) {
std::string func_name = "regexp_extract_all";

Expand Down
14 changes: 13 additions & 1 deletion cloud/src/recycler/obj_storage_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

#include "recycler/obj_storage_client.h"

#include <chrono>

#include "cpp/sync_point.h"
#include "recycler/sync_executor.h"
#include "recycler/util.h"

using namespace std::chrono;

namespace doris::cloud {

Expand All @@ -28,6 +31,9 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
int64_t expired_time,
size_t batch_size) {
TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", &batch_size);
size_t num_deleted_objects = 0;
auto start_time = steady_clock::now();

auto list_iter = list_objects(path);

ObjectStorageResponse ret;
Expand All @@ -42,6 +48,7 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
continue;
}

num_deleted_objects++;
keys.emplace_back(std::move(obj->key));
if (keys.size() < batch_size) {
continue;
Expand Down Expand Up @@ -70,6 +77,11 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
}
}

auto elapsed = duration_cast<milliseconds>(steady_clock::now() - start_time).count();
LOG(INFO) << "delete objects under " << path.bucket << "/" << path.key
<< " finished, ret=" << ret.ret << ", finished=" << finished
<< ", num_deleted_objects=" << num_deleted_objects << ", cost=" << elapsed << " ms";

ret = finished ? ret : -1;

return ret;
Expand Down
12 changes: 4 additions & 8 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
return {std::string_view(), range_move};
}
++num_recycled;
LOG_INFO("k is {}, is empty {}", k, k.empty());
return {k, range_move};
});
} else {
Expand All @@ -1157,10 +1156,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
}
return true;
}());
sync_executor.add([k]() mutable -> TabletKeyPair {
LOG_INFO("k is {}, is empty {}", k, k.empty());
return {k, true};
});
sync_executor.add([k]() mutable -> TabletKeyPair { return {k, true}; });
++num_recycled;
}
return 0;
Expand Down Expand Up @@ -1433,7 +1429,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {

std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG_INFO("recycle rowsets finished, cost={}s", cost)
LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);
});
Expand Down Expand Up @@ -1618,7 +1614,7 @@ int InstanceRecycler::recycle_rowsets() {
// An old-version `RecycleRowsetPB` may have an empty resource_id; just remove the kv.
LOG(INFO) << "delete the recycle rowset kv that has empty resource_id, key="
<< hex(k) << " value=" << proto_to_json(rowset);
rowset_keys.push_back(std::string(k));
rowset_keys.emplace_back(k);
return -1;
}
// decode rowset_id
Expand Down Expand Up @@ -1664,7 +1660,7 @@ int InstanceRecycler::recycle_rowsets() {
return -1;
}
} else {
rowset_keys.push_back(std::string(k));
rowset_keys.emplace_back(k);
if (rowset_meta->num_segments() > 0) { // Skip empty rowset
rowsets.push_back(std::move(*rowset_meta));
}
Expand Down
Loading

0 comments on commit 8a448e6

Please sign in to comment.