Skip to content

Commit

Permalink
[fix](export) remove export task executor in TransientTaskExecutor an…
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman authored Nov 6, 2024
1 parent 9096671 commit e929034
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void analyze(Analyzer analyzer) throws UserException {
}

private void setJob() throws UserException {
exportJob = new ExportJob();
exportJob = new ExportJob(Env.getCurrentEnv().getNextId());

Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb());
exportJob.setDbId(db.getId());
Expand Down
8 changes: 0 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
Expand Down Expand Up @@ -395,7 +394,6 @@ public class Env {
private ExternalMetaIdMgr externalMetaIdMgr;
private MetastoreEventsProcessor metastoreEventsProcessor;

private ExportTaskRegister exportTaskRegister;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private LabelProcessor labelProcessor;
private TransientTaskManager transientTaskManager;
Expand Down Expand Up @@ -709,7 +707,6 @@ public Env(boolean isCheckpointCatalog) {
this.jobManager = new JobManager<>();
this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);

this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
Expand Down Expand Up @@ -4418,11 +4415,6 @@ public SyncJobManager getSyncJobManager() {
return this.syncJobManager;
}


public ExportTaskRegister getExportTaskRegister() {
return exportTaskRegister;
}

public JobManager getJobManager() {
return jobManager;
}
Expand Down
20 changes: 11 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Data
Expand Down Expand Up @@ -207,9 +206,7 @@ public class ExportJob implements Writable {
// backend_address => snapshot path
private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();

private List<ExportTaskExecutor> jobExecutorList;

private ConcurrentHashMap<Long, TransientTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>();
private List<ExportTaskExecutor> jobExecutorList = Lists.newArrayList();

private Integer finishedTaskCount = 0;
private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
Expand Down Expand Up @@ -399,8 +396,8 @@ private StatementBase generateLogicalPlanAdapter(LogicalPlan outfileLogicalPlan)
return statementBase;
}

public List<? extends TransientTaskExecutor> getTaskExecutors() {
return jobExecutorList;
public List<? extends TransientTaskExecutor> getCopiedTaskExecutors() {
return Lists.newArrayList(jobExecutorList);
}

private void generateExportJobExecutor() {
Expand Down Expand Up @@ -690,11 +687,11 @@ private void cancelExportTask(ExportFailMsg.CancelType type, String msg) throws
}

// we need cancel all task
taskIdToExecutor.keySet().forEach(id -> {
jobExecutorList.forEach(executor -> {
try {
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id);
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(executor.getId());
} catch (JobException e) {
LOG.warn("cancel export task {} exception: {}", id, e);
LOG.warn("cancel export task {} exception: {}", executor.getId(), e);
}
});

Expand All @@ -705,10 +702,12 @@ private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String ms
setExportJobState(ExportJobState.CANCELLED);
finishTimeMs = System.currentTimeMillis();
failMsg = new ExportFailMsg(type, msg);
jobExecutorList.clear();
if (FeConstants.runningUnitTest) {
return;
}
Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.CANCELLED);
LOG.info("cancel export job {}", id);
}

private void exportExportJob() {
Expand Down Expand Up @@ -749,7 +748,10 @@ private void finishExportJobUnprotected() {
setExportJobState(ExportJobState.FINISHED);
finishTimeMs = System.currentTimeMillis();
outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
// Clear the jobExecutorList to release memory.
jobExecutorList.clear();
Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.FINISHED);
LOG.info("finish export job {}", id);
}

public void replayExportJobState(ExportJobState newState) {
Expand Down
30 changes: 19 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public class ExportMgr {
// dbid -> <label -> job>
private Map<Long, Map<String, Long>> dbTolabelToExportJobId = Maps.newHashMap();

// lock for export job
// lock is private and must use after db lock
// lock for protecting export jobs.
// need to be added when creating or cancelling export job.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

public ExportMgr() {
Expand All @@ -95,8 +95,6 @@ private void writeUnlock() {
}

public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
long jobId = Env.getCurrentEnv().getNextId();
job.setId(jobId);
writeLock();
try {
if (dbTolabelToExportJobId.containsKey(job.getDbId())
Expand All @@ -117,15 +115,17 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1),
job.getBrokerDesc());
}
job.getTaskExecutors().forEach(executor -> {
Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
job.getTaskIdToExecutor().put(taskId, executor);
});
Env.getCurrentEnv().getEditLog().logExportCreate(job);
// ATTN: Must add task after edit log, otherwise the job may finish before adding job.
job.getCopiedTaskExecutors().forEach(executor -> {
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
});
LOG.info("add export job. {}", job);

} finally {
writeUnlock();
}
LOG.info("add export job. {}", job);

}

public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException {
Expand All @@ -142,6 +142,11 @@ public void cancelExportJob(CancelExportStmt stmt) throws DdlException, Analysis

// check auth
checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME, stmt.getDbName(), matchExportJobs);
// Must add lock to protect export job.
// Because job may be cancelled when generating task executors,
// the cancel process may clear the task executor list at same time,
// which will cause ConcurrentModificationException
writeLock();
try {
for (ExportJob exportJob : matchExportJobs) {
// exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
Expand All @@ -150,6 +155,8 @@ public void cancelExportJob(CancelExportStmt stmt) throws DdlException, Analysis
}
} catch (JobException e) {
throw new AnalysisException(e.getMessage());
} finally {
writeUnlock();
}
}

