From 99d07480d3ae685f57b6cf61866343863b481489 Mon Sep 17 00:00:00 2001
From: yagagagaga
Date: Thu, 7 Nov 2024 15:05:47 +0800
Subject: [PATCH] [enhancement](cloud) support BE http action: list_cache and
 clear (#41037)

## Proposed changes

Add a BE HTTP action that is useful for debugging the file cache.

### API

```http
GET /api/file_cache
```

### request parameters

#### request parameters 1 (`list_cache`)

|param|type|desc|required|
|:---|:---|:---|:---|
|op|string|must be `list_cache`; for other supported values see #40831 #37484|yes|
|value|string|the segment file name|yes|

#### request parameters 2 (`clear`)

|param|type|desc|required|
|:---|:---|:---|:---|
|op|string|must be `clear`; for other supported values see #40831 #37484|yes|
|value|string|the segment file name|yes|
|sync|bool|whether to clear the local cache synchronously|no|
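Where does the segment file name in `value` come from? A minimal sketch, assuming a BE at a placeholder address and a hypothetical tablet id: the rowset id can be read from the BE compaction status API and combined with a segment suffix, which is what the regression test added in this PR does.

```bash
# Illustrative only: the BE address and tablet id are placeholders for your environment.
BE="127.0.0.1:8040"
TABLET_ID=10086   # hypothetical; e.g. taken from `SHOW TABLETS FROM <table>`

# The "rowsets" field of the compaction status contains the rowset ids.
curl "${BE}/api/compaction/show?tablet_id=${TABLET_ID}"

# Pass <rowset_id>_<segment_id>.dat as `value`, e.g. <rowset_id>_0.dat for segment 0.
```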
### response

#### response 1 (`list_cache`)

on success

|param|type|desc|
|:---|:---|:---|
||array|the local cache paths of the segment's cached blocks|

on failure

|param|type|desc|
|:---|:---|:---|
||array|an empty array|

#### response 2 (`clear`)

on success

|param|type|desc|
|:---|:---|:---|
|status|string||
|msg|string||

### examples

#### case 1

```bash
curl '172.100.0.4:8040/api/file_cache?op=list_cache&value=0200000000000001bf42c14374fff491ffb7c89a1a65c5bb_0.dat'
```

returns

```json
["/opt/doris/be/file_cache/c6a/c6a599f453f67f0949f80ad9990fa3dd/0"]
```

#### case 2

```bash
curl '127.0.0.1:8040/api/file_cache?op=clear&sync=true&value=0200000000000001284b68fea3dcfe8a83e65cd88426b081_0.dat'
```

returns

```json
{
    "status": "OK",
    "msg": "OK"
}
```
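Putting the two operations together, a typical debugging pass might look like the sketch below; the address and segment file name are placeholders taken from the cases above, not fixed values.

```bash
# Illustrative only: adjust the BE address and segment file name to your environment.
BE="127.0.0.1:8040"
SEGMENT="0200000000000001284b68fea3dcfe8a83e65cd88426b081_0.dat"

# 1. List the local cache files that belong to this segment.
curl "${BE}/api/file_cache?op=list_cache&value=${SEGMENT}"

# 2. Clear the cached data of this segment (same call as case 2 above).
curl "${BE}/api/file_cache?op=clear&sync=true&value=${SEGMENT}"

# 3. List again; an empty array means the segment is no longer cached.
curl "${BE}/api/file_cache?op=list_cache&value=${SEGMENT}"
```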
---
 be/src/http/action/file_cache_action.cpp     |  33 ++++-
 be/src/io/cache/block_file_cache_factory.cpp |  17 +++
 be/src/io/cache/block_file_cache_factory.h   |   2 +
 be/src/io/cache/file_block.cpp               |   4 +
 be/src/io/cache/file_block.h                 |   2 +
 be/src/io/cache/file_cache_storage.h         |   2 +
 be/src/io/cache/fs_file_cache_storage.cpp    |   5 +
 be/src/io/cache/fs_file_cache_storage.h      |   1 +
 be/src/io/cache/mem_file_cache_storage.cpp   |   4 +
 be/src/io/cache/mem_file_cache_storage.h     |   1 +
 .../cache/http/test_list_cache_file.groovy   | 117 ++++++++++++++++++
 11 files changed, 187 insertions(+), 1 deletion(-)
 create mode 100644 regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy

diff --git a/be/src/http/action/file_cache_action.cpp b/be/src/http/action/file_cache_action.cpp
index f31c040c5cf672..740bac46edf2a7 100644
--- a/be/src/http/action/file_cache_action.cpp
+++ b/be/src/http/action/file_cache_action.cpp
@@ -17,10 +17,15 @@
 
 #include "file_cache_action.h"
 
+#include
+
+#include
 #include
 #include
 #include
 #include
+
+#include
 
 #include "common/status.h"
 #include "http/http_channel.h"
@@ -30,6 +35,7 @@
 #include "io/cache/block_file_cache.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "io/cache/file_cache_common.h"
+#include "io/cache/fs_file_cache_storage.h"
 #include "olap/olap_define.h"
 #include "olap/tablet_meta.h"
 #include "util/easy_json.h"
@@ -43,6 +49,7 @@ constexpr static std::string_view PATH = "path";
 constexpr static std::string_view CLEAR = "clear";
 constexpr static std::string_view RESET = "reset";
 constexpr static std::string_view HASH = "hash";
+constexpr static std::string_view LIST_CACHE = "list_cache";
 constexpr static std::string_view CAPACITY = "capacity";
 constexpr static std::string_view RELEASE = "release";
 constexpr static std::string_view BASE_PATH = "base_path";
@@ -66,7 +73,14 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
         *json_metrics = json.ToString();
     } else if (operation == CLEAR) {
         const std::string& sync = req->param(SYNC.data());
-        auto ret = io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
+        const std::string& segment_path = req->param(VALUE.data());
+        if (segment_path.empty()) {
+            io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
+        } else {
+            io::UInt128Wrapper hash = io::BlockFileCache::hash(segment_path);
+            io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+            cache->remove_if_cached(hash);
+        }
     } else if (operation == RESET) {
         std::string capacity = req->param(CAPACITY.data());
         int64_t new_capacity = 0;
@@ -96,6 +110,23 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
             json[HASH.data()] = ret.to_string();
             *json_metrics = json.ToString();
         }
+    } else if (operation == LIST_CACHE) {
+        const std::string& segment_path = req->param(VALUE.data());
+        if (segment_path.empty()) {
+            st = Status::InvalidArgument("missing parameter: {} is required", VALUE.data());
+        } else {
+            io::UInt128Wrapper cache_hash = io::BlockFileCache::hash(segment_path);
+            std::vector<std::string> cache_files =
+                    io::FileCacheFactory::instance()->get_cache_file_by_path(cache_hash);
+            if (cache_files.empty()) {
+                *json_metrics = "[]";
+            } else {
+                EasyJson json;
+                std::for_each(cache_files.begin(), cache_files.end(),
+                              [&json](auto& x) { json.PushBack(x); });
+                *json_metrics = json.ToString();
+            }
+        }
     } else {
         st = Status::InternalError("invalid operation: {}", operation);
     }
diff --git a/be/src/io/cache/block_file_cache_factory.cpp b/be/src/io/cache/block_file_cache_factory.cpp
index 8370962ddd5fe1..2d0d25735fe2fd 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -21,6 +21,9 @@
 #include "io/cache/block_file_cache_factory.h"
 
 #include
+
+#include
+#include
 #if defined(__APPLE__)
 #include
 #else
@@ -118,6 +121,20 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
     return Status::OK();
 }
 
+std::vector<std::string> FileCacheFactory::get_cache_file_by_path(const UInt128Wrapper& hash) {
+    io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+    auto blocks = cache->get_blocks_by_key(hash);
+    std::vector<std::string> ret;
+    if (blocks.empty()) {
+        return ret;
+    } else {
+        for (auto& [_, fb] : blocks) {
+            ret.emplace_back(fb->get_cache_file());
+        }
+    }
+    return ret;
+}
+
 BlockFileCache* FileCacheFactory::get_by_path(const UInt128Wrapper& key) {
     // dont need lock mutex because _caches is immutable after create_file_cache
     return _caches[KeyHash()(key) % _caches.size()].get();
diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h
index 12714fd2087982..b00bd7bdfcb315 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -62,6 +62,8 @@ class FileCacheFactory {
 
     [[nodiscard]] size_t get_cache_instance_size() const { return _caches.size(); }
 
+    std::vector<std::string> get_cache_file_by_path(const UInt128Wrapper& hash);
+
     BlockFileCache* get_by_path(const UInt128Wrapper& hash);
     BlockFileCache* get_by_path(const std::string& cache_base_path);
     std::vector get_query_context_holders(
diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp
index 4576b9dbba892f..44cad5520ead06 100644
--- a/be/src/io/cache/file_block.cpp
+++ b/be/src/io/cache/file_block.cpp
@@ -272,6 +272,10 @@ std::string FileBlock::state_to_string(FileBlock::State state) {
     }
 }
 
+std::string FileBlock::get_cache_file() const {
+    return _mgr->_storage->get_local_file(this->_key);
+}
+
 FileBlocksHolder::~FileBlocksHolder() {
     for (auto file_block_it = file_blocks.begin(); file_block_it != file_blocks.end();) {
         auto current_file_block_it = file_block_it;
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index 6e49a597b7b95c..3a4490d67a3f9d 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -123,6 +123,8 @@ class FileBlock {
 
     uint64_t expiration_time() const { return _key.meta.expiration_time; }
 
+    std::string get_cache_file() const;
+
     State state_unlock(std::lock_guard<std::mutex>&) const;
 
     FileBlock& operator=(const FileBlock&) = delete;
diff --git a/be/src/io/cache/file_cache_storage.h b/be/src/io/cache/file_cache_storage.h
index 642c4711cf6c62..024e701c6fa08b 100644
--- a/be/src/io/cache/file_cache_storage.h
+++ b/be/src/io/cache/file_cache_storage.h
@@ -65,6 +65,8 @@ class FileCacheStorage {
     // force clear all current data in the cache
     virtual Status clear(std::string& msg) = 0;
     virtual FileCacheStorageType get_type() = 0;
+    // get local cached file
+    virtual std::string get_local_file(const FileCacheKey& key) = 0;
 };
 
 } // namespace doris::io
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp
index bacd0820c66099..d99869c1c8fb89 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -660,6 +660,11 @@ Status FSFileCacheStorage::clear(std::string& msg) {
     return Status::OK();
 }
 
+std::string FSFileCacheStorage::get_local_file(const FileCacheKey& key) {
+    return get_path_in_local_cache(get_path_in_local_cache(key.hash, key.meta.expiration_time),
+                                   key.offset, key.meta.type, false);
+}
+
 FSFileCacheStorage::~FSFileCacheStorage() {
     if (_cache_background_load_thread.joinable()) {
         _cache_background_load_thread.join();
diff --git a/be/src/io/cache/fs_file_cache_storage.h b/be/src/io/cache/fs_file_cache_storage.h
index 23e98f422ac884..fb3490bcfe0ca3 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -70,6 +70,7 @@ class FSFileCacheStorage : public FileCacheStorage {
     void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
                                        std::lock_guard<std::mutex>& cache_lock) override;
     Status clear(std::string& msg) override;
+    std::string get_local_file(const FileCacheKey& key) override;
 
     [[nodiscard]] static std::string get_path_in_local_cache(const std::string& dir, size_t offset,
                                                              FileCacheType type,
diff --git a/be/src/io/cache/mem_file_cache_storage.cpp b/be/src/io/cache/mem_file_cache_storage.cpp
index bffa75ae305b59..7e76dd5f88c565 100644
--- a/be/src/io/cache/mem_file_cache_storage.cpp
+++ b/be/src/io/cache/mem_file_cache_storage.cpp
@@ -128,4 +128,8 @@ Status MemFileCacheStorage::clear(std::string& msg) {
     return Status::OK();
 }
 
+std::string MemFileCacheStorage::get_local_file(const FileCacheKey& key) {
+    return "";
+}
+
 } // namespace doris::io
diff --git a/be/src/io/cache/mem_file_cache_storage.h b/be/src/io/cache/mem_file_cache_storage.h
index 20fdd8ce9f6520..82064c6e9edc78 100644
--- a/be/src/io/cache/mem_file_cache_storage.h
+++ b/be/src/io/cache/mem_file_cache_storage.h
@@ -44,6 +44,7 @@ class MemFileCacheStorage : public FileCacheStorage {
     void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
                                        std::lock_guard<std::mutex>& cache_lock) override;
     Status clear(std::string& msg) override;
+    std::string get_local_file(const FileCacheKey& key) override;
 
     FileCacheStorageType get_type() override { return MEMORY; }
 
diff --git a/regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy b/regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy
new file mode 100644
index 00000000000000..acd33a22f0c40e
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_list_cache_file") {
+    sql """ use @regression_cluster_name1 """
+    String[][] backends = sql """ show backends """
+    String backendId;
+    def backendIdToBackendIP = [:]
+    def backendIdToBackendHttpPort = [:]
+    def backendIdToBackendBrpcPort = [:]
+    for (String[] backend in backends) {
+        if (backend[9].equals("true") && backend[19].contains("regression_cluster_name1")) {
+            backendIdToBackendIP.put(backend[0], backend[1])
+            backendIdToBackendHttpPort.put(backend[0], backend[4])
+            backendIdToBackendBrpcPort.put(backend[0], backend[5])
+        }
+    }
+    assertEquals(backendIdToBackendIP.size(), 1)
+
+    backendId = backendIdToBackendIP.keySet()[0]
+    def socket = backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendHttpPort.get(backendId)
+
+    sql "drop table IF EXISTS `user`"
+
+    sql """
+        CREATE TABLE IF NOT EXISTS `user` (
+            `id` int NULL,
+            `name` string NULL
+        )
+        UNIQUE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "file_cache_ttl_seconds" = "2884"
+        )
+    """
+
+    sql "insert into user select number, cast(rand() as varchar(32)) from numbers(\"number\"=\"1000000\")"
+
+    def get_tablets = { String tbl_name ->
+        def res = sql "show tablets from ${tbl_name}"
+        List tablets = new ArrayList<>()
+        for (final def line in res) {
+            tablets.add(Integer.valueOf(line[0].toString()))
+        }
+        return tablets
+    }
+
+    def get_rowsets = { int tablet_id ->
+        var ret = []
+        httpTest {
+            endpoint ""
+            uri socket + "/api/compaction/show?tablet_id=" + tablet_id
+            op "get"
+            check {respCode, body ->
+                assertEquals(respCode, 200)
+                var map = parseJson(body)
+                for (final def line in map.get("rowsets")) {
+                    var tokens = line.toString().split(" ")
+                    ret.add(tokens[4])
+                }
+            }
+        }
+        return ret
+    }
+
+    var tablets = get_tablets("user")
+    var rowsets = get_rowsets(tablets.get(0))
+    var segment_file = rowsets[rowsets.size() - 1] + "_0.dat"
+
+    httpTest {
+        endpoint ""
+        uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
+        op "get"
+        check {respCode, body ->
+            assertEquals(respCode, 200)
+            var arr = parseJson(body)
+            assertTrue(arr.size() > 0, "There should be at least one cache file; if not, check the disk capacity and the file_cache_enter_disk_resource_limit_mode_percent setting in be.conf")
+        }
+    }
+
+    // clear single segment file cache
+    httpTest {
+        endpoint ""
+        uri socket + "/api/file_cache?op=clear&value=" + segment_file
+        op "get"
+        check {respCode, body ->
+            assertEquals(respCode, 200, "failed to clear the local cache, response: " + parseJson(body))
+        }
+    }
+
+    httpTest {
+        endpoint ""
+        uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
+        op "get"
+        check {respCode, body ->
+            assertEquals(respCode, 200)
+            var arr = parseJson(body)
+            assertTrue(arr.size() == 0, "there should be no local cache files left after the cache has been cleared")
+        }
+    }
+}