Skip to content

Commit

Permalink
Merge branch 'master' into truncate_recycle
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Nov 26, 2024
2 parents e783858 + 0bc3ea7 commit d916a66
Show file tree
Hide file tree
Showing 120 changed files with 1,914 additions and 1,337 deletions.
15 changes: 7 additions & 8 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,12 @@ class MetaServiceProxy {
long deadline = now;
// connection age only works without list endpoint.
if (!is_meta_service_endpoint_list &&
config::meta_service_connection_age_base_minutes > 0) {
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
config::meta_service_connection_age_base_minutes,
config::meta_service_connection_age_base_minutes * 2);
deadline = now + duration_cast<milliseconds>(minutes(uni(rng))).count();
config::meta_service_connection_age_base_seconds,
config::meta_service_connection_age_base_seconds * 2);
deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
} else {
deadline = LONG_MAX;
}
Expand Down Expand Up @@ -610,8 +610,9 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64
engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
tablet->tablet_id());
} else {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << tablet->tablet_id()
<< ", txn_id=" << txn_id << ", status=" << status;
LOG_EVERY_N(INFO, 20)
<< "delete bitmap not found in cache, will sync rowset to get. tablet_id= "
<< tablet->tablet_id() << ", txn_id=" << txn_id << ", status=" << status;
return false;
}
}
Expand All @@ -630,8 +631,6 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) {
return Status::OK();
} else {
LOG(WARNING) << "failed to sync delete bitmap by txn info. tablet_id="
<< tablet->tablet_id();
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
*delete_bitmap = *new_delete_bitmap;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ DEFINE_Bool(meta_service_use_load_balancer, "false");
DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000");
DEFINE_Bool(meta_service_connection_pooled, "true");
DEFINE_mInt64(meta_service_connection_pool_size, "20");
DEFINE_mInt32(meta_service_connection_age_base_minutes, "5");
DEFINE_mInt32(meta_service_connection_age_base_seconds, "30");
DEFINE_mInt32(meta_service_idle_connection_timeout_ms, "0");
DEFINE_mInt32(meta_service_rpc_retry_times, "200");
DEFINE_mInt32(meta_service_brpc_timeout_ms, "10000");
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static inline bool is_cloud_mode() {
// If meta services are deployed behind a load balancer, set this config to "host:port" of the load balancer.
// Here is a set of configs to configure the connection behaviors:
// - meta_service_connection_pooled: distribute the long connections to different RS of the VIP.
// - meta_service_connection_age_base_minutes: expire the connection after a random time during [base, 2*base],
// - meta_service_connection_age_base_seconds: expire the connection after a random time during [base, 2*base],
// so that the BE has a chance to connect to a new RS. (When you add a new RS, the BE will connect to it)
// - meta_service_idle_connection_timeout_ms: rebuild the idle connections after the timeout exceeds. Some LB
// vendors will reset the connection if it is idle for a long time.
Expand All @@ -50,7 +50,7 @@ DECLARE_mInt64(meta_service_connection_pool_size);
// has a chance to connect to a new RS. Set zero to disable it.
//
// Only works when meta_service_endpoint is set to a single host.
DECLARE_mInt32(meta_service_connection_age_base_minutes);
DECLARE_mInt32(meta_service_connection_age_base_seconds);
// Rebuild the idle connections after the timeout exceeds. Set zero to disable it.
//
// Only works when meta_service_endpoint is set to a single host.
Expand Down
7 changes: 5 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,8 @@ DEFINE_Bool(enable_brpc_builtin_services, "true");
// Enable brpc connection check
DEFINE_Bool(enable_brpc_connection_check, "false");

DEFINE_mInt64(brpc_connection_check_timeout_ms, "10000");

// The maximum amount of data that can be processed by a stream load
DEFINE_mInt64(streaming_load_max_mb, "102400");
// Some data formats, such as JSON, cannot be streamed.
Expand Down Expand Up @@ -959,8 +961,6 @@ DEFINE_Int32(doris_remote_scanner_thread_pool_thread_num, "48");
// number of s3 scanner thread pool queue size
DEFINE_Int32(doris_remote_scanner_thread_pool_queue_size, "102400");
DEFINE_mInt64(block_cache_wait_timeout_ms, "1000");
DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
DEFINE_Int64(file_cache_recycle_keys_size, "1000000");

// limit the queue of pending batches which will be sent by a single nodechannel
DEFINE_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
Expand Down Expand Up @@ -1054,6 +1054,9 @@ DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
DEFINE_mBool(enbale_dump_error_file, "true");
// limit the max size of error log on disk
DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB
DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
DEFINE_Int64(file_cache_recycle_keys_size, "1000000");
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down
13 changes: 11 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1011,13 +1011,13 @@ DECLARE_mInt64(nodechannel_pending_queue_max_bytes);
// The batch size for sending data by brpc streaming client
DECLARE_mInt64(brpc_streaming_client_batch_bytes);
DECLARE_mInt64(block_cache_wait_timeout_ms);
DECLARE_mInt64(cache_lock_long_tail_threshold);
DECLARE_Int64(file_cache_recycle_keys_size);

DECLARE_Bool(enable_brpc_builtin_services);

DECLARE_Bool(enable_brpc_connection_check);

DECLARE_mInt64(brpc_connection_check_timeout_ms);

// Max waiting time to wait the "plan fragment start" rpc.
// If timeout, the fragment will be cancelled.
// This parameter is usually only used when the FE loses connection,
Expand Down Expand Up @@ -1095,6 +1095,15 @@ DECLARE_Bool(enable_ttl_cache_evict_using_lru);
DECLARE_mBool(enbale_dump_error_file);
// limit the max size of error log on disk
DECLARE_mInt64(file_cache_error_log_limit_bytes);
DECLARE_mInt64(cache_lock_long_tail_threshold);
DECLARE_Int64(file_cache_recycle_keys_size);
// Base compaction may retrieve and produce some less frequently accessed data,
// potentially affecting the file cache hit rate.
// This configuration determines whether to retain the output within the file cache.
// Make your choice based on the following considerations:
// If your file cache is ample enough to accommodate all the data in your database,
// enable this option; otherwise, it is recommended to leave it disabled.
DECLARE_mBool(enable_file_cache_keep_base_compaction_output);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down
84 changes: 30 additions & 54 deletions be/src/exprs/hybrid_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,8 @@

#pragma once

#include <glog/logging.h>

#include <type_traits>

#include "common/exception.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "exprs/runtime_filter.h"
#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/common/string_ref.h"
#include "exprs/runtime_filter_convertor.h"

namespace doris {

Expand Down Expand Up @@ -221,30 +208,19 @@ class HybridSetBase : public RuntimeFilterFuncBase {
virtual bool find(const void* data, size_t) const = 0;

virtual void find_batch(const doris::vectorized::IColumn& column, size_t rows,
doris::vectorized::ColumnUInt8::Container& results) {
LOG(FATAL) << "HybridSetBase not support find_batch";
__builtin_unreachable();
}

doris::vectorized::ColumnUInt8::Container& results) = 0;
virtual void find_batch_negative(const doris::vectorized::IColumn& column, size_t rows,
doris::vectorized::ColumnUInt8::Container& results) {
LOG(FATAL) << "HybridSetBase not support find_batch_negative";
__builtin_unreachable();
}

doris::vectorized::ColumnUInt8::Container& results) = 0;
virtual void find_batch_nullable(const doris::vectorized::IColumn& column, size_t rows,
const doris::vectorized::NullMap& null_map,
doris::vectorized::ColumnUInt8::Container& results) {
LOG(FATAL) << "HybridSetBase not support find_batch_nullable";
__builtin_unreachable();
}
doris::vectorized::ColumnUInt8::Container& results) = 0;

virtual void find_batch_nullable_negative(const doris::vectorized::IColumn& column, size_t rows,
const doris::vectorized::NullMap& null_map,
doris::vectorized::ColumnUInt8::Container& results) {
LOG(FATAL) << "HybridSetBase not support find_batch_nullable_negative";
__builtin_unreachable();
}
virtual void find_batch_nullable_negative(
const doris::vectorized::IColumn& column, size_t rows,
const doris::vectorized::NullMap& null_map,
doris::vectorized::ColumnUInt8::Container& results) = 0;

virtual void to_pb(PInFilter* filter) = 0;

class IteratorBase {
public:
Expand All @@ -261,26 +237,6 @@ class HybridSetBase : public RuntimeFilterFuncBase {
bool _contains_null = false;
};

template <typename Type>
const Type* check_and_get_hybrid_set(const HybridSetBase& column) {
return typeid_cast<const Type*>(&column);
}

template <typename Type>
const Type* check_and_get_hybrid_set(const HybridSetBase* column) {
return typeid_cast<const Type*>(column);
}

template <typename Type>
bool check_hybrid_set(const HybridSetBase& column) {
return check_and_get_hybrid_set<Type>(&column);
}

template <typename Type>
bool check_hybrid_set(const HybridSetBase* column) {
return check_and_get_hybrid_set<Type>(column);
}

template <PrimitiveType T,
typename _ContainerType = DynamicContainer<typename PrimitiveTypeTraits<T>::CppType>,
typename _ColumnType = typename PrimitiveTypeTraits<T>::ColumnType>
Expand Down Expand Up @@ -409,6 +365,14 @@ class HybridSet : public HybridSetBase {

ContainerType* get_inner_set() { return &_set; }

void set_pb(PInFilter* filter, auto f) {
for (auto v : _set) {
f(filter->add_values(), v);
}
}

void to_pb(PInFilter* filter) override { set_pb(filter, get_convertor<ElementType>()); }

private:
ContainerType _set;
ObjectPool _pool;
Expand Down Expand Up @@ -569,6 +533,14 @@ class StringSet : public HybridSetBase {

ContainerType* get_inner_set() { return &_set; }

void set_pb(PInFilter* filter, auto f) {
for (const auto& v : _set) {
f(filter->add_values(), v);
}
}

void to_pb(PInFilter* filter) override { set_pb(filter, get_convertor<std::string>()); }

private:
ContainerType _set;
ObjectPool _pool;
Expand Down Expand Up @@ -735,6 +707,10 @@ class StringValueSet : public HybridSetBase {

ContainerType* get_inner_set() { return &_set; }

void to_pb(PInFilter* filter) override {
throw Exception(ErrorCode::INTERNAL_ERROR, "StringValueSet do not support to_pb");
}

private:
ContainerType _set;
ObjectPool _pool;
Expand Down
23 changes: 14 additions & 9 deletions be/src/exprs/minmax_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,8 @@

#pragma once

#include <type_traits>

#include "common/object_pool.h"
#include "exprs/runtime_filter.h"
#include "runtime/type_limit.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "exprs/runtime_filter_convertor.h"

namespace doris {
// only used in Runtime Filter
Expand All @@ -45,6 +37,8 @@ class MinMaxFuncBase : public RuntimeFilterFuncBase {

void set_contain_null() { _contain_null = true; }

virtual void to_pb(PMinMaxFilter* filter) = 0;

protected:
bool _contain_null = false;
};
Expand Down Expand Up @@ -165,6 +159,17 @@ class MinMaxNumFunc : public MinMaxFuncBase {
return Status::OK();
}

void set_pb(PMinMaxFilter* filter, auto f) {
if constexpr (NeedMin) {
f(filter->mutable_min_val(), _min);
}
if constexpr (NeedMax) {
f(filter->mutable_max_val(), _max);
}
}

void to_pb(PMinMaxFilter* filter) override { set_pb(filter, get_convertor<T>()); }

protected:
T _max = type_limit<T>::min();
T _min = type_limit<T>::max();
Expand Down
Loading

0 comments on commit d916a66

Please sign in to comment.