Skip to content

Commit

Permalink
Merge branch 'master' into 20240522_fix_reserve
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored May 23, 2024
2 parents dffa495 + 015d925 commit d884a68
Show file tree
Hide file tree
Showing 224 changed files with 3,887 additions and 1,835 deletions.
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController*
// If the vaults containing hdfs vault then it would try to create hdfs connection using jni
// which would acuiqre one thread local jniEnv. But bthread context can't guarantee that the brpc
// worker thread wouldn't do bthread switch between worker threads.
bool ret = _heavy_work_pool.try_offer([&]() {
bool ret = _heavy_work_pool.try_offer([this, done]() {
brpc::ClosureGuard closure_guard(done);
_engine.sync_storage_vault();
});
Expand Down
11 changes: 11 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,17 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
add_obj_store(vault.obj_info());
}
});

for (int i = 0; i < resp.obj_info_size(); ++i) {
resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx");
}
for (int i = 0; i < resp.storage_vault_size(); ++i) {
auto j = resp.mutable_storage_vault(i);
if (!j->has_obj_info()) continue;
j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
}

LOG(INFO) << "get storage vault response: " << resp.ShortDebugString();
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ void CloudStorageEngine::sync_storage_vault() {
}

if (vault_infos.empty()) {
LOG(WARNING) << "no storage vault info";
LOG(WARNING) << "empty storage vault info";
return;
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ DEFINE_mBool(disable_storage_page_cache, "false");
DEFINE_mBool(disable_storage_row_cache, "true");
// whether to disable pk page cache feature in storage
DEFINE_Bool(disable_pk_storage_page_cache, "false");
DEFINE_Bool(enable_non_pipeline, "false");

// Cache for mow primary key storage page size
DEFINE_String(pk_storage_page_cache_limit, "10%");
Expand Down Expand Up @@ -1075,6 +1076,7 @@ DEFINE_mBool(enable_delete_when_cumu_compaction, "false");
// max_write_buffer_number for rocksdb
DEFINE_Int32(rocksdb_max_write_buffer_number, "5");

DEFINE_mBool(allow_zero_date, "false");
DEFINE_Bool(allow_invalid_decimalv2_literal, "false");
DEFINE_mString(kerberos_ccache_path, "");
DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ DECLARE_Bool(disable_storage_page_cache);
DECLARE_mBool(disable_storage_row_cache);
// whether to disable pk page cache feature in storage
DECLARE_Bool(disable_pk_storage_page_cache);
DECLARE_Bool(enable_non_pipeline);

// Cache for mow primary key storage page size, it's seperated from
// storage_page_cache_limit
Expand Down Expand Up @@ -1121,6 +1122,8 @@ DECLARE_mBool(enable_delete_when_cumu_compaction);
// max_write_buffer_number for rocksdb
DECLARE_Int32(rocksdb_max_write_buffer_number);

// Convert date 0000-00-00 to 0000-01-01. It's recommended to set to false.
DECLARE_mBool(allow_zero_date);
// Allow invalid decimalv2 literal for compatible with old version. Recommend set it false strongly.
DECLARE_mBool(allow_invalid_decimalv2_literal);
// Allow to specify kerberos credentials cache path.
Expand Down
6 changes: 2 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ Status IRuntimeFilter::publish(bool publish_local) {
TNetworkAddress addr;
DCHECK(_state != nullptr);
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
return filter->push_to_remote(&addr, _opt_remote_rf);
return filter->push_to_remote(&addr);
};
auto send_to_local = [&](RuntimePredicateWrapper* wrapper) {
std::vector<IRuntimeFilter*> filters;
Expand Down Expand Up @@ -1091,7 +1091,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
return Status::OK();
}

Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remote_rf) {
Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
DCHECK(is_producer());
std::shared_ptr<PBackendService_Stub> stub(
_state->exec_env->brpc_internal_client_cache()->get_client(*addr));
Expand All @@ -1117,7 +1117,6 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo
pfragment_instance_id->set_lo((int64_t)this);

merge_filter_request->set_filter_id(_filter_id);
merge_filter_request->set_opt_remote_rf(opt_remote_rf);
merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec);
auto column_type = _wrapper->column_type();
merge_filter_request->set_column_type(to_proto(column_type));
Expand Down Expand Up @@ -1308,7 +1307,6 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
_has_local_target = desc->has_local_targets;
_has_remote_target = desc->has_remote_targets;
_expr_order = desc->expr_order;
_opt_remote_rf = desc->__isset.opt_remote_rf && desc->opt_remote_rf;
vectorized::VExprContextSPtr build_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(desc->src_expr, build_ctx));

