Skip to content

Commit

Permalink
Merge branch 'master' into 20240425_fix_thread_context
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Apr 27, 2024
2 parents eccdb6e + e24c7a5 commit ce77f7e
Show file tree
Hide file tree
Showing 451 changed files with 11,345 additions and 6,996 deletions.
1 change: 0 additions & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ github:
- shuke987
- wm1581066
- KassieZ
- gavinchou
- yujun777

notifications:
Expand Down
5 changes: 0 additions & 5 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ option(USE_LIBCPP "Use libc++" OFF)
option(USE_MEM_TRACKER, "Use memory tracker" ON)
option(USE_UNWIND "Use libunwind" ON)
option(USE_JEMALLOC "Use jemalloc" ON)
option(USE_JEMALLOC_HOOK "Use jemalloc hook" ON)
if (OS_MACOSX)
set(GLIBC_COMPATIBILITY OFF)
set(USE_LIBCPP ON)
Expand All @@ -88,7 +87,6 @@ message(STATUS "GLIBC_COMPATIBILITY is ${GLIBC_COMPATIBILITY}")
message(STATUS "USE_LIBCPP is ${USE_LIBCPP}")
message(STATUS "USE_MEM_TRACKER is ${USE_MEM_TRACKER}")
message(STATUS "USE_JEMALLOC is ${USE_JEMALLOC}")
message(STATUS "USE_JEMALLOC_HOOK is ${USE_JEMALLOC_HOOK}")
message(STATUS "USE_UNWIND is ${USE_UNWIND}")
message(STATUS "ENABLE_PCH is ${ENABLE_PCH}")

Expand Down Expand Up @@ -348,9 +346,6 @@ endif()
if (USE_JEMALLOC)
add_definitions(-DUSE_JEMALLOC)
endif()
if (USE_JEMALLOC_HOOK)
add_definitions(-DUSE_JEMALLOC_HOOK)
endif()

# Compile with libunwind
if (USE_UNWIND)
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,7 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
.region = param.s3_storage_param.region,
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
.max_connections = param.s3_storage_param.max_conn,
.request_timeout_ms = param.s3_storage_param.request_timeout_ms,
.connect_timeout_ms = param.s3_storage_param.conn_timeout_ms,
Expand All @@ -1384,6 +1385,7 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
S3ClientConf conf {
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
};
st = client->reset(conf);
fs = std::move(existed_fs);
Expand Down
16 changes: 16 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,9 @@ DEFINE_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB
DEFINE_Int32(flush_thread_num_per_store, "6");
// number of thread for flushing memtable per store, for high priority load task
DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");
// number of threads = min(flush_thread_num_per_store * num_store,
// max_flush_thread_num_per_cpu * num_cpu)
DEFINE_Int32(max_flush_thread_num_per_cpu, "4");

// config for tablet meta checkpoint
DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
Expand Down Expand Up @@ -978,6 +981,7 @@ DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
DEFINE_mBool(enable_read_cache_file_directly, "false");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down Expand Up @@ -1211,11 +1215,19 @@ DEFINE_mInt32(thrift_client_open_num_tries, "1");

DEFINE_Bool(enable_index_compaction, "false");

// http scheme in S3Client to use. E.g. http or https
DEFINE_String(s3_client_http_scheme, "http");
DEFINE_Validator(s3_client_http_scheme, [](const std::string& config) -> bool {
return config == "http" || config == "https";
});

// enable injection point in regression-test
DEFINE_mBool(enable_injection_point, "false");

DEFINE_mBool(ignore_schema_change_check, "false");

DEFINE_mInt64(string_overflow_size, "4294967295"); // std::numic_limits<uint32_t>::max()

// The min thread num for BufferedReaderPrefetchThreadPool
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread, "16");
// The max thread num for BufferedReaderPrefetchThreadPool
Expand All @@ -1224,6 +1236,8 @@ DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
// The max thread num for S3FileUploadThreadPool
DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
// The max ratio for ttl cache's size
DEFINE_mInt64(max_ttl_cache_ratio, "90");

