diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index 702dea865022b2..ab0b5934b50998 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -69,6 +69,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap("ms", "get_delete_bitmap" BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms", "get_delete_bitmap_update_lock"); BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance"); +BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach"); BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job"); BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index 1c4c4f749b6fe1..dbdbfa834e9812 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -170,6 +170,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock; extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance; +extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach; // txn_kv's bvars extern bvar::LatencyRecorder g_bvar_txn_kv_get; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 08a6b0884e059f..a8cd0f521487b2 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -251,6 +251,11 @@ class MetaServiceImpl : public cloud::MetaService { GetClusterStatusResponse* response, ::google::protobuf::Closure* done) override; + void get_rl_task_commit_attach(::google::protobuf::RpcController* controller, + const GetRLTaskCommitAttachRequest* request, + GetRLTaskCommitAttachResponse* response, + ::google::protobuf::Closure* done) override; + // ATTN: If you add a new method, please also add the corresponding implementation in `MetaServiceProxy`. std::pair get_instance_info(const std::string& instance_id, @@ -574,6 +579,13 @@ class MetaServiceProxy final : public MetaService { call_impl(&cloud::MetaService::get_cluster_status, controller, request, response, done); } + void get_rl_task_commit_attach(::google::protobuf::RpcController* controller, + const GetRLTaskCommitAttachRequest* request, + GetRLTaskCommitAttachResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::get_rl_task_commit_attach, controller, request, response, done); + } + private: template using MetaServiceMethod = void (cloud::MetaService::*)(::google::protobuf::RpcController*, diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 788663c2ceac3f..03251ee0f071a7 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -478,6 +478,163 @@ void MetaServiceImpl::precommit_txn(::google::protobuf::RpcController* controlle } } +void put_routine_load_progress(MetaServiceCode& code, std::string& msg, + const std::string& instance_id, + const CommitTxnRequest* request, + Transaction* txn, int64_t db_id) { + std::stringstream ss; + int64_t txn_id = request->txn_id(); + if (!request->has_commit_attachment()) { + ss << "failed to get commit attachment from req, db_id=" << db_id + << " txn_id=" << txn_id; + msg = ss.str(); + return; + } + + TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment(); + RLTaskTxnCommitAttachmentPB commit_attachment = + txn_commit_attachment.rl_task_txn_commit_attachment(); + int64_t job_id = commit_attachment.job_id(); + + std::string rl_progress_key; + std::string rl_progress_val; + bool prev_progress_existed = true; + RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id}; + rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key); + TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val); + if (err != TxnErrorCode::TXN_OK) { + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + prev_progress_existed = false; + } else { + code = cast_as(err); + ss << "failed to get routine load progress, db_id=" << db_id << " txn_id=" << txn_id + << " err=" << err; + msg = ss.str(); + return; + } + } + + RoutineLoadProgressPB prev_progress_info; + if (prev_progress_existed) { + if (!prev_progress_info.ParseFromString(rl_progress_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse routine load progress, db_id=" << db_id + << " txn_id=" << txn_id; + msg = ss.str(); + return; + } + } + + std::string new_progress_val; + RoutineLoadProgressPB new_progress_info; + new_progress_info.CopyFrom(commit_attachment.progress()); + for (auto const& elem : prev_progress_info.partition_to_offset()) { + auto it = new_progress_info.partition_to_offset().find(elem.first); + if (it == new_progress_info.partition_to_offset().end()) { + new_progress_info.mutable_partition_to_offset()->insert(elem); + } + } + + std::string new_statistic_val; + RoutineLoadJobStatisticPB* new_statistic_info = new_progress_info.mutable_stat(); + if (prev_progress_info.has_stat()) { + const RoutineLoadJobStatisticPB& prev_statistic_info = prev_progress_info.stat(); + + new_statistic_info->set_filtered_rows(prev_statistic_info.filtered_rows() + commit_attachment.filtered_rows()); + new_statistic_info->set_loaded_rows(prev_statistic_info.loaded_rows() + commit_attachment.loaded_rows()); + new_statistic_info->set_unselected_rows(prev_statistic_info.unselected_rows() + commit_attachment.unselected_rows()); + new_statistic_info->set_received_bytes(prev_statistic_info.received_bytes() + commit_attachment.received_bytes()); + new_statistic_info->set_task_execution_time_ms(prev_statistic_info.task_execution_time_ms() + commit_attachment.task_execution_time_ms()); + } else { + new_statistic_info->set_filtered_rows(commit_attachment.filtered_rows()); + new_statistic_info->set_loaded_rows(commit_attachment.loaded_rows()); + new_statistic_info->set_unselected_rows(commit_attachment.unselected_rows()); + new_statistic_info->set_received_bytes(commit_attachment.received_bytes()); + new_statistic_info->set_task_execution_time_ms(commit_attachment.task_execution_time_ms()); + } + + LOG(INFO) << "routine load new progress: " << new_progress_info.ShortDebugString(); + + if (!new_progress_info.SerializeToString(&new_progress_val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize new progress val, txn_id=" << txn_id; + msg = ss.str(); + return; + } + + txn->put(rl_progress_key, new_progress_val); +} + +void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller, + const GetRLTaskCommitAttachRequest* request, + GetRLTaskCommitAttachResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(get_rl_task_commit_attach); + instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + RPC_RATE_LIMIT(get_rl_task_commit_attach) + + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "filed to create txn, err=" << err; + msg = ss.str(); + return; + } + + if (!request->has_db_id() || !request->has_job_id()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty db_id or job_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + + int64_t db_id = request->db_id(); + int64_t job_id = request->job_id(); + std::string rl_progress_key; + std::string rl_progress_val; + RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id}; + rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key); + err = txn->get(rl_progress_key, &rl_progress_val); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND; + ss << "pregress info not found, db_id=" << db_id + << " job_id=" << job_id << " err=" << err; + msg = ss.str(); + return; + } else if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "failed to get pregress info, db_id=" << db_id + << " job_id=" << job_id << " err=" << err; + msg = ss.str(); + return; + } + + RLTaskTxnCommitAttachmentPB* commit_attach = response->mutable_commit_attach(); + RoutineLoadProgressPB* progress_info = commit_attach->mutable_progress(); + if (!progress_info->ParseFromString(rl_progress_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse progress info, db_id=" << db_id << " job_id=" << job_id; + msg = ss.str(); + return; + } + + if (progress_info->has_stat()) { + const RoutineLoadJobStatisticPB& statistic_info = progress_info->stat(); + commit_attach->set_filtered_rows(statistic_info.filtered_rows()); + commit_attach->set_loaded_rows(statistic_info.loaded_rows()); + commit_attach->set_unselected_rows(statistic_info.unselected_rows()); + commit_attach->set_received_bytes(statistic_info.received_bytes()); + commit_attach->set_task_execution_time_ms(statistic_info.task_execution_time_ms()); + } +} + /** * 0. Extract txn_id from request * 1. Get db id from TxnKv with txn_id @@ -977,86 +1134,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, if (txn_info.load_job_source_type() == LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) { - if (!request->has_commit_attachment()) { - ss << "failed to get commit attachment from req, db_id=" << db_id - << " txn_id=" << txn_id; - msg = ss.str(); - return; - } - - TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment(); - RLTaskTxnCommitAttachmentPB commit_attachment = - txn_commit_attachment.rl_task_txn_commit_attachment(); - int64_t job_id = commit_attachment.job_id(); - - std::string rl_progress_key; - std::string rl_progress_val; - bool prev_progress_existed = true; - RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id}; - rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key); - TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val); - if (err != TxnErrorCode::TXN_OK) { - if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { - prev_progress_existed = false; - } else { - code = cast_as(err); - ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id - << " err=" << err; - msg = ss.str(); - return; - } - } - - RoutineLoadProgressPB prev_progress_info; - if (prev_progress_existed) { - if (!prev_progress_info.ParseFromString(rl_progress_val)) { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; - msg = ss.str(); - return; - } - - int cal_row_num = 0; - for (auto const& elem : commit_attachment.progress().partition_to_offset()) { - if (elem.second >= 0) { - auto it = prev_progress_info.partition_to_offset().find(elem.first); - if (it != prev_progress_info.partition_to_offset().end() && it->second >= 0) { - cal_row_num += elem.second - it->second; - } else { - cal_row_num += elem.second + 1; - } - } - } - - LOG(INFO) << " calculated row num " << cal_row_num << " actual row num " - << commit_attachment.loaded_rows() << " prev progress " - << prev_progress_info.DebugString(); - - if (cal_row_num == 0) { - LOG(WARNING) << " repeated to load task in routine load, db_id=" << db_id - << " txn_id=" << txn_id << " calculated row num " << cal_row_num - << " actual row num " << commit_attachment.loaded_rows(); - return; - } - } - - std::string new_progress_val; - RoutineLoadProgressPB new_progress_info; - new_progress_info.CopyFrom(commit_attachment.progress()); - for (auto const& elem : prev_progress_info.partition_to_offset()) { - auto it = new_progress_info.partition_to_offset().find(elem.first); - if (it == new_progress_info.partition_to_offset().end()) { - new_progress_info.mutable_partition_to_offset()->insert(elem); - } - } - - if (!new_progress_info.SerializeToString(&new_progress_val)) { - code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; - ss << "failed to serialize new progress val, txn_id=" << txn_info.txn_id(); - msg = ss.str(); - return; - } - txn->put(rl_progress_key, new_progress_val); + put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id); } LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << " txn_id=" << txn_id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index 49cb3c205903fc..19949ac73d3c0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -314,4 +314,15 @@ public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) { } return blockingStub.getInstance(request); } + + public Cloud.GetRLTaskCommitAttachResponse + getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request) { + if (!request.hasCloudUniqueId()) { + Cloud.GetRLTaskCommitAttachRequest.Builder builder = + Cloud.GetRLTaskCommitAttachRequest.newBuilder(); + builder.mergeFrom(request); + return blockingStub.getRlTaskCommitAttach(builder.setCloudUniqueId(Config.cloud_unique_id).build()); + } + return blockingStub.getRlTaskCommitAttach(request); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index 9715d831e8f3ae..680189d4d276f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -442,4 +442,19 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo throw new RpcException("", e.getMessage(), e); } } + + public Cloud.GetRLTaskCommitAttachResponse + getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request) + throws RpcException { + if (metaServiceHostPort == null) { + throw new RpcException("", "cloud mode, please configure cloud_unique_id and meta_service_endpoint"); + } + TNetworkAddress metaAddress = new TNetworkAddress(metaServiceHostPort.first, metaServiceHostPort.second); + try { + final MetaServiceClient client = getProxy(metaAddress); + return client.getRLTaskCommitAttach(request); + } catch (Exception e) { + throw new RpcException(metaAddress.hostname, e.getMessage(), e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index bdcfb9e4a2724a..1067a759e5f416 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -23,6 +23,8 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.InternalErrorCode; @@ -40,6 +42,7 @@ import org.apache.doris.load.routineload.kafka.KafkaConfiguration; import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; +import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; @@ -69,6 +72,7 @@ import java.util.TimeZone; import java.util.UUID; + /** * KafkaRoutineLoadJob is a kind of RoutineLoadJob which fetch data from kafka. * The progress which is super class property is seems like "{"partition1": offset1, "partition2": offset2}" @@ -247,6 +251,35 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserExcept } } + @Override + public void updateCloudProgress() throws UserException { + Cloud.GetRLTaskCommitAttachRequest.Builder builder = + Cloud.GetRLTaskCommitAttachRequest.newBuilder(); + builder.setCloudUniqueId(Config.cloud_unique_id); + builder.setDbId(dbId); + builder.setJobId(id); + + Cloud.GetRLTaskCommitAttachResponse response; + try { + response = MetaServiceProxy.getInstance().getRLTaskCommitAttach(builder.build()); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("failed to get routine load commit attach, response: {}", response); + if (response.getStatus().getCode() == Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) { + LOG.warn("not found routine load progress, response: {}", response); + return; + } else { + throw new UserException(response.getStatus().getMsg()); + } + } + } catch (RpcException e) { + LOG.info("failed to get routine load commit attach {}", e); + throw new UserException(e.getMessage()); + } + + RLTaskTxnCommitAttachment commitAttach = new RLTaskTxnCommitAttachment(response.getCommitAttach()); + updateProgress(commitAttach); + } + @Override public int calculateCurrentConcurrentTaskNum() { int partitionNum = currentKafkaPartitions.size(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 77663be058cd77..20c1b999d030e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -779,6 +779,8 @@ public void processTimeoutTasks() { } } + abstract void updateCloudProgress() throws UserException; + abstract void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException; public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 84f9548de1324a..51029c3d18b194 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -18,6 +18,7 @@ package org.apache.doris.load.routineload; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -73,10 +74,15 @@ private void process() throws UserException { if (!routineLoadJobList.isEmpty()) { LOG.info("there are {} job need schedule", routineLoadJobList.size()); } + for (RoutineLoadJob routineLoadJob : routineLoadJobList) { RoutineLoadJob.JobState errorJobState = null; UserException userException = null; try { + if (Config.isCloudMode()) { + routineLoadJob.updateCloudProgress(); + } + routineLoadJob.prepare(); // judge nums of tasks more than max concurrent tasks of cluster int desiredConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index cad9eac22baf09..2db7dd1a5ef557 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -258,6 +258,7 @@ message TxnCoordinatorPB { message RoutineLoadProgressPB { map partition_to_offset = 1; + optional RoutineLoadJobStatisticPB stat = 2; } message RLTaskTxnCommitAttachmentPB { @@ -272,6 +273,14 @@ message RLTaskTxnCommitAttachmentPB { optional string error_log_url = 9; } +message RoutineLoadJobStatisticPB { + optional int64 filtered_rows = 1; + optional int64 loaded_rows = 2; + optional int64 unselected_rows = 3; + optional int64 received_bytes = 4; + optional int64 task_execution_time_ms = 5; +} + message TxnCommitAttachmentPB { enum Type { LODD_JOB_FINAL_OPERATION = 0; @@ -1205,6 +1214,7 @@ enum MetaServiceCode { JOB_TABLET_BUSY = 5001; JOB_ALREADY_SUCCESS = 5002; ROUTINE_LOAD_DATA_INCONSISTENT = 5003; + ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004; // Rate limit MAX_QPS_LIMIT = 6001; @@ -1288,6 +1298,17 @@ message GetDeleteBitmapUpdateLockResponse { optional MetaServiceResponseStatus status = 1; } +message GetRLTaskCommitAttachRequest { + optional string cloud_unique_id = 1; // For auth + optional int64 db_id = 2; + optional int64 job_id = 3; +} + +message GetRLTaskCommitAttachResponse { + optional MetaServiceResponseStatus status = 1; + optional RLTaskTxnCommitAttachmentPB commit_attach = 2; +} + service MetaService { rpc begin_txn(BeginTxnRequest) returns (BeginTxnResponse); rpc precommit_txn(PrecommitTxnRequest) returns (PrecommitTxnResponse); @@ -1350,6 +1371,9 @@ service MetaService { rpc update_delete_bitmap(UpdateDeleteBitmapRequest) returns(UpdateDeleteBitmapResponse); rpc get_delete_bitmap(GetDeleteBitmapRequest) returns(GetDeleteBitmapResponse); rpc get_delete_bitmap_update_lock(GetDeleteBitmapUpdateLockRequest) returns(GetDeleteBitmapUpdateLockResponse); + + // routine load progress + rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse); }; service RecyclerService {