Expand Down
3 changes: 1 addition & 2 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ class IRuntimeFilter {
bool need_sync_filter_size();

// async push runtimefilter to remote node
Status push_to_remote(const TNetworkAddress* addr, bool opt_remote_rf);
Status push_to_remote(const TNetworkAddress* addr);

void init_profile(RuntimeProfile* parent_profile);

Expand Down Expand Up @@ -444,7 +444,6 @@ class IRuntimeFilter {
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
bool _opt_remote_rf;
// `_need_local_merge` indicates whether this runtime filter is global on this BE.
// All runtime filters should be merged on each BE before push_to_remote or publish.
bool _need_local_merge = false;
Expand Down
12 changes: 6 additions & 6 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,11 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
return plan_status;
}
ctx->db = ctx->put_result.params.db_name;
ctx->table = ctx->put_result.params.table_name;
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
ctx->db = ctx->put_result.pipeline_params.db_name;
ctx->table = ctx->put_result.pipeline_params.table_name;
ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id;
ctx->label = ctx->put_result.pipeline_params.import_label;
ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id);
if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
// FIXME find a way to avoid chunked stream load write large WALs
size_t content_length = 0;
Expand All @@ -354,7 +354,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
content_length *= 3;
}
}
ctx->put_result.params.__set_content_length(content_length);
ctx->put_result.pipeline_params.__set_content_length(content_length);
}

