Skip to content

Commit

Permalink
[feature](merge-cloud) Get routine load progress info from meta servi…
Browse files Browse the repository at this point in the history
…ce (apache#32532)

Co-authored-by: Luwei <[email protected]>
Co-authored-by: Gavin Chou <[email protected]>
  • Loading branch information
3 people authored Mar 25, 2024
1 parent 3c0e724 commit 9e7d416
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 80 deletions.
1 change: 1 addition & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap("ms", "get_delete_bitmap"
BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
"get_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");

BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job");
Expand Down
1 change: 1 addition & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;

// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
Expand Down
12 changes: 12 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ class MetaServiceImpl : public cloud::MetaService {
GetClusterStatusResponse* response,
::google::protobuf::Closure* done) override;

// RPC handler that returns the stored routine load progress for the
// (db_id, job_id) named in `request` as a commit attachment in `response`.
// The status code is ROUTINE_LOAD_PROGRESS_NOT_FOUND when no progress
// record exists for that job.
void get_rl_task_commit_attach(::google::protobuf::RpcController* controller,
const GetRLTaskCommitAttachRequest* request,
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;

// ATTN: If you add a new method, please also add the corresponding implementation in `MetaServiceProxy`.

std::pair<MetaServiceCode, std::string> get_instance_info(const std::string& instance_id,
Expand Down Expand Up @@ -574,6 +579,13 @@ class MetaServiceProxy final : public MetaService {
call_impl(&cloud::MetaService::get_cluster_status, controller, request, response, done);
}

// Proxy entry point for get_rl_task_commit_attach: hands the call to
// call_impl together with the corresponding MetaService method pointer.
void get_rl_task_commit_attach(::google::protobuf::RpcController* controller,
const GetRLTaskCommitAttachRequest* request,
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::get_rl_task_commit_attach, controller, request, response, done);
}

private:
template <typename Request, typename Response>
using MetaServiceMethod = void (cloud::MetaService::*)(::google::protobuf::RpcController*,
Expand Down
238 changes: 158 additions & 80 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,163 @@ void MetaServiceImpl::precommit_txn(::google::protobuf::RpcController* controlle
}
}

/**
 * Merges the routine load progress carried by a commit request with the
 * progress already stored for the job, then writes the result via `txn->put`.
 *
 * Merge rules:
 *   - partition offsets: the offsets in the request win; partitions that only
 *     exist in the stored progress are carried over unchanged.
 *   - statistics (filtered/loaded/unselected rows, received bytes, task
 *     execution time): stored value + value reported by this commit, or just
 *     the reported value when the stored progress has no statistics.
 *
 * @param code        [out] set to an error code on failure, untouched on success
 * @param msg         [out] set to a description of the failure, untouched on success
 * @param instance_id instance the routine load job belongs to
 * @param request     commit request; must carry a commit attachment
 * @param txn         transaction used for the read and the write
 * @param db_id       database id of the routine load job
 */
void put_routine_load_progress(MetaServiceCode& code, std::string& msg,
                               const std::string& instance_id,
                               const CommitTxnRequest* request,
                               Transaction* txn, int64_t db_id) {
    std::stringstream ss;
    const int64_t txn_id = request->txn_id();
    if (!request->has_commit_attachment()) {
        // Set `code` as well as `msg`: every other failure path in this
        // function does, and a message alone is invisible to callers that
        // only inspect the status code.
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "failed to get commit attachment from req, db_id=" << db_id
           << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    // Bind by reference: both messages live inside `request`, which outlives
    // this call, so there is no need to deep-copy them.
    const TxnCommitAttachmentPB& txn_commit_attachment = request->commit_attachment();
    const RLTaskTxnCommitAttachmentPB& commit_attachment =
            txn_commit_attachment.rl_task_txn_commit_attachment();
    const int64_t job_id = commit_attachment.job_id();

    std::string rl_progress_key;
    std::string rl_progress_val;
    RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
    rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);

    // A missing key is not an error: it means this is the first progress
    // record written for the job.
    const TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get routine load progress, db_id=" << db_id << " txn_id=" << txn_id
           << " err=" << err;
        msg = ss.str();
        return;
    }

    // Stays empty when no previous progress exists.
    RoutineLoadProgressPB prev_progress_info;
    if (err == TxnErrorCode::TXN_OK && !prev_progress_info.ParseFromString(rl_progress_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse routine load progress, db_id=" << db_id
           << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    // Start from the progress reported by this commit and carry over the
    // partitions that only the stored progress knows about.
    RoutineLoadProgressPB new_progress_info;
    new_progress_info.CopyFrom(commit_attachment.progress());
    for (const auto& elem : prev_progress_info.partition_to_offset()) {
        const auto it = new_progress_info.partition_to_offset().find(elem.first);
        if (it == new_progress_info.partition_to_offset().end()) {
            new_progress_info.mutable_partition_to_offset()->insert(elem);
        }
    }

    // Accumulate the job statistics on top of the stored ones, if any.
    RoutineLoadJobStatisticPB* new_statistic_info = new_progress_info.mutable_stat();
    if (prev_progress_info.has_stat()) {
        const RoutineLoadJobStatisticPB& prev_statistic_info = prev_progress_info.stat();

        new_statistic_info->set_filtered_rows(prev_statistic_info.filtered_rows() +
                                              commit_attachment.filtered_rows());
        new_statistic_info->set_loaded_rows(prev_statistic_info.loaded_rows() +
                                            commit_attachment.loaded_rows());
        new_statistic_info->set_unselected_rows(prev_statistic_info.unselected_rows() +
                                                commit_attachment.unselected_rows());
        new_statistic_info->set_received_bytes(prev_statistic_info.received_bytes() +
                                               commit_attachment.received_bytes());
        new_statistic_info->set_task_execution_time_ms(
                prev_statistic_info.task_execution_time_ms() +
                commit_attachment.task_execution_time_ms());
    } else {
        new_statistic_info->set_filtered_rows(commit_attachment.filtered_rows());
        new_statistic_info->set_loaded_rows(commit_attachment.loaded_rows());
        new_statistic_info->set_unselected_rows(commit_attachment.unselected_rows());
        new_statistic_info->set_received_bytes(commit_attachment.received_bytes());
        new_statistic_info->set_task_execution_time_ms(commit_attachment.task_execution_time_ms());
    }

    LOG(INFO) << "routine load new progress: " << new_progress_info.ShortDebugString();

    std::string new_progress_val;
    if (!new_progress_info.SerializeToString(&new_progress_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize new progress val, txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    txn->put(rl_progress_key, new_progress_val);
}

/**
 * RPC handler: looks up the stored routine load progress of a job and returns
 * it as a commit attachment.
 *
 * The request must carry `cloud_unique_id`, `db_id` and `job_id`. On success
 * `response->commit_attach().progress()` holds the stored progress and, when
 * that progress contains statistics, the row/byte/time counters are copied
 * onto the attachment itself.
 *
 * Failure codes set by this function:
 *   - INVALID_ARGUMENT                 empty instance id, or db_id/job_id missing
 *   - ROUTINE_LOAD_PROGRESS_NOT_FOUND  no progress record exists for the job
 *   - PROTOBUF_PARSE_ERR               the stored record cannot be parsed
 *   - cast_as<CREATE/READ>(err)        the KV transaction could not be created/read
 *
 * NOTE(review): `code`, `msg`, `ss` and `instance_id` are not declared in this
 * body; presumably RPC_PREPROCESS introduces them -- confirm against the macro.
 */
void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller,
                                                const GetRLTaskCommitAttachRequest* request,
                                                GetRLTaskCommitAttachResponse* response,
                                                ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_rl_task_commit_attach);
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }
    RPC_RATE_LIMIT(get_rl_task_commit_attach)

    // Validate the arguments before touching the KV store so that a malformed
    // request never costs a transaction.
    if (!request->has_db_id() || !request->has_job_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty db_id or job_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }

    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, err=" << err;
        msg = ss.str();
        return;
    }

    const int64_t db_id = request->db_id();
    const int64_t job_id = request->job_id();
    std::string rl_progress_key;
    std::string rl_progress_val;
    RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
    rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
    err = txn->get(rl_progress_key, &rl_progress_val);
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        // Reported with a dedicated code so that callers can tell "no progress
        // recorded yet" apart from a real read failure.
        code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND;
        ss << "progress info not found, db_id=" << db_id
           << " job_id=" << job_id << " err=" << err;
        msg = ss.str();
        return;
    } else if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get progress info, db_id=" << db_id
           << " job_id=" << job_id << " err=" << err;
        msg = ss.str();
        return;
    }

    // Parse the stored record straight into the response.
    RLTaskTxnCommitAttachmentPB* commit_attach = response->mutable_commit_attach();
    RoutineLoadProgressPB* progress_info = commit_attach->mutable_progress();
    if (!progress_info->ParseFromString(rl_progress_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse progress info, db_id=" << db_id << " job_id=" << job_id;
        msg = ss.str();
        return;
    }

    // Mirror the accumulated statistics onto the attachment's own counters.
    if (progress_info->has_stat()) {
        const RoutineLoadJobStatisticPB& statistic_info = progress_info->stat();
        commit_attach->set_filtered_rows(statistic_info.filtered_rows());
        commit_attach->set_loaded_rows(statistic_info.loaded_rows());
        commit_attach->set_unselected_rows(statistic_info.unselected_rows());
        commit_attach->set_received_bytes(statistic_info.received_bytes());
        commit_attach->set_task_execution_time_ms(statistic_info.task_execution_time_ms());
    }
}

