Skip to content

Commit

Permalink
Reschedule replacement tasks in case of artificial failure due to mis…
Browse files Browse the repository at this point in the history
…sing spooling output size

The code introduced in #22298 was
lacking rescheduling replacement tasks. As a result query execution got
stuck after we observed missing spooling output stats for task.
  • Loading branch information
losipiuk committed Jun 24, 2024
1 parent 37311c6 commit f8d373f
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1767,7 +1767,8 @@ public Void onRemoteTaskCompleted(RemoteTaskCompletedEvent event)
TaskState taskState = taskStatus.getState();
StageExecution stageExecution = getStageExecution(taskId.getStageId());
if (taskState == TaskState.FINISHED) {
stageExecution.taskFinished(taskId, taskStatus);
Optional<List<PrioritizedScheduledTask>> failOverrideReplacementTasks = stageExecution.taskFinished(taskId, taskStatus);
failOverrideReplacementTasks.ifPresent(prioritizedScheduledTasks -> prioritizedScheduledTasks.forEach(schedulingQueue::addOrUpdate));
}
else if (taskState == TaskState.FAILED) {
ExecutionFailureInfo failureInfo = taskStatus.getFailures().stream()
Expand Down Expand Up @@ -2317,10 +2318,10 @@ public void finalizeUpdateOfExchangeSinkInstanceHandle(TaskId taskId, ExchangeSi
partition.updateExchangeSinkInstanceHandle(taskId, updatedExchangeSinkInstanceHandle);
}

public void taskFinished(TaskId taskId, TaskStatus taskStatus)
public Optional<List<PrioritizedScheduledTask>> taskFinished(TaskId taskId, TaskStatus taskStatus)
{
if (getState().isDone()) {
return;
return Optional.empty();
}

int partitionId = taskId.getPartitionId();
Expand All @@ -2331,8 +2332,7 @@ public void taskFinished(TaskId taskId, TaskStatus taskStatus)
// it is rare but possible to get empty spooling output stats for task which completed successfully.
// As we need this information in FTE mode we need to fail such task artificially
log.warn("Failing task " + taskId + " because we received empty spooling output stats");
taskFailed(taskId, Failures.toFailure(new TrinoException(GENERIC_INTERNAL_ERROR, "Treating FINISHED task as FAILED because we received empty spooling output stats")), taskStatus);
return;
return Optional.of(taskFailed(taskId, Failures.toFailure(new TrinoException(GENERIC_INTERNAL_ERROR, "Treating FINISHED task as FAILED because we received empty spooling output stats")), taskStatus));
}

exchange.sinkFinished(partition.getExchangeSinkHandle(), taskId.getAttemptId());
Expand All @@ -2343,7 +2343,7 @@ public void taskFinished(TaskId taskId, TaskStatus taskStatus)

if (!remainingPartitions.remove(partitionId)) {
// a different task for the same partition finished before
return;
return Optional.empty();
}

updateOutputSize(outputStats.orElseThrow());
Expand All @@ -2359,6 +2359,7 @@ public void taskFinished(TaskId taskId, TaskStatus taskStatus)
if (noMorePartitions && remainingPartitions.isEmpty() && !stage.getState().isDone()) {
finish();
}
return Optional.empty();
}

private void finish()
Expand Down

0 comments on commit f8d373f

Please sign in to comment.