return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
Expand Down
4 changes: 2 additions & 2 deletions be/src/index-tools/index_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ DEFINE_string(pred_type, "", "inverted index term query predicate, eq/lt/gt/le/g
DEFINE_bool(print_row_id, false, "print row id when query terms");
DEFINE_bool(print_doc_id, false, "print doc id when check terms stats");
// only for debug index compaction
DEFINE_int32(idx_id, -1, "inverted index id");
DEFINE_int64(idx_id, -1, "inverted index id");
DEFINE_string(src_idx_dirs_file, "", "source segment index files");
DEFINE_string(dest_idx_dirs_file, "", "destination segment index files");
DEFINE_string(dest_seg_num_rows_file, "", "destination segment number of rows");
Expand Down Expand Up @@ -406,7 +406,7 @@ int main(int argc, char** argv) {
return true;
};

int32_t index_id = FLAGS_idx_id;
int64_t index_id = FLAGS_idx_id;
std::string tablet_path = FLAGS_tablet_path;
std::string src_index_dirs_string;
std::string dest_index_dirs_string;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
if (UNLIKELY(segment_caches[i] == nullptr)) {
segment_caches[i] = std::make_unique<SegmentCacheHandle>();
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::static_pointer_cast<BetaRowset>(rs), segment_caches[i].get(), true));
std::static_pointer_cast<BetaRowset>(rs), segment_caches[i].get(), true, true));
}
auto& segments = segment_caches[i]->get_segments();
DCHECK_EQ(segments.size(), num_segments);
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,6 @@ Status BaseBetaRowsetWriter::flush() {

Status BaseBetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id,
int64_t* flush_size) {
SCOPED_SKIP_MEMORY_CHECK();
if (block->rows() == 0) {
return Status::OK();
}
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatist

Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id,
int64_t* flush_size) {
SCOPED_SKIP_MEMORY_CHECK();
if (block->rows() == 0) {
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ struct RowsetWriterContext {

int64_t newest_write_timestamp = -1;
bool enable_unique_key_merge_on_write = false;
// store column_unique_id to skip write inverted index
std::set<int32_t> skip_inverted_index;
DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
BaseTabletSPtr tablet = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ const std::string InvertedIndexDescriptor::index_suffix = ".idx";
const std::string InvertedIndexDescriptor::index_name_separator = "_";

std::string InvertedIndexDescriptor::get_temporary_index_path(
const std::string& segment_path, uint32_t uuid, const std::string& index_suffix_path) {
const std::string& segment_path, uint64_t uuid, const std::string& index_suffix_path) {
std::string suffix = index_suffix_path.empty() ? "" : "@" + index_suffix_path;
return StripSuffixString(segment_path, segment_suffix) + index_name_separator +
std::to_string(uuid) + suffix;
}

std::string InvertedIndexDescriptor::get_index_file_name(const std::string& segment_path,
uint32_t uuid,
uint64_t uuid,
const std::string& index_suffix_path) {
std::string suffix = index_suffix_path.empty() ? "" : "@" + index_suffix_path;
return StripSuffixString(segment_path, segment_suffix) + index_name_separator +
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ class InvertedIndexDescriptor {
static const std::string segment_suffix;
static const std::string index_suffix;
static const std::string index_name_separator;
static std::string get_temporary_index_path(const std::string& segment_path, uint32_t uuid,
static std::string get_temporary_index_path(const std::string& segment_path, uint64_t uuid,
const std::string& index_suffix_path);
static std::string get_index_file_name(const std::string& path, uint32_t uuid,
static std::string get_index_file_name(const std::string& path, uint64_t uuid,
const std::string& index_suffix_path);
static std::string get_index_file_name(const std::string& path);
static const std::string get_temporary_null_bitmap_file_name() { return "null_bitmap"; }
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class InvertedIndexReader : public std::enable_shared_from_this<InvertedIndexRea

virtual InvertedIndexReaderType type() = 0;

[[nodiscard]] uint32_t get_index_id() const { return _index_meta.index_id(); }
[[nodiscard]] uint64_t get_index_id() const { return _index_meta.index_id(); }

[[nodiscard]] const std::map<string, string>& get_index_properties() const {
return _index_meta.properties();
Expand Down
12 changes: 12 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,19 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
_doc->clear();
_CLDELETE(ts);
} else {
// avoid to add doc which without any field which may make threadState init skip
// init fieldDataArray, then will make error with next doc with fields in
// resetCurrentFieldData
if (Status st = create_field(&new_field); st != Status::OK()) {
LOG(ERROR)
<< "create field " << string(_field_name.begin(), _field_name.end())
<< " error:" << st;
return st;
}
_doc->add(*new_field);
RETURN_IF_ERROR(add_null_document());
_doc->clear();
_CLDELETE(ts);
}
_rid++;
}
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,15 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
}

Status Segment::_load_pk_bloom_filter() {
#ifdef BE_TEST
if (_pk_index_meta == nullptr) {
// for BE UT "segment_cache_test"
return _load_pk_bf_once.call([this] {
_meta_mem_usage += 100;
return Status::OK();
});
}
#endif
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS);
DCHECK(_pk_index_meta != nullptr);
DCHECK(_pk_index_reader != nullptr);
Expand Down Expand Up @@ -312,6 +321,7 @@ Status Segment::load_pk_index_and_bf() {
RETURN_IF_ERROR(_load_pk_bloom_filter());
return Status::OK();
}

Status Segment::load_index() {
auto status = [this]() { return _load_index_impl(); }();
if (!status.ok()) {
Expand Down
8 changes: 7 additions & 1 deletion be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "olap/segment_loader.h"

#include "common/config.h"
#include "common/status.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "util/stopwatch.hpp"
Expand Down Expand Up @@ -50,7 +51,8 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {
}

Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle, bool use_cache) {
SegmentCacheHandle* cache_handle, bool use_cache,
bool need_load_pk_index_and_bf) {
if (cache_handle->is_inited()) {
return Status::OK();
}
Expand All @@ -61,9 +63,13 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
}
segment_v2::SegmentSharedPtr segment;
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
if (need_load_pk_index_and_bf) {
RETURN_IF_ERROR(segment->load_pk_index_and_bf());
}
if (use_cache && !config::disable_segment_cache) {
// memory of SegmentCache::CacheValue will be handled by SegmentCache
auto* cache_value = new SegmentCache::CacheValue();
_cache_mem_usage += segment->meta_mem_usage();
cache_value->segment = std::move(segment);
_segment_cache->insert(cache_key, *cache_value, cache_handle);
} else {
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/segment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,20 @@ class SegmentLoader {
// Load segments of "rowset", return the "cache_handle" which contains segments.
// If use_cache is true, it will be loaded from _cache.
Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle,
bool use_cache = false);
bool use_cache = false, bool need_load_pk_index_and_bf = false);

void erase_segment(const SegmentCache::CacheKey& key);

void erase_segments(const RowsetId& rowset_id, int64_t num_segments);

// Just used for BE UT
int64_t cache_mem_usage() const { return _cache_mem_usage; }

private:
SegmentLoader();
std::unique_ptr<SegmentCache> _segment_cache;
// Just used for BE UT
int64_t _cache_mem_usage = 0;
};

// A handle for a single rowset from segment lru cache.
Expand Down
58 changes: 58 additions & 0 deletions be/src/pipeline/exec/group_commit_scan_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "pipeline/exec/group_commit_scan_operator.h"

#include <fmt/format.h>

namespace doris::pipeline {

GroupCommitOperatorX::GroupCommitOperatorX(ObjectPool* pool, const TPlanNode& tnode,
int operator_id, const DescriptorTbl& descs,
int parallel_tasks)
: ScanOperatorX<GroupCommitLocalState>(pool, tnode, operator_id, descs, parallel_tasks),
_table_id(tnode.group_commit_scan_node.table_id) {
_output_tuple_id = tnode.file_scan_node.tuple_id;
}

Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
bool find_node = false;
while (!find_node && !*eos) {
RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos));
}
return Status::OK();
}

Status GroupCommitLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(ScanLocalState<GroupCommitLocalState>::init(state, info));
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<GroupCommitOperatorX>();
return state->exec_env()->group_commit_mgr()->get_load_block_queue(
p._table_id, state->fragment_instance_id(), load_block_queue);
}

Status GroupCommitLocalState::_process_conjuncts(RuntimeState* state) {
RETURN_IF_ERROR(ScanLocalState<GroupCommitLocalState>::_process_conjuncts(state));
if (_eos) {
return Status::OK();
}
// TODO: Push conjuncts down to reader.
return Status::OK();
}

} // namespace doris::pipeline
Loading

0 comments on commit d884a68

Please sign in to comment.