Expand Down Expand Up @@ -464,16 +471,17 @@ public void replayCreateExportJob(ExportJob job) {
}

public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
readLock();
writeLock();
try {
LOG.info("replay update export job: {}, {}", stateTransfer.getJobId(), stateTransfer.getState());
ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
job.replayExportJobState(stateTransfer.getState());
job.setStartTimeMs(stateTransfer.getStartTimeMs());
job.setFinishTimeMs(stateTransfer.getFinishTimeMs());
job.setFailMsg(stateTransfer.getFailMsg());
job.setOutfileInfo(stateTransfer.getOutFileInfo());
} finally {
readUnlock();
writeUnlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void checkBrokerDesc(ConnectContext ctx) throws UserException {

private ExportJob generateExportJob(ConnectContext ctx, Map<String, String> fileProperties, TableName tblName)
throws UserException {
ExportJob exportJob = new ExportJob();
ExportJob exportJob = new ExportJob(Env.getCurrentEnv().getNextId());
// set export job and check catalog/db/table
CatalogIf catalog = ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl());
DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public void onTransientTaskHandle(TaskEvent taskEvent) {
taskExecutor.execute();
} catch (JobException e) {
log.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage());
} finally {
transientTaskManager.removeMemoryTask(taskId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.apache.doris.scheduler.executor.TransientTaskExecutor;

import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.ConcurrentHashMap;

public class TransientTaskManager {
private static final Logger LOG = LogManager.getLogger(TransientTaskManager.class);
/**
* key: taskId
* value: memory task executor of this task
Expand Down Expand Up @@ -57,10 +60,20 @@ public Long addMemoryTask(TransientTaskExecutor executor) {
Long taskId = executor.getId();
taskExecutorMap.put(taskId, executor);
disruptor.tryPublishTask(taskId);
LOG.info("add memory task, taskId: {}", taskId);
return taskId;
}

public void cancelMemoryTask(Long taskId) throws JobException {
taskExecutorMap.get(taskId).cancel();
try {
taskExecutorMap.get(taskId).cancel();
} finally {
removeMemoryTask(taskId);
}
}

public void removeMemoryTask(Long taskId) {
taskExecutorMap.remove(taskId);
LOG.info("remove memory task, taskId: {}", taskId);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ public void testExportMgrCancelJob() throws UserException {
exportMgr.unprotectAddJob(job3);
exportMgr.unprotectAddJob(job4);


// cancel export job where state = "PENDING"
Assert.assertTrue(job1.getState() == ExportJobState.PENDING);
SlotRef stateSlotRef = new SlotRef(null, "state");
Expand Down

0 comments on commit e929034

Please sign in to comment.