From e9290340c899ad94c4f8c6a8964de18464321f5c Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Wed, 6 Nov 2024 13:55:27 +0800 Subject: [PATCH] [fix](export) remove export task executor in TransientTaskExecutor and fix concurrency issue (#42880)(#43051)(#43109)(#43250) (#43305) cherry pick from (#42880)(#43051)(#43109)(#43250) --- .../org/apache/doris/analysis/ExportStmt.java | 2 +- .../java/org/apache/doris/catalog/Env.java | 8 ---- .../java/org/apache/doris/load/ExportJob.java | 20 +++++----- .../java/org/apache/doris/load/ExportMgr.java | 30 +++++++++----- .../trees/plans/commands/ExportCommand.java | 2 +- .../scheduler/disruptor/TaskHandler.java | 2 + .../manager/TransientTaskManager.java | 15 ++++++- .../registry/ExportTaskRegister.java | 40 ------------------- .../doris/analysis/CancelExportStmtTest.java | 1 - 9 files changed, 48 insertions(+), 72 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index a9ce85b2d3e078..ba7aa50ec69595 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -208,7 +208,7 @@ public void analyze(Analyzer analyzer) throws UserException { } private void setJob() throws UserException { - exportJob = new ExportJob(); + exportJob = new ExportJob(Env.getCurrentEnv().getNextId()); Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb()); exportJob.setDbId(db.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 47caa710f5ecef..dcc32d8276fb56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -254,7 +254,6 @@ import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher; import org.apache.doris.scheduler.manager.TransientTaskManager; -import org.apache.doris.scheduler.registry.ExportTaskRegister; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.AnalysisManager; @@ -395,7 +394,6 @@ public class Env { private ExternalMetaIdMgr externalMetaIdMgr; private MetastoreEventsProcessor metastoreEventsProcessor; - private ExportTaskRegister exportTaskRegister; private JobManager, ?> jobManager; private LabelProcessor labelProcessor; private TransientTaskManager transientTaskManager; @@ -709,7 +707,6 @@ public Env(boolean isCheckpointCatalog) { this.jobManager = new JobManager<>(); this.labelProcessor = new LabelProcessor(); this.transientTaskManager = new TransientTaskManager(); - this.exportTaskRegister = new ExportTaskRegister(transientTaskManager); this.replayedJournalId = new AtomicLong(0L); this.stmtIdCounter = new AtomicLong(0L); @@ -4418,11 +4415,6 @@ public SyncJobManager getSyncJobManager() { return this.syncJobManager; } - - public ExportTaskRegister getExportTaskRegister() { - return exportTaskRegister; - } - public JobManager getJobManager() { return jobManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 33418531f2cda8..e77b0517d953be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -98,7 +98,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @Data @@ -207,9 +206,7 @@ public class ExportJob implements Writable { // backend_address => snapshot path private List> snapshotPaths = Lists.newArrayList(); - private List jobExecutorList; - - private ConcurrentHashMap taskIdToExecutor = new ConcurrentHashMap<>(); + private List jobExecutorList = Lists.newArrayList(); private Integer finishedTaskCount = 0; private List> allOutfileInfo = Lists.newArrayList(); @@ -399,8 +396,8 @@ private StatementBase generateLogicalPlanAdapter(LogicalPlan outfileLogicalPlan) return statementBase; } - public List getTaskExecutors() { - return jobExecutorList; + public List getCopiedTaskExecutors() { + return Lists.newArrayList(jobExecutorList); } private void generateExportJobExecutor() { @@ -690,11 +687,11 @@ private void cancelExportTask(ExportFailMsg.CancelType type, String msg) throws } // we need cancel all task - taskIdToExecutor.keySet().forEach(id -> { + jobExecutorList.forEach(executor -> { try { - Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id); + Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(executor.getId()); } catch (JobException e) { - LOG.warn("cancel export task {} exception: {}", id, e); + LOG.warn("cancel export task {} exception: {}", executor.getId(), e); } }); @@ -705,10 +702,12 @@ private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String ms setExportJobState(ExportJobState.CANCELLED); finishTimeMs = System.currentTimeMillis(); failMsg = new ExportFailMsg(type, msg); + jobExecutorList.clear(); if (FeConstants.runningUnitTest) { return; } Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.CANCELLED); + LOG.info("cancel export job {}", id); } private void exportExportJob() { @@ -749,7 +748,10 @@ private void finishExportJobUnprotected() { setExportJobState(ExportJobState.FINISHED); finishTimeMs = System.currentTimeMillis(); outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo); + // Clear the jobExecutorList to release memory. + jobExecutorList.clear(); Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.FINISHED); + LOG.info("finish export job {}", id); } public void replayExportJobState(ExportJobState newState) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index 7dbe953cf9bdbc..49ebbfe7dcddb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -67,8 +67,8 @@ public class ExportMgr { // dbid ->