Skip to content

Commit

Permalink
Merge branch 'master' into 20240325_fix_arrow_flight
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Mar 26, 2024
2 parents b9ac035 + c6c1d1c commit e8b9db6
Show file tree
Hide file tree
Showing 80 changed files with 1,804 additions and 257 deletions.
5 changes: 5 additions & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "common/status.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/heartbeat_flags.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
Expand Down Expand Up @@ -83,6 +84,10 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
// If be is gracefully stop, then k_doris_exist is set to true
heartbeat_result.backend_info.__set_is_shutdown(doris::k_doris_exit);
heartbeat_result.backend_info.__set_fragment_executing_count(
get_fragment_executing_count());
heartbeat_result.backend_info.__set_fragment_last_active_time(
get_fragment_last_active_time());
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
Expand Down
34 changes: 29 additions & 5 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
Expand Down Expand Up @@ -446,7 +447,6 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(PUBLISH_VERSION)
ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK)
ADD_TASK_COUNT(UPDATE_TABLET_META_INFO)
ADD_TASK_COUNT(ALTER)
ADD_TASK_COUNT(CLONE)
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
Expand All @@ -459,6 +459,17 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
DELETE_count << n;
}
return;
case TTaskType::ALTER:
{
ALTER_count << n;
// cloud auto stop need sc jobs, a tablet's sc can also be considered a fragment
doris::g_fragment_executing_count << 1;
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_last_active_time.set_value(now);
return;
}
default:
return;
}
Expand Down Expand Up @@ -1387,12 +1398,14 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
Status st;
io::RemoteFileSystemSPtr fs;
std::string root_path =
param.hdfs_storage_param.__isset.root_path ? param.hdfs_storage_param.root_path : "";

if (!existed_fs) {
// No such FS instance on BE
auto res = io::HdfsFileSystem::create(param.hdfs_storage_param,
param.hdfs_storage_param.fs_name,
std::to_string(param.id), nullptr);
auto res = io::HdfsFileSystem::create(
param.hdfs_storage_param, param.hdfs_storage_param.fs_name,
std::to_string(param.id), nullptr, std::move(root_path));
if (!res.has_value()) {
st = std::move(res).error();
} else {
Expand All @@ -1410,7 +1423,8 @@ void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPt
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", param.id)
.tag("resource_name", param.name);
.tag("resource_name", param.name)
.tag("root_path", fs->root_path().string());
put_storage_resource(param.id, {std::move(fs), param.version});
}
}
Expand Down Expand Up @@ -1851,6 +1865,11 @@ void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req)
alter_tablet(engine, req, signature, task_type, &finish_task_request);
finish_task(finish_task_request);
}
doris::g_fragment_executing_count << -1;
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_last_active_time.set_value(now);
remove_task_info(req.task_type, req.signature);
}

Expand All @@ -1872,6 +1891,11 @@ void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskReq
alter_cloud_tablet(engine, req, signature, task_type, &finish_task_request);
finish_task(finish_task_request);
}
doris::g_fragment_executing_count << -1;
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_last_active_time.set_value(now);
remove_task_info(req.task_type, req.signature);
}

Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Status CloudRowsetBuilder::init() {
context.mow_context = mow_context;
context.write_file_cache = _req.write_file_cache;
context.partial_update_info = _partial_update_info;
context.file_cache_ttl_sec = _tablet->ttl_seconds();
// New loaded data is always written to latest shared storage
// TODO(AlexYue): use the passed resource id to retrive the corresponding
// fs to pass to the RowsetWriterContext
Expand Down
83 changes: 78 additions & 5 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}

Status CloudTablet::sync_meta() {
// TODO(lightman): FileCache
return Status::NotSupported("CloudTablet::sync_meta is not implemented");
}

// There are only two tablet_states RUNNING and NOT_READY in cloud mode
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
Expand Down Expand Up @@ -618,4 +613,82 @@ Status CloudTablet::calc_delete_bitmap_for_compaciton(
return Status::OK();
}

Status CloudTablet::sync_meta() {
if (!config::enable_file_cache) {
return Status::OK();
}

TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
if (!st.ok()) {
if (st.is<ErrorCode::NOT_FOUND>()) {
// TODO(Lchangliang): recycle_resources_by_self();
}
return st;
}
if (tablet_meta->tablet_state() != TABLET_RUNNING) { // impossible
return Status::InternalError("invalid tablet state. tablet_id={}", tablet_id());
}

auto new_ttl_seconds = tablet_meta->ttl_seconds();
if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
_tablet_meta->set_ttl_seconds(new_ttl_seconds);
int64_t cur_time = UnixSeconds();
std::shared_lock rlock(_meta_lock);
for (auto& [_, rs] : _rs_version_map) {
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
int64_t new_expiration_time =
new_ttl_seconds + rs->rowset_meta()->newest_write_timestamp();
new_expiration_time = new_expiration_time > cur_time ? new_expiration_time : 0;
auto file_key = io::BlockFileCache::hash(
io::Path(rs->segment_file_path(seg_id)).filename().native());
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->modify_expiration_time(file_key, new_expiration_time);
}
}
}

