diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 835e74ca7d5687..e91298d77b2405 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -821,11 +821,13 @@ Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) { // async send TableStats(in res) to FE coz we are in streamload ctx, response to the user ASAP static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id, - const std::string& label, CommitTxnResponse& res) { + const std::string& label, + const std::vector& commit_infos, + CommitTxnResponse& res) { std::string protobufBytes; res.SerializeToString(&protobufBytes); auto st = ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func( - [db_id, txn_id, label, protobufBytes]() -> Status { + [db_id, txn_id, label, protobufBytes, &commit_infos]() -> Status { TReportCommitTxnResultRequest request; TStatus result; @@ -838,6 +840,7 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id, request.__set_txnId(txn_id); request.__set_label(label); request.__set_payload(protobufBytes); + request.__set_commit_infos(commit_infos); Status status; int64_t duration_ns = 0; @@ -891,7 +894,7 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { auto st = retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn); if (st.ok()) { - send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res); + send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, ctx.commit_infos, res); } return st; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1ad8d733ddea07..614caac88e6573 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -48,6 +48,7 @@ import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.cloud.catalog.CloudTablet; import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuthenticationException; @@ -245,6 +246,7 @@ import org.apache.doris.thrift.TTableQueryStats; import org.apache.doris.thrift.TTableRef; import org.apache.doris.thrift.TTableStatus; +import org.apache.doris.thrift.TTabletCommitInfo; import org.apache.doris.thrift.TTabletLocation; import org.apache.doris.thrift.TTxnParams; import org.apache.doris.thrift.TUniqueId; @@ -1711,10 +1713,28 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : Config.try_commit_lock_timeout_seconds * 1000; List tables = queryLoadCommitTables(request, db); - return Env.getCurrentGlobalTransactionMgr() + boolean ret = Env.getCurrentGlobalTransactionMgr() .commitAndPublishTransaction(db, tables, request.getTxnId(), TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs, TxnCommitAttachment.fromThrift(request.txnCommitAttachment)); + if (Config.isCloudMode()) { + String clusterName = request.getCluster(); + if (ConnectContext.get().getSessionVariable().enableMultiClusterSyncLoad() + && clusterName != null && !clusterName.isEmpty()) { + CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo(); + List> backendsList = infoService + .getCloudClusterNames() + .stream() + .filter(name -> !name.equals(clusterName)) + .map(name -> infoService.getBackendsByClusterName(name)) + .collect(Collectors.toList()); + List allTabletIds = request.getCommitInfos().stream() + .map(TTabletCommitInfo::getTabletId) + .collect(Collectors.toList()); + StmtExecutor.syncLoadForTablets(backendsList, allTabletIds); + } + } + return ret; } @Override @@ -4035,6 +4055,21 @@ public TStatus reportCommitTxnResult(TReportCommitTxnResultRequest request) thro } CommitTxnResponse commitTxnResponse = CommitTxnResponse.parseFrom(receivedProtobufBytes); Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse); + + List allTabletIds = new ArrayList<>(); + TabletCommitInfo.fromThrift(request.getCommitInfos()).forEach(tabletCommitInfo -> { + allTabletIds.add(tabletCommitInfo.getTabletId()); + }); + if (!allTabletIds.isEmpty()) { + CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo(); + List> backendsList = infoService + .getCloudClusterNames() + .stream() + .filter(name -> !name.equals(clusterName)) + .map(name -> infoService.getBackendsByClusterName(name)) + .collect(Collectors.toList()); + StmtExecutor.syncLoadForTablets(backendsList, allTabletIds); + } } catch (InvalidProtocolBufferException e) { // Handle the exception, log it, or take appropriate action e.printStackTrace(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index c7d1bf264f2f63..f282fd5e82ba5d 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -558,7 +558,7 @@ struct TGroupCommitInfo{ 2: optional i64 groupCommitLoadTableId 3: optional string cluster 5: optional bool updateLoadData - 6: optional i64 tableId + 6: optional i64 tableId 7: optional i64 receiveData } @@ -844,7 +844,7 @@ struct TLoadTxnCommitRequest { 17: optional string auth_code_uuid // deprecated, use token instead 18: optional bool groupCommit 19: optional i64 receiveBytes - 20: optional i64 backendId + 20: optional i64 backendId } struct TLoadTxnCommitResult { @@ -1681,6 +1681,7 @@ struct TReportCommitTxnResultRequest { 2: optional i64 txnId 3: optional string label 4: optional binary payload + 5: optional list commit_infos } struct TQueryColumn {