// clang-format off
#ifdef BE_TEST
Expand Down Expand Up @@ -1663,6 +1677,8 @@ Status set_fuzzy_configs() {
((distribution(*generator) % 2) == 0) ? "true" : "false";
fuzzy_field_and_value["enable_shrink_memory"] =
((distribution(*generator) % 2) == 0) ? "true" : "false";
fuzzy_field_and_value["string_overflow_size"] =
((distribution(*generator) % 2) == 0) ? "10" : "4294967295";

fmt::memory_buffer buf;
for (auto& it : fuzzy_field_and_value) {
Expand Down
12 changes: 12 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,9 @@ DECLARE_mInt64(storage_flood_stage_left_capacity_bytes); // 1GB
DECLARE_Int32(flush_thread_num_per_store);
// number of thread for flushing memtable per store, for high priority load task
DECLARE_Int32(high_priority_flush_thread_num_per_store);
// number of threads = min(flush_thread_num_per_store * num_store,
// max_flush_thread_num_per_cpu * num_cpu)
DECLARE_Int32(max_flush_thread_num_per_cpu);

// config for tablet meta checkpoint
DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
Expand Down Expand Up @@ -1022,6 +1025,7 @@ DECLARE_Bool(clear_file_cache);
DECLARE_Bool(enable_file_cache_query_limit);
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
DECLARE_mBool(enable_read_cache_file_directly);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down Expand Up @@ -1290,11 +1294,17 @@ DECLARE_mInt64(hive_sink_max_file_size);
// Retry the Open num_retries time waiting 100 milliseconds between retries.
DECLARE_mInt32(thrift_client_open_num_tries);

// http scheme in S3Client to use. E.g. http or https
DECLARE_String(s3_client_http_scheme);

// enable injection point in regression-test
DECLARE_mBool(enable_injection_point);

DECLARE_mBool(ignore_schema_change_check);

/** Only use in fuzzy test **/
DECLARE_mInt64(string_overflow_size);

// The min thread num for BufferedReaderPrefetchThreadPool
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread);
// The max thread num for BufferedReaderPrefetchThreadPool
Expand All @@ -1303,6 +1313,8 @@ DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
// The max thread num for S3FileUploadThreadPool
DECLARE_Int64(num_s3_file_upload_thread_pool_max_thread);
// The max ratio for ttl cache's size
DECLARE_mInt64(max_ttl_cache_ratio);

#ifdef BE_TEST
// test s3
Expand Down
14 changes: 14 additions & 0 deletions be/src/http/action/compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,20 @@ Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st
std::string compaction_type;
bool run_status = false;

{
// Full compaction holds both base compaction lock and cumu compaction lock.
// So we can not judge if full compaction is running by check these two locks holding.
// Here, we use a variable 'is_full_compaction_running' to check if full compaction is running.
if (tablet->is_full_compaction_running()) {
msg = "compaction task for this tablet is running";
compaction_type = "full";
run_status = true;
*json_result = strings::Substitute(json_template, run_status, msg, tablet_id,
compaction_type);
return Status::OK();
}
}

{
// use try lock to check this tablet is running cumulative compaction
std::unique_lock<std::mutex> lock_cumulative(tablet->get_cumulative_compaction_lock(),
Expand Down
29 changes: 28 additions & 1 deletion be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,9 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha

auto& offsets = _files[hash];
DCHECK((context.expiration_time == 0 && context.cache_type != FileCacheType::TTL) ||
(context.cache_type == FileCacheType::TTL && context.expiration_time != 0));
(context.cache_type == FileCacheType::TTL && context.expiration_time != 0))
<< fmt::format("expiration time {}, cache type {}", context.expiration_time,
context.cache_type);

FileCacheKey key;
key.hash = hash;
Expand All @@ -639,6 +641,7 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha
_key_to_time[hash] = context.expiration_time;
_time_to_key.insert(std::make_pair(context.expiration_time, hash));
}
_cur_ttl_size += cell.size();
}
auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
_cur_cache_size += size;
Expand Down Expand Up @@ -695,6 +698,10 @@ const BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) co
bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
auto limit = config::max_ttl_cache_ratio * _capacity;
if ((_cur_ttl_size + size) * 100 > limit) {
return false;
}
auto is_overflow = [&] {
return _disk_resource_limit_mode ? removed_size < size
: cur_cache_size + size - removed_size > _capacity;
Expand Down Expand Up @@ -1129,6 +1136,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
}
}
_cur_cache_size -= file_block->range().size();
if (FileCacheType::TTL == type) {
_cur_ttl_size -= file_block->range().size();
}
auto& offsets = _files[hash];
offsets.erase(file_block->offset());
if (offsets.empty()) {
Expand Down Expand Up @@ -1544,6 +1554,7 @@ std::string BlockFileCache::clear_file_cache_directly() {
int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
_files.clear();
_cur_cache_size = 0;
_cur_ttl_size = 0;
_time_to_key.clear();
_key_to_time.clear();
_index_queue.clear(cache_lock);
Expand All @@ -1560,6 +1571,22 @@ std::string BlockFileCache::clear_file_cache_directly() {
return msg;
}

std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
std::map<size_t, FileBlockSPtr> offset_to_block;
std::lock_guard cache_lock(_mutex);
if (_files.contains(hash)) {
for (auto& [offset, cell] : _files[hash]) {
if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
offset_to_block.emplace(offset, cell.file_block);
use_cell(cell, nullptr,
need_to_move(cell.file_block->cache_type(), FileCacheType::DISPOSABLE),
cache_lock);
}
}
}
return offset_to_block;
}

