diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 00feee22c76546..e61ed144486c0f 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -214,26 +214,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS *file_reader = stream_load_ctx->pipe; } - if (file_reader->get() == nullptr) { - return Status::OK(); - } - - auto multi_table_pipe = std::dynamic_pointer_cast(*file_reader); - if (multi_table_pipe == nullptr || runtime_state == nullptr) { - return Status::OK(); - } - - TUniqueId pipe_id; - if (runtime_state->enable_pipeline_x_exec()) { - pipe_id = io::StreamLoadPipe::calculate_pipe_id(runtime_state->query_id(), - runtime_state->fragment_id()); - } else { - pipe_id = runtime_state->fragment_instance_id(); - } - *file_reader = multi_table_pipe->get_pipe(pipe_id); - LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id - << " pipe: " << (*file_reader).get(); - return Status::OK(); } diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index aeff2f42fa3ca8..789903cc1ca86b 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -61,9 +61,9 @@ Status MultiTablePipe::append_json(const char* data, size_t size) { } KafkaConsumerPipePtr MultiTablePipe::get_pipe_by_table(const std::string& table) { - auto pipe = _planned_pipes.find(table); - DCHECK(pipe != _planned_pipes.end()); - return pipe->second; + auto pair = _planned_tables.find(table); + DCHECK(pair != _planned_tables.end()); + return std::static_pointer_cast(pair->second->pipe); } static std::string_view get_first_part(const char* dat, char delimiter) { @@ -78,15 +78,15 @@ static std::string_view get_first_part(const char* dat, char delimiter) { } Status MultiTablePipe::finish() { - for (auto& pair : _planned_pipes) { - RETURN_IF_ERROR(pair.second->finish()); + for (auto& pair : _planned_tables) { + RETURN_IF_ERROR(pair.second->pipe->finish()); } return Status::OK(); } void MultiTablePipe::cancel(const std::string& reason) { - for (auto& pair : _planned_pipes) { - pair.second->cancel(reason); + for (auto& pair : _planned_tables) { + pair.second->pipe->cancel(reason); } } @@ -101,19 +101,29 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size return Status::InternalError("empty data"); } KafkaConsumerPipePtr pipe = nullptr; - auto iter = _planned_pipes.find(table); - if (iter != _planned_pipes.end()) { - pipe = iter->second; + auto iter = _planned_tables.find(table); + if (iter != _planned_tables.end()) { + pipe = std::static_pointer_cast(iter->second->pipe); RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size), "append failed in planned kafka pipe"); } else { - iter = _unplanned_pipes.find(table); - if (iter == _unplanned_pipes.end()) { + iter = _unplanned_tables.find(table); + if (iter == _unplanned_tables.end()) { + std::shared_ptr ctx = + std::make_shared(doris::ExecEnv::GetInstance()); + ctx->id = UniqueId::gen_uid(); pipe = std::make_shared(); - LOG(INFO) << "create new unplanned pipe: " << pipe.get() << ", ctx: " << _ctx->brief(); - _unplanned_pipes.emplace(table, pipe); + ctx->pipe = pipe; +#ifndef BE_TEST + RETURN_NOT_OK_STATUS_WITH_WARN( + doris::ExecEnv::GetInstance()->new_load_stream_mgr()->put(ctx->id, ctx), + "put stream load ctx error"); +#endif + _unplanned_tables.emplace(table, ctx); + LOG(INFO) << "create new unplanned table ctx, table: " << table + << "load id: " << ctx->id << ", txn id: " << _ctx->txn_id; } else { - pipe = iter->second; + pipe = std::static_pointer_cast(iter->second->pipe); } // It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity, @@ -124,7 +134,7 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size auto pipe_current_capacity = pipe->current_capacity(); auto pipe_max_capacity = pipe->max_capacity(); if (_unplanned_row_cnt >= _row_threshold || - _unplanned_pipes.size() >= _wait_tables_threshold || + _unplanned_tables.size() >= _wait_tables_threshold || pipe_current_capacity + size > pipe_max_capacity) { LOG(INFO) << fmt::format( "unplanned row cnt={} reach row_threshold={} or " @@ -151,112 +161,106 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size #ifndef BE_TEST Status MultiTablePipe::request_and_exec_plans() { - if (_unplanned_pipes.empty()) { + if (_unplanned_tables.empty()) { return Status::OK(); } - // get list of table names in unplanned pipes - std::vector tables; fmt::memory_buffer log_buffer; log_buffer.clear(); - fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_pipes.size()); - for (auto& pair : _unplanned_pipes) { - tables.push_back(pair.first); + fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_tables.size()); + for (auto& pair : _unplanned_tables) { fmt::format_to(log_buffer, "{} ", pair.first); } fmt::format_to(log_buffer, "]"); LOG(INFO) << fmt::to_string(log_buffer); - TStreamLoadPutRequest request; - set_request_auth(&request, _ctx->auth); - request.db = _ctx->db; - request.table_names = tables; - request.__isset.table_names = true; - request.txnId = _ctx->txn_id; - request.formatType = _ctx->format; - request.__set_compress_type(_ctx->compress_type); - request.__set_header_type(_ctx->header_type); - request.__set_loadId(_ctx->id.to_thrift()); - request.fileType = TFileType::FILE_STREAM; - request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); - request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node); - request.__set_user(_ctx->qualified_user); - request.__set_cloud_cluster(_ctx->cloud_cluster); - // no need to register new_load_stream_mgr coz it is already done in routineload submit task - - // plan this load - ExecEnv* exec_env = doris::ExecEnv::GetInstance(); - TNetworkAddress master_addr = exec_env->master_info()->network_address; - int64_t stream_load_put_start_time = MonotonicNanos(); - RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, this](FrontendServiceConnection& client) { - client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request); - })); - _ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; - - Status plan_status(Status::create(_ctx->multi_table_put_result.status)); - if (!plan_status.ok()) { - LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief(); - return plan_status; - } - Status st; - if (_ctx->multi_table_put_result.__isset.params && - !_ctx->multi_table_put_result.__isset.pipeline_params) { - st = exec_plans(exec_env, _ctx->multi_table_put_result.params); - } else if (!_ctx->multi_table_put_result.__isset.params && - _ctx->multi_table_put_result.__isset.pipeline_params) { - st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params); - } else { - return Status::Aborted("too many or too few params are set in multi_table_put_result."); - } + for (auto& pair : _unplanned_tables) { + TStreamLoadPutRequest request; + set_request_auth(&request, _ctx->auth); + std::vector tables; + tables.push_back(pair.first); + request.db = _ctx->db; + request.table_names = tables; + request.__isset.table_names = true; + request.txnId = _ctx->txn_id; + request.formatType = _ctx->format; + request.__set_compress_type(_ctx->compress_type); + request.__set_header_type(_ctx->header_type); + request.__set_loadId((pair.second->id).to_thrift()); + request.fileType = TFileType::FILE_STREAM; + request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); + request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node); + request.__set_user(_ctx->qualified_user); + request.__set_cloud_cluster(_ctx->cloud_cluster); + // no need to register new_load_stream_mgr coz it is already done in routineload submit task + + // plan this load + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + TNetworkAddress master_addr = exec_env->master_info()->network_address; + int64_t stream_load_put_start_time = MonotonicNanos(); + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, this](FrontendServiceConnection& client) { + client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request); + })); + _ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; + + Status plan_status(Status::create(_ctx->multi_table_put_result.status)); + if (!plan_status.ok()) { + LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief(); + return plan_status; + } + if (_ctx->multi_table_put_result.__isset.params && + !_ctx->multi_table_put_result.__isset.pipeline_params) { + st = exec_plans(exec_env, _ctx->multi_table_put_result.params); + } else if (!_ctx->multi_table_put_result.__isset.params && + _ctx->multi_table_put_result.__isset.pipeline_params) { + st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params); + } else { + return Status::Aborted("too many or too few params are set in multi_table_put_result."); + } + if (!st.ok()) { + return st; + } + } + _unplanned_tables.clear(); return st; } template Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector params) { // put unplanned pipes into planned pipes and clear unplanned pipes - for (auto& pipe : _unplanned_pipes) { - _ctx->table_list.push_back(pipe.first); - _planned_pipes.emplace(pipe.first, pipe.second); + for (auto& pair : _unplanned_tables) { + _ctx->table_list.push_back(pair.first); + _planned_tables.emplace(pair.first, pair.second); } LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}", - _unplanned_pipes.size(), _planned_pipes.size(), params.size()) + _unplanned_tables.size(), _planned_tables.size(), params.size()) << ", ctx: " << _ctx->brief(); - _unplanned_pipes.clear(); for (auto& plan : params) { DBUG_EXECUTE_IF("MultiTablePipe.exec_plans.failed", { return Status::Aborted("MultiTablePipe.exec_plans.failed"); }); if (!plan.__isset.table_name || - _planned_pipes.find(plan.table_name) == _planned_pipes.end()) { + _unplanned_tables.find(plan.table_name) == _unplanned_tables.end()) { return Status::Aborted("Missing vital param: table_name"); } - if constexpr (std::is_same_v) { - RETURN_IF_ERROR( - put_pipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name])); - LOG(INFO) << "fragment_instance_id=" << print_id(plan.params.fragment_instance_id) - << " table=" << plan.table_name << ", ctx: " << _ctx->brief(); - } else if constexpr (std::is_same_v) { - auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id); - RETURN_IF_ERROR(put_pipe(pipe_id, _planned_pipes[plan.table_name])); - LOG(INFO) << "pipe_id=" << pipe_id << ", table=" << plan.table_name - << ", ctx: " << _ctx->brief(); - } else { - LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or " - "`TPipelineFragmentParams`, will crash" - << ", ctx: " << _ctx->brief(); - CHECK(false); - } - _inflight_cnt++; RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment( - plan, [this](RuntimeState* state, Status* status) { + plan, [this, plan](RuntimeState* state, Status* status) { DCHECK(state); + auto pair = _planned_tables.find(plan.table_name); + if (pair == _planned_tables.end()) { + LOG(WARNING) << "failed to get ctx, table: " << plan.table_name; + } else { + doris::ExecEnv::GetInstance()->new_load_stream_mgr()->remove( + pair->second->id); + } + { std::lock_guard l(_tablet_commit_infos_lock); _tablet_commit_infos.insert(_tablet_commit_infos.end(), @@ -300,12 +304,12 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector para #else Status MultiTablePipe::request_and_exec_plans() { // put unplanned pipes into planned pipes - for (auto& pipe : _unplanned_pipes) { - _planned_pipes.emplace(pipe.first, pipe.second); + for (auto& pipe : _unplanned_tables) { + _planned_tables.emplace(pipe.first, pipe.second); } LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}", - _unplanned_pipes.size(), _planned_pipes.size()); - _unplanned_pipes.clear(); + _unplanned_tables.size(), _planned_tables.size()); + _unplanned_tables.clear(); return Status::OK(); } @@ -330,35 +334,6 @@ void MultiTablePipe::_handle_consumer_finished() { _ctx->promise.set_value(_status); // when all done, finish the routine load task } -Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id, - std::shared_ptr pipe) { - std::lock_guard l(_pipe_map_lock); - auto it = _pipe_map.find(pipe_id); - if (it != std::end(_pipe_map)) { - return Status::InternalError("id already exist"); - } - _pipe_map.emplace(pipe_id, pipe); - return Status::OK(); -} - -std::shared_ptr MultiTablePipe::get_pipe(const TUniqueId& pipe_id) { - std::lock_guard l(_pipe_map_lock); - auto it = _pipe_map.find(pipe_id); - if (it == std::end(_pipe_map)) { - return {}; - } - return it->second; -} - -void MultiTablePipe::remove_pipe(const TUniqueId& pipe_id) { - std::lock_guard l(_pipe_map_lock); - auto it = _pipe_map.find(pipe_id); - if (it != std::end(_pipe_map)) { - _pipe_map.erase(it); - VLOG_NOTICE << "remove stream load pipe: " << pipe_id; - } -} - template Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector params); template Status MultiTablePipe::exec_plans(ExecEnv* exec_env, diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h index 36f6ec68b17f3c..f1d2e523652e28 100644 --- a/be/src/io/fs/multi_table_pipe.h +++ b/be/src/io/fs/multi_table_pipe.h @@ -60,13 +60,6 @@ class MultiTablePipe : public KafkaConsumerPipe { void cancel(const std::string& reason) override; - // register pair - Status put_pipe(const TUniqueId& pipe_id, std::shared_ptr pipe); - - std::shared_ptr get_pipe(const TUniqueId& pipe_id); - - void remove_pipe(const TUniqueId& pipe_id); - private: // parse table name from data std::string parse_dst_table(const char* data, size_t size); @@ -82,8 +75,8 @@ class MultiTablePipe : public KafkaConsumerPipe { void _handle_consumer_finished(); private: - std::unordered_map _planned_pipes; - std::unordered_map _unplanned_pipes; + std::unordered_map> _planned_tables; + std::unordered_map> _unplanned_tables; std::atomic _unplanned_row_cnt {0}; // trigger plan request when exceed threshold // inflight count, when it is zero, means consume and all plans is finished std::atomic _inflight_cnt {1};