/**
* 0. Extract txn_id from request
* 1. Get db id from TxnKv with txn_id
Expand Down Expand Up @@ -977,86 +1134,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,

if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
if (!request->has_commit_attachment()) {
ss << "failed to get commit attachment from req, db_id=" << db_id
<< " txn_id=" << txn_id;
msg = ss.str();
return;
}

TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment();
RLTaskTxnCommitAttachmentPB commit_attachment =
txn_commit_attachment.rl_task_txn_commit_attachment();
int64_t job_id = commit_attachment.job_id();

std::string rl_progress_key;
std::string rl_progress_val;
bool prev_progress_existed = true;
RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val);
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
prev_progress_existed = false;
} else {
code = cast_as<ErrCategory::READ>(err);
ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
<< " err=" << err;
msg = ss.str();
return;
}
}

RoutineLoadProgressPB prev_progress_info;
if (prev_progress_existed) {
if (!prev_progress_info.ParseFromString(rl_progress_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
msg = ss.str();
return;
}

int cal_row_num = 0;
for (auto const& elem : commit_attachment.progress().partition_to_offset()) {
if (elem.second >= 0) {
auto it = prev_progress_info.partition_to_offset().find(elem.first);
if (it != prev_progress_info.partition_to_offset().end() && it->second >= 0) {
cal_row_num += elem.second - it->second;
} else {
cal_row_num += elem.second + 1;
}
}
}

LOG(INFO) << " calculated row num " << cal_row_num << " actual row num "
<< commit_attachment.loaded_rows() << " prev progress "
<< prev_progress_info.DebugString();

if (cal_row_num == 0) {
LOG(WARNING) << " repeated to load task in routine load, db_id=" << db_id
<< " txn_id=" << txn_id << " calculated row num " << cal_row_num
<< " actual row num " << commit_attachment.loaded_rows();
return;
}
}

std::string new_progress_val;
RoutineLoadProgressPB new_progress_info;
new_progress_info.CopyFrom(commit_attachment.progress());
for (auto const& elem : prev_progress_info.partition_to_offset()) {
auto it = new_progress_info.partition_to_offset().find(elem.first);
if (it == new_progress_info.partition_to_offset().end()) {
new_progress_info.mutable_partition_to_offset()->insert(elem);
}
}

if (!new_progress_info.SerializeToString(&new_progress_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize new progress val, txn_id=" << txn_info.txn_id();
msg = ss.str();
return;
}
txn->put(rl_progress_key, new_progress_val);
put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
}

LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << " txn_id=" << txn_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,4 +314,15 @@ public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) {
}
return blockingStub.getInstance(request);
}

/**
 * Calls the meta service's getRlTaskCommitAttach RPC through the blocking stub.
 * When the request carries no cloud unique id, a copy of the request with
 * {@code Config.cloud_unique_id} filled in is sent instead.
 */