template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock);
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class BlockFileCache {
*/
std::string clear_file_cache_async();
std::string clear_file_cache_directly();

std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper& hash);
/// For debug.
std::string dump_structure(const UInt128Wrapper& hash);

Expand Down Expand Up @@ -394,6 +394,7 @@ class BlockFileCache {
CachedFiles _files;
QueryFileCacheContextMap _query_map;
size_t _cur_cache_size = 0;
size_t _cur_ttl_size = 0;
std::multimap<uint64_t, UInt128Wrapper> _time_to_key;
std::unordered_map<UInt128Wrapper, uint64_t, KeyHash> _key_to_time;
// The three queues are level queue.
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,12 @@ void FileCacheBlockS3Downloader::download_file_cache_block(std::vector<FileCache
}
}
};
CloudTabletSPtr tablet;
auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false);
if (!res.has_value()) {
LOG_WARNING("Failed to find tablet {}", meta.tablet_id()).error(res.error());
return;
}
auto id_to_rowset_meta_map = tablet->tablet_meta()->snapshot_rs_metas();
auto id_to_rowset_meta_map = res.value()->tablet_meta()->snapshot_rs_metas();
if (auto iter = id_to_rowset_meta_map.find(meta.rowset_id());
iter != id_to_rowset_meta_map.end()) {
UInt128Wrapper cache_key = BlockFileCache::hash(meta.file_name());
Expand Down
68 changes: 62 additions & 6 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "io/cache/block_file_cache_profile.h"
#include "io/cache/file_block.h"
#include "io/fs/file_reader.h"
#include "io/fs/local_file_system.h"
#include "io/io_common.h"
#include "util/bit_util.h"
#include "util/doris_metrics.h"
Expand All @@ -50,6 +51,9 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
if (_is_doris_table) {
_cache_hash = BlockFileCache::hash(path().filename().native());
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
if (config::enable_read_cache_file_directly) {
_cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
}
} else {
// Use path and modification time to build cache key
std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime);
Expand All @@ -69,6 +73,14 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
}
}

void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) {
if (config::enable_read_cache_file_directly) {
std::lock_guard lock(_mtx);
DCHECK(file_block->state() == FileBlock::State::DOWNLOADED);
_cache_file_readers.emplace(file_block->offset(), std::move(file_block));
}
}

CachedRemoteFileReader::~CachedRemoteFileReader() {
static_cast<void>(close());
}
Expand Down Expand Up @@ -110,6 +122,54 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
return Status::OK();
}
ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
};
std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, std::move(defer_func));
stats.bytes_read += bytes_req;
if (config::enable_read_cache_file_directly) {
// read directly
size_t need_read_size = bytes_req;
std::shared_lock lock(_mtx);
if (!_cache_file_readers.empty()) {
// find the last offset > offset.
auto iter = _cache_file_readers.upper_bound(offset);
if (iter != _cache_file_readers.begin()) {
iter--;
}
size_t cur_offset = offset;
while (need_read_size != 0 && iter != _cache_file_readers.end()) {
if (iter->second->offset() > cur_offset ||
iter->second->range().right < cur_offset) {
break;
}
size_t file_offset = cur_offset - iter->second->offset();
size_t reserve_bytes =
std::min(need_read_size, iter->second->range().size() - file_offset);
{
SCOPED_RAW_TIMER(&stats.local_read_timer);
if (!iter->second
->read(Slice(result.data + (cur_offset - offset), reserve_bytes),
file_offset)
.ok()) {
break;
}
}
need_read_size -= reserve_bytes;
cur_offset += reserve_bytes;
iter++;
}
if (need_read_size == 0) {
*bytes_read = bytes_req;
stats.hit_cache = true;
return Status::OK();
}
}
}
// read from cache or remote
auto [align_left, align_size] = s_align_size(offset, bytes_req, size());
CacheContext cache_context(io_ctx);
FileBlocksHolder holder =
Expand Down Expand Up @@ -137,7 +197,6 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
break;
}
}
stats.bytes_read += bytes_req;
size_t empty_start = 0;
size_t empty_end = 0;
if (!empty_blocks.empty()) {
Expand All @@ -164,6 +223,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
}
if (!st.ok()) {
LOG_WARNING("Write data to file cache failed").error(st);
} else {
_insert_file_reader(block);
}
stats.bytes_write_into_file_cache += block_size;
}
Expand Down Expand Up @@ -243,11 +304,6 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
current_offset = right + 1;
}
DCHECK(*bytes_read == bytes_req);
DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
return Status::OK();
}

Expand Down
Loading

0 comments on commit ce77f7e

Please sign in to comment.