Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip][fix](cloud) inform reader cluster after streamload in multi-cluster #45625

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,11 +821,13 @@ Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {

// async send TableStats(in res) to FE coz we are in streamload ctx, response to the user ASAP
static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
const std::string& label, CommitTxnResponse& res) {
const std::string& label,
const std::vector<TTabletCommitInfo>& commit_infos,
CommitTxnResponse& res) {
std::string protobufBytes;
res.SerializeToString(&protobufBytes);
auto st = ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func(
[db_id, txn_id, label, protobufBytes]() -> Status {
[db_id, txn_id, label, protobufBytes, &commit_infos]() -> Status {
TReportCommitTxnResultRequest request;
TStatus result;

Expand All @@ -838,6 +840,7 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
request.__set_txnId(txn_id);
request.__set_label(label);
request.__set_payload(protobufBytes);
request.__set_commit_infos(commit_infos);

Status status;
int64_t duration_ns = 0;
Expand Down Expand Up @@ -891,7 +894,7 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
auto st = retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn);

if (st.ok()) {
send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res);
send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, ctx.commit_infos, res);
}

return st;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
Expand Down Expand Up @@ -245,6 +246,7 @@
import org.apache.doris.thrift.TTableQueryStats;
import org.apache.doris.thrift.TTableRef;
import org.apache.doris.thrift.TTableStatus;
import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.thrift.TTabletLocation;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -1711,10 +1713,28 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2
: Config.try_commit_lock_timeout_seconds * 1000;
List<Table> tables = queryLoadCommitTables(request, db);
return Env.getCurrentGlobalTransactionMgr()
boolean ret = Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tables, request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
if (Config.isCloudMode()) {
String clusterName = request.getCluster();
if (ConnectContext.get().getSessionVariable().enableMultiClusterSyncLoad()
&& clusterName != null && !clusterName.isEmpty()) {
CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo();
List<List<Backend>> backendsList = infoService
.getCloudClusterNames()
.stream()
.filter(name -> !name.equals(clusterName))
.map(name -> infoService.getBackendsByClusterName(name))
.collect(Collectors.toList());
List<Long> allTabletIds = request.getCommitInfos().stream()
.map(TTabletCommitInfo::getTabletId)
.collect(Collectors.toList());
StmtExecutor.syncLoadForTablets(backendsList, allTabletIds);
}
}
return ret;
}

@Override
Expand Down Expand Up @@ -4035,6 +4055,21 @@ public TStatus reportCommitTxnResult(TReportCommitTxnResultRequest request) thro
}
CommitTxnResponse commitTxnResponse = CommitTxnResponse.parseFrom(receivedProtobufBytes);
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse);

List<Long> allTabletIds = new ArrayList<>();
TabletCommitInfo.fromThrift(request.getCommitInfos()).forEach(tabletCommitInfo -> {
allTabletIds.add(tabletCommitInfo.getTabletId());
});
if (!allTabletIds.isEmpty()) {
CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo();
List<List<Backend>> backendsList = infoService
.getCloudClusterNames()
.stream()
.filter(name -> !name.equals(clusterName))
.map(name -> infoService.getBackendsByClusterName(name))
.collect(Collectors.toList());
StmtExecutor.syncLoadForTablets(backendsList, allTabletIds);
}
} catch (InvalidProtocolBufferException e) {
// Handle the exception, log it, or take appropriate action
e.printStackTrace();
Expand Down
5 changes: 3 additions & 2 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ struct TGroupCommitInfo{
2: optional i64 groupCommitLoadTableId
3: optional string cluster
5: optional bool updateLoadData
6: optional i64 tableId
6: optional i64 tableId
7: optional i64 receiveData
}

Expand Down Expand Up @@ -844,7 +844,7 @@ struct TLoadTxnCommitRequest {
17: optional string auth_code_uuid // deprecated, use token instead
18: optional bool groupCommit
19: optional i64 receiveBytes
20: optional i64 backendId
20: optional i64 backendId
}

struct TLoadTxnCommitResult {
Expand Down Expand Up @@ -1681,6 +1681,7 @@ struct TReportCommitTxnResultRequest {
2: optional i64 txnId
3: optional string label
4: optional binary payload
5: optional list<Types.TTabletCommitInfo> commit_infos
}

struct TQueryColumn {
Expand Down
Loading