Skip to content

Commit

Permalink
[feature](load) collect loaded rows on table level after txn published (
Browse files Browse the repository at this point in the history
apache#24346)

As title.

Stream load 20 lines

```
2023-09-14 11:40:04,186 DEBUG (PUBLISH_VERSION|23) [DatabaseTransactionMgr.updateCatalogAfterVisible():1769] table id to loaded rows:{51016=20}
```

```
mysql> select count(*) from dup_tbl_basic;
+----------+
| count(*) |
+----------+
|       20 |
+----------+
1 row in set (0.05 sec)
```
  • Loading branch information
TangSiyang2001 authored Sep 19, 2023
1 parent 80bcb43 commit b092bda
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 9 deletions.
6 changes: 4 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1529,14 +1529,16 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
std::map<TTabletId, int64_t> tablet_id_to_num_delta_rows;
uint32_t retry_time = 0;
Status status;
bool is_task_timeout = false;
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
succ_tablets.clear();
error_tablet_ids.clear();
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
&succ_tablets, &discontinuous_version_tablets);
&succ_tablets, &discontinuous_version_tablets,
&tablet_id_to_num_delta_rows);
status = StorageEngine::instance()->execute_task(&engine_task);
if (status.ok()) {
break;
Expand Down Expand Up @@ -1621,7 +1623,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
finish_task_request.__set_succ_tablets(succ_tablets);
finish_task_request.__set_error_tablet_ids(
std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));

finish_task_request.__set_tablet_id_to_delta_num_rows(tablet_id_to_num_delta_rows);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
Expand Down
9 changes: 7 additions & 2 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ void TabletPublishStatistics::record_in_bvar() {
EnginePublishVersionTask::EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids,
std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets,
std::map<TTabletId, int64_t>* tablet_id_to_num_delta_rows)
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets) {}
_discontinuous_version_tablets(discontinuous_version_tablets),
_tablet_id_to_num_delta_rows(tablet_id_to_num_delta_rows) {}

void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
Expand Down Expand Up @@ -186,6 +188,9 @@ Status EnginePublishVersionTask::finish() {
continue;
}
}
auto rowset_meta_ptr = rowset->rowset_meta();
_tablet_id_to_num_delta_rows->insert(
{rowset_meta_ptr->tablet_id(), rowset_meta_ptr->num_rows()});
auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ class EnginePublishVersionTask : public EngineTask {
EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req,
std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets);
~EnginePublishVersionTask() {}
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets,
std::map<TTabletId, int64_t>* tablet_id_to_num_delta_rows);
~EnginePublishVersionTask() override = default;

virtual Status finish() override;
Status finish() override;

void add_error_tablet_id(int64_t tablet_id);

Expand All @@ -102,6 +103,7 @@ class EnginePublishVersionTask : public EngineTask {
std::set<TTabletId>* _error_tablet_ids;
std::map<TTabletId, TVersion>* _succ_tablets;
std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;
std::map<TTabletId, int64_t>* _tablet_id_to_num_delta_rows;
};

class AsyncTabletPublishTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,9 @@ private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
// not remove the task from queue and be will retry
return;
}
if (request.isSetTabletIdToDeltaNumRows()) {
publishVersionTask.setTabletIdToDeltaNumRows(request.getTabletIdToDeltaNumRows());
}
AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
publishVersionTask.getTaskType(),
publishVersionTask.getSignature());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.thrift.TPublishVersionRequest;
import org.apache.doris.thrift.TTaskType;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -38,6 +39,11 @@ public class PublishVersionTask extends AgentTask {
// tabletId => version, current version = 0
private Map<Long, Long> succTablets;

/**
* To collect loaded rows for each tablet from each BE
*/
private final Map<Long, Long> tabletIdToDeltaNumRows = Maps.newHashMap();

public PublishVersionTask(long backendId, long transactionId, long dbId,
List<TPartitionVersionInfo> partitionVersionInfos, long createTime) {
super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime);
Expand Down Expand Up @@ -81,4 +87,12 @@ public synchronized void addErrorTablets(List<Long> errorTablets) {
}
this.errorTablets.addAll(errorTablets);
}

public void setTabletIdToDeltaNumRows(Map<Long, Long> tabletIdToDeltaNumRows) {
this.tabletIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows);
}

public Map<Long, Long> getTabletIdToDeltaNumRows() {
return tabletIdToDeltaNumRows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.ClearTransactionTask;
Expand Down Expand Up @@ -1787,6 +1788,9 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat
}
}
}
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
LOG.debug("table id to loaded rows:{}", transactionState.getTableIdToNumDeltaRows());
transactionState.getTableIdToNumDeltaRows().forEach(analysisManager::updateUpdatedRows);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.doris.transaction;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
Expand All @@ -29,13 +31,17 @@
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TTaskType;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

public class PublishVersionDaemon extends MasterDaemon {

Expand Down Expand Up @@ -121,12 +127,39 @@ private void publishVersion() {
AgentTaskExecutor.submit(batchTask);
}

TabletInvertedIndex tabletInvertedIndex = Env.getCurrentEnv().getTabletInvertedIndex();
Set<Long> tabletIdFilter = Sets.newHashSet();
Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
boolean hasBackendAliveAndUnfinishTask = transactionState.getPublishVersionTasks().values().stream()
Stream<PublishVersionTask> publishVersionTaskStream = transactionState
.getPublishVersionTasks()
.values()
.stream()
.peek(task -> {
if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) {
Map<Long, Long> tabletIdToDeltaNumRows =
task.getTabletIdToDeltaNumRows();
tabletIdToDeltaNumRows.forEach((tabletId, numRows) -> {
if (!tabletIdFilter.add(tabletId)) {
// means the delta num rows for this tablet id has been collected
return;
}
TabletMeta tabletMeta = tabletInvertedIndex.getTabletMeta(tabletId);
if (tabletMeta == null) {
// for delete, drop, schema change etc. here may be a null value
return;
}
long tableId = tabletMeta.getTableId();
tableIdToNumDeltaRows.computeIfPresent(tableId, (tblId, orgNum) -> orgNum + numRows);
tableIdToNumDeltaRows.putIfAbsent(tableId, numRows);
});
}
});
boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
.anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));

boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask || transactionState.isPublishTimeout();
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout();
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ public String toString() {
// tbl id -> (index ids)
private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap();

private Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();

private String errorLogUrl = null;

// record some error msgs during the transaction operation.
Expand Down Expand Up @@ -701,6 +703,14 @@ public void readFields(DataInput in) throws IOException {
}
}

public Map<Long, Long> getTableIdToNumDeltaRows() {
return tableIdToNumDeltaRows;
}

public void setTableIdToNumDeltaRows(Map<Long, Long> tableIdToNumDeltaRows) {
this.tableIdToNumDeltaRows.putAll(tableIdToNumDeltaRows);
}

public void setErrorMsg(String errMsg) {
this.errMsg = errMsg;
}
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/MasterService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct TFinishTaskRequest {
15: optional i64 copy_size
16: optional i64 copy_time_ms
17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
18: optional map<i64, i64> tablet_id_to_delta_num_rows
}

struct TTablet {
Expand Down

0 comments on commit b092bda

Please sign in to comment.