auto new_compaction_policy = tablet_meta->compaction_policy();
if (_tablet_meta->compaction_policy() != new_compaction_policy) {
_tablet_meta->set_compaction_policy(new_compaction_policy);
}
auto new_time_series_compaction_goal_size_mbytes =
tablet_meta->time_series_compaction_goal_size_mbytes();
if (_tablet_meta->time_series_compaction_goal_size_mbytes() !=
new_time_series_compaction_goal_size_mbytes) {
_tablet_meta->set_time_series_compaction_goal_size_mbytes(
new_time_series_compaction_goal_size_mbytes);
}
auto new_time_series_compaction_file_count_threshold =
tablet_meta->time_series_compaction_file_count_threshold();
if (_tablet_meta->time_series_compaction_file_count_threshold() !=
new_time_series_compaction_file_count_threshold) {
_tablet_meta->set_time_series_compaction_file_count_threshold(
new_time_series_compaction_file_count_threshold);
}
auto new_time_series_compaction_time_threshold_seconds =
tablet_meta->time_series_compaction_time_threshold_seconds();
if (_tablet_meta->time_series_compaction_time_threshold_seconds() !=
new_time_series_compaction_time_threshold_seconds) {
_tablet_meta->set_time_series_compaction_time_threshold_seconds(
new_time_series_compaction_time_threshold_seconds);
}
auto new_time_series_compaction_empty_rowsets_threshold =
tablet_meta->time_series_compaction_empty_rowsets_threshold();
if (_tablet_meta->time_series_compaction_empty_rowsets_threshold() !=
new_time_series_compaction_empty_rowsets_threshold) {
_tablet_meta->set_time_series_compaction_empty_rowsets_threshold(
new_time_series_compaction_empty_rowsets_threshold);
}
auto new_time_series_compaction_level_threshold =
tablet_meta->time_series_compaction_level_threshold();
if (_tablet_meta->time_series_compaction_level_threshold() !=
new_time_series_compaction_level_threshold) {
_tablet_meta->set_time_series_compaction_level_threshold(
new_time_series_compaction_level_threshold);
}

return Status::OK();
}

} // namespace doris
6 changes: 3 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");

// sync tablet_meta when modifying meta
DEFINE_mBool(sync_tablet_meta, "true");
DEFINE_mBool(sync_tablet_meta, "false");

// default thrift rpc timeout ms
DEFINE_mInt32(thrift_rpc_timeout_ms, "60000");
Expand All @@ -617,9 +617,9 @@ DEFINE_Bool(enable_metric_calculator, "true");
// max consumer num in one data consumer group, for routine load
DEFINE_mInt32(max_consumer_num_per_group, "3");

// the size of thread pool for routine load task.
// the max size of thread pool for routine load task.
// this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
DEFINE_Int32(routine_load_thread_pool_size, "10");
DEFINE_Int32(max_routine_load_thread_pool_size, "1024");

// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
// default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -667,9 +667,9 @@ DECLARE_Bool(enable_metric_calculator);
// max consumer num in one data consumer group, for routine load
DECLARE_mInt32(max_consumer_num_per_group);

// the size of thread pool for routine load task.
// the max size of thread pool for routine load task.
// this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
DECLARE_Int32(routine_load_thread_pool_size);
DECLARE_Int32(max_routine_load_thread_pool_size);

// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
// default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
Expand Down
15 changes: 8 additions & 7 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,31 @@ namespace doris::io {

Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(
const std::map<std::string, std::string>& properties, std::string fs_name, std::string id,
RuntimeProfile* profile) {
RuntimeProfile* profile, std::string root_path) {
return HdfsFileSystem::create(parse_properties(properties), std::move(fs_name), std::move(id),
profile);
profile, std::move(root_path));
}

Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(const THdfsParams& hdfs_params,
std::string fs_name, std::string id,
RuntimeProfile* profile) {
RuntimeProfile* profile,
std::string root_path) {
#ifdef USE_HADOOP_HDFS
if (!config::enable_java_support) {
return ResultError(Status::InternalError(
"hdfs file system is not enabled, you can change be config enable_java_support to "
"true."));
}
#endif
std::shared_ptr<HdfsFileSystem> fs(
new HdfsFileSystem(hdfs_params, std::move(fs_name), std::move(id), profile));
std::shared_ptr<HdfsFileSystem> fs(new HdfsFileSystem(
hdfs_params, std::move(fs_name), std::move(id), profile, std::move(root_path)));
RETURN_IF_ERROR_RESULT(fs->init());
return fs;
}

HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id,
RuntimeProfile* profile)
: RemoteFileSystem("", std::move(id), FileSystemType::HDFS),
RuntimeProfile* profile, std::string root_path)
: RemoteFileSystem(root_path, std::move(id), FileSystemType::HDFS),
_hdfs_params(hdfs_params),
_fs_name(std::move(fs_name)),
_profile(profile) {
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/hdfs_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ class HdfsFileSystem final : public RemoteFileSystem {
public:
static Result<std::shared_ptr<HdfsFileSystem>> create(const THdfsParams& hdfs_params,
std::string fs_name, std::string id,
RuntimeProfile* profile);
RuntimeProfile* profile,
std::string root_path = "");

static Result<std::shared_ptr<HdfsFileSystem>> create(
const std::map<std::string, std::string>& properties, std::string fs_name,
std::string id, RuntimeProfile* profile);
std::string id, RuntimeProfile* profile, std::string root_path = "");

~HdfsFileSystem() override;

Expand Down Expand Up @@ -84,7 +85,7 @@ class HdfsFileSystem final : public RemoteFileSystem {
private:
friend class HdfsFileWriter;
HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id,
RuntimeProfile* profile);
RuntimeProfile* profile, std::string root_path);
const THdfsParams& _hdfs_params; // Only used in init, so we can use reference here
std::string _fs_name;
// do not use std::shared_ptr or std::unique_ptr
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,12 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
}

std::string S3FileWriter::_dump_completed_part() const {
std::string view;
std::stringstream ss;
ss << "part_numbers:";
for (const auto& part : _completed_parts) {
view.append(fmt::format("part {}, ", view, part->GetPartNumber()));
ss << " " << part->GetPartNumber();
}
return view;
return ss.str();
}

} // namespace doris::io
1 change: 1 addition & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class BaseTablet {
int32_t schema_hash() const { return _tablet_meta->schema_hash(); }
KeysType keys_type() const { return _tablet_meta->tablet_schema()->keys_type(); }
size_t num_key_columns() const { return _tablet_meta->tablet_schema()->num_key_columns(); }
int64_t ttl_seconds() const { return _tablet_meta->ttl_seconds(); }
std::mutex& get_schema_change_lock() { return _schema_change_lock; }
bool enable_unique_key_merge_on_write() const {
#ifdef BE_TEST
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,8 @@ Status CompactionMixin::do_inverted_index_compaction() {

// we choose the first destination segment name as the temporary index writer path
// Used to distinguish between different index compaction
auto index_tmp_path = tablet_path + "/" + dest_rowset_id.to_string() + "_" + "tmp";
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
auto index_tmp_path = tmp_file_dir / dest_rowset_id.to_string();
LOG(INFO) << "start index compaction"
<< ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num
<< ", destination index size=" << dest_segment_num << ".";
Expand Down
36 changes: 20 additions & 16 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <gen_cpp/Types_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <stdint.h>

#include <algorithm>
Expand Down Expand Up @@ -1403,22 +1404,25 @@ void StorageEngine::_cold_data_compaction_producer_callback() {
for (auto& [tablet, score] : tablet_to_follow) {
LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << tablet->tablet_id()
<< " score=" << score;
static_cast<void>(
_cold_data_compaction_thread_pool->submit_func([&, t = std::move(tablet)]() {
{
std::lock_guard lock(tablet_submitted_mtx);
tablet_submitted.insert(t->tablet_id());
}
auto st = t->cooldown();
{
std::lock_guard lock(tablet_submitted_mtx);
tablet_submitted.erase(t->tablet_id());
}
if (!st.ok()) {
LOG(WARNING) << "failed to cooldown. tablet_id=" << t->tablet_id()
<< " err=" << st;
}
}));
static_cast<void>(_cold_data_compaction_thread_pool->submit_func([&,
t = std::move(
tablet)]() {
{
std::lock_guard lock(tablet_submitted_mtx);
tablet_submitted.insert(t->tablet_id());
}
auto st = t->cooldown();
{
std::lock_guard lock(tablet_submitted_mtx);
tablet_submitted.erase(t->tablet_id());
}
if (!st.ok()) {
// The cooldown of the replica may be relatively slow
// resulting in a short period of time where following cannot be successful
LOG_EVERY_N(WARNING, 5)
<< "failed to cooldown. tablet_id=" << t->tablet_id() << " err=" << st;
}
}));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class BetaRowset final : public Rowset {

Status create_reader(RowsetReaderSharedPtr* result) override;

std::string segment_file_path(int segment_id) const;
std::string segment_file_path(int segment_id) const override;

static std::string segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id,
int segment_id);
Expand Down
Loading

0 comments on commit e8b9db6

Please sign in to comment.