Skip to content

Commit

Permalink
[Fix](TPartitionVersionInfo) Fix duplicate TPartitionVersionInfo in…
Browse files Browse the repository at this point in the history
… `PublishVersionTask.partitionVersionInfos` (#44846)

### What problem does this PR solve?

Problem Summary: When FE handles BEs' tablet report and finds that there
exists some expired txns on BE, it will generate publish version task.
`TPartitionVersionInfo` with same values may be added to
`transactionsToPublish` under same txn id many times when partitions
involved in this failed txn involves many tablets on this BE. Because it
uses `ArrayListMultimap`, these duplicate values may occupy a lot of
memories when the number of tablets is large.

### Release note

Fixed the issue of FE's memory occupation growing too fast in cases of
persistent load and clone failures on merge-on-write tables.

修复了在merge-on-write表上有持续的导入失败和clone失败的情况下,FE使用内存增长过快的问题。
  • Loading branch information
bobhan1 authored Dec 2, 2024
1 parent 21e1d6d commit 303e275
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.collect.TreeMultimap;
Expand Down Expand Up @@ -135,7 +136,7 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
Set<Long> tabletFoundInMeta,
ListMultimap<TStorageMedium, Long> tabletMigrationMap,
Map<Long, Long> partitionVersionSyncMap,
Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap,
List<TTabletMetaInfo> tabletToUpdate,
Expand Down Expand Up @@ -314,7 +315,7 @@ && isLocal(tabletMeta.getStorageMedium())) {
}

private void handleBackendTransactions(long backendId, List<Long> transactionIds, long tabletId,
TabletMeta tabletMeta, Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
TabletMeta tabletMeta, Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
ListMultimap<Long, Long> transactionsToClear) {
GlobalTransactionMgrIface transactionMgr = Env.getCurrentGlobalTransactionMgr();
long partitionId = tabletMeta.getPartitionId();
Expand Down Expand Up @@ -376,15 +377,15 @@ private TPartitionVersionInfo generatePartitionVersionInfoWhenReport(Transaction
}

private void publishPartition(TransactionState transactionState, long transactionId, TabletMeta tabletMeta,
long partitionId, Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish) {
long partitionId, Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish) {
TPartitionVersionInfo versionInfo = generatePartitionVersionInfoWhenReport(transactionState,
transactionId, tabletMeta, partitionId);
if (versionInfo != null) {
synchronized (transactionsToPublish) {
ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(
SetMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(
transactionState.getDbId());
if (map == null) {
map = ArrayListMultimap.create();
map = LinkedHashMultimap.create();
transactionsToPublish.put(transactionState.getDbId(), map);
}
map.put(transactionId, versionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -503,7 +504,7 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
Map<Long, Long> partitionVersionSyncMap = Maps.newConcurrentMap();

// dbid -> txn id -> [partition info]
Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish = Maps.newHashMap();
Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish = Maps.newHashMap();
ListMultimap<Long, Long> transactionsToClear = LinkedListMultimap.create();

// db id -> tablet id
Expand Down Expand Up @@ -1148,14 +1149,14 @@ private static void handleMigration(ListMultimap<TStorageMedium, Long> tabletMet
}

private static void handleRepublishVersionInfo(
Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, long backendId) {
Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, long backendId) {
AgentBatchTask batchTask = new AgentBatchTask();
long createPublishVersionTaskTime = System.currentTimeMillis();
for (Long dbId : transactionsToPublish.keySet()) {
ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(dbId);
SetMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(dbId);
for (long txnId : map.keySet()) {
PublishVersionTask task = new PublishVersionTask(backendId, txnId, dbId,
map.get(txnId), createPublishVersionTaskTime);
Lists.newArrayList(map.get(txnId)), createPublishVersionTaskTime);
batchTask.addTask(task);
// add to AgentTaskQueue for handling finish report.
AgentTaskQueue.addTask(task);
Expand Down

0 comments on commit 303e275

Please sign in to comment.