public Cloud.GetRLTaskCommitAttachResponse
        getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request) {
    Cloud.GetRLTaskCommitAttachRequest outgoing = request;
    if (!request.hasCloudUniqueId()) {
        Cloud.GetRLTaskCommitAttachRequest.Builder copy =
                Cloud.GetRLTaskCommitAttachRequest.newBuilder();
        copy.mergeFrom(request);
        copy.setCloudUniqueId(Config.cloud_unique_id);
        outgoing = copy.build();
    }
    return blockingStub.getRlTaskCommitAttach(outgoing);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -442,4 +442,19 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo
throw new RpcException("", e.getMessage(), e);
}
}

/**
 * Sends a getRLTaskCommitAttach request to the configured meta service endpoint.
 *
 * @throws RpcException if no meta service endpoint is configured, or if
 *         obtaining the client or issuing the call throws
 */
public Cloud.GetRLTaskCommitAttachResponse
        getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
        throws RpcException {
    if (metaServiceHostPort == null) {
        throw new RpcException("", "cloud mode, please configure cloud_unique_id and meta_service_endpoint");
    }
    final TNetworkAddress endpoint =
            new TNetworkAddress(metaServiceHostPort.first, metaServiceHostPort.second);
    try {
        return getProxy(endpoint).getRLTaskCommitAttach(request);
    } catch (Exception e) {
        // Re-wrap any failure, tagging it with the host that was contacted.
        throw new RpcException(endpoint.hostname, e.getMessage(), e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.InternalErrorCode;
Expand All @@ -40,6 +42,7 @@
import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
Expand Down Expand Up @@ -69,6 +72,7 @@
import java.util.TimeZone;
import java.util.UUID;


/**
* KafkaRoutineLoadJob is a kind of RoutineLoadJob which fetch data from kafka.
* The progress, which is a superclass property, looks like "{"partition1": offset1, "partition2": offset2}"
Expand Down Expand Up @@ -247,6 +251,35 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserExcept
}
}

/**
 * Fetches this job's stored progress from the meta service and applies it via
 * {@code updateProgress}.
 *
 * <p>If the meta service reports ROUTINE_LOAD_PROGRESS_NOT_FOUND the method
 * returns without changing anything.
 *
 * @throws UserException if the RPC fails or the meta service returns any other
 *         non-OK status
 */
@Override
public void updateCloudProgress() throws UserException {
    Cloud.GetRLTaskCommitAttachRequest.Builder builder =
            Cloud.GetRLTaskCommitAttachRequest.newBuilder();
    builder.setCloudUniqueId(Config.cloud_unique_id);
    builder.setDbId(dbId);
    builder.setJobId(id);

    Cloud.GetRLTaskCommitAttachResponse response;
    try {
        response = MetaServiceProxy.getInstance().getRLTaskCommitAttach(builder.build());
    } catch (RpcException e) {
        // Pass the exception as the trailing throwable argument (not through a
        // "{}" placeholder) so the logger records it as an exception.
        LOG.warn("failed to get routine load commit attach", e);
        throw new UserException(e.getMessage());
    }

    Cloud.MetaServiceCode code = response.getStatus().getCode();
    if (code == Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
        // Not treated as a failure: log it once and leave the progress as is.
        LOG.warn("not found routine load progress, response: {}", response);
        return;
    }
    if (code != Cloud.MetaServiceCode.OK) {
        LOG.warn("failed to get routine load commit attach, response: {}", response);
        throw new UserException(response.getStatus().getMsg());
    }

    RLTaskTxnCommitAttachment commitAttach = new RLTaskTxnCommitAttachment(response.getCommitAttach());
    updateProgress(commitAttach);
}

@Override
public int calculateCurrentConcurrentTaskNum() {
int partitionNum = currentKafkaPartitions.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,8 @@ public void processTimeoutTasks() {
}
}

/**
 * Cloud-mode hook implemented by each concrete job type. The Kafka
 * implementation fetches the job's stored progress from the meta service
 * and applies it to the in-memory job.
 *
 * @throws UserException if the progress cannot be obtained
 */
abstract void updateCloudProgress() throws UserException;

abstract void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException;

public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
Expand Down
Loading

0 comments on commit 9e7d416

Please sign in to comment.