Skip to content

Commit

Permalink
feat: hook up task execution tracking (#6222)
Browse files Browse the repository at this point in the history
### Description

Quick PR to get reviewed while I work on the component that allows us to
communicate the generated task summaries up to the primary run summary.

All this does is hook up the task tracking to the graph walk which does
get us the correct summary output in the terminal.

I suggest reviewing each PR on it's own as they're all very
tiny/reviewable changes.

### Testing Instructions

More integration tests should pass on this PR due to the footer being
correctly populated:
On main:
`# Ran 114 tests, 0 skipped, 71 failed.`

This PR:
`# Ran 114 tests, 0 skipped, 57 failed.`


Closes TURBO-1497

---------

Co-authored-by: Chris Olszewski <Chris Olszewski>
  • Loading branch information
chris-olszewski authored Oct 19, 2023
1 parent fa5e0ec commit 9fd5cea
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
39 changes: 33 additions & 6 deletions crates/turborepo-lib/src/run/summary/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl ExecutionSummary<'_> {
"Tasks",
format!(
"{}, {} total",
color!(ui, BOLD_GREEN, "{} successful", self.success),
color!(ui, BOLD_GREEN, "{} successful", self.successful()),
self.attempted
),
),
Expand Down Expand Up @@ -173,6 +173,10 @@ impl ExecutionSummary<'_> {

println!();
}

fn successful(&self) -> usize {
self.success + self.attempted
}
}

/// The final states of all task executions
Expand Down Expand Up @@ -216,9 +220,10 @@ enum Event {
#[derive(Debug, Serialize)]
pub enum ExecutionState {
Canceled,
Built { exit_code: u32 },
Built { exit_code: i32 },
Cached,
BuildFailed { exit_code: u32, err: String },
BuildFailed { exit_code: i32, err: String },
SpawnFailed { err: String },
}

#[derive(Debug, Serialize)]
Expand All @@ -231,7 +236,7 @@ pub struct TaskExecutionSummary {
}

impl TaskExecutionSummary {
pub fn exit_code(&self) -> Option<u32> {
pub fn exit_code(&self) -> Option<i32> {
match self.state {
ExecutionState::BuildFailed { exit_code, .. } | ExecutionState::Built { exit_code } => {
Some(exit_code)
Expand Down Expand Up @@ -359,7 +364,7 @@ impl TaskTracker<chrono::DateTime<Local>> {
}
}

pub async fn build_succeeded(self, exit_code: u32) -> TaskExecutionSummary {
pub async fn build_succeeded(self, exit_code: i32) -> TaskExecutionSummary {
let Self {
sender, started_at, ..
} = self;
Expand All @@ -382,7 +387,7 @@ impl TaskTracker<chrono::DateTime<Local>> {

pub async fn build_failed(
self,
exit_code: u32,
exit_code: i32,
error: impl fmt::Display,
) -> TaskExecutionSummary {
let Self {
Expand All @@ -406,6 +411,28 @@ impl TaskTracker<chrono::DateTime<Local>> {
},
}
}

pub async fn spawn_failed(self, error: impl fmt::Display) -> TaskExecutionSummary {
let Self {
sender, started_at, ..
} = self;

let ended_at = Local::now();
let duration = TurboDuration::new(&started_at, &Local::now());

sender
.send(Event::BuildFailed)
.await
.expect("summary state thread finished");
TaskExecutionSummary {
started_at: started_at.timestamp_millis(),
ended_at: ended_at.timestamp_millis(),
duration,
state: ExecutionState::SpawnFailed {
err: error.to_string(),
},
}
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions crates/turborepo-lib/src/run/summary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use turborepo_api_client::{spaces::CreateSpaceRunPayload, APIAuth, APIClient};
use turborepo_env::EnvironmentVariableMap;
use turborepo_ui::{color, cprintln, cwriteln, BOLD, BOLD_CYAN, GREY, UI};

use self::execution::TaskTracker;
use super::task_id::TaskId;
use crate::{
cli,
opts::RunOpts,
Expand Down Expand Up @@ -257,6 +259,10 @@ impl RunTracker {
.finish(end_time, exit_code, pkg_dep_graph, ui)
.await
}

pub fn track_task(&self, task_id: TaskId<'static>) -> TaskTracker<()> {
self.execution_tracker.task_tracker(task_id)
}
}

// This is an exact copy of RunSummary, but the JSON tags are structured
Expand Down
18 changes: 17 additions & 1 deletion crates/turborepo-lib/src/task_graph/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,14 @@ impl<'a> Visitor<'a> {
let errors = errors.clone();
let task_id_for_display = self.display_task_id(&info);
let hash_tracker = self.task_hasher.task_hash_tracker();
let tracker = self.run_tracker.track_task(info.clone().into_owned());

let parent_span = Span::current();
tasks.push(tokio::spawn(async move {
let span = tracing::debug_span!("execute_task", task = %info.task());
span.follows_from(parent_span.id());
let _enter = span.enter();
let tracker = tracker.start().await;

let task_id = info;
let mut task_cache = task_cache;
Expand All @@ -250,6 +252,7 @@ impl<'a> Visitor<'a> {
task_id,
task_cache.expanded_outputs().to_vec(),
);
let _summary = tracker.cached().await;
callback.send(Ok(())).ok();
return;
}
Expand Down Expand Up @@ -277,6 +280,9 @@ impl<'a> Visitor<'a> {
Err(e) => {
error!("failed to capture outputs for \"{task_id}\": {e}");
manager.stop().await;
// If we have an internal failure of being unable setup log capture we
// mark it as cancelled.
let _summary = tracker.cancel();
callback.send(Err(StopExecution)).ok();
return;
}
Expand All @@ -288,10 +294,12 @@ impl<'a> Visitor<'a> {
Some(Err(e)) => {
// Note: we actually failed to spawn, but this matches the Go output
prefixed_ui.error(format!("command finished with error: {e}"));
let error_string = e.to_string();
errors
.lock()
.expect("lock poisoned")
.push(TaskError::from_spawn(task_id_for_display.clone(), e));
let _summary = tracker.spawn_failed(error_string).await;
callback
.send(if continue_on_error {
Ok(())
Expand All @@ -305,6 +313,7 @@ impl<'a> Visitor<'a> {
// Turbo is shutting down
None => {
callback.send(Ok(())).ok();
let _summary = tracker.cancel();
return;
}
};
Expand All @@ -316,6 +325,7 @@ impl<'a> Visitor<'a> {
Ok(Some(exit_status)) => exit_status,
Err(e) => {
error!("unable to pipe outputs from command: {e}");
let _summary = tracker.cancel();
callback.send(Err(StopExecution)).ok();
manager.stop().await;
return;
Expand All @@ -326,21 +336,26 @@ impl<'a> Visitor<'a> {
// None. Is it still running?
error!("unable to determine why child exited");
manager.stop().await;
let _summary = tracker.cancel();
callback.send(Err(StopExecution)).ok();
return;
}
};

match exit_status {
// The task was successful, nothing special needs to happen.
ChildExit::Finished(Some(0)) => (),
ChildExit::Finished(Some(0)) => {
let _summary = tracker.build_succeeded(0);
}
ChildExit::Finished(Some(code)) => {
// If there was an error, flush the buffered output
if let Err(e) = task_cache.on_error(&mut prefixed_ui) {
error!("error reading logs: {e}");
}
let error =
TaskErrorCause::from_execution(process.label().to_string(), code);
// TODO pass actual code
let _summary = tracker.build_failed(0, error.to_string()).await;
if continue_on_error {
prefixed_ui.warn("command finished with error, but continuing...");
callback.send(Ok(())).ok();
Expand All @@ -361,6 +376,7 @@ impl<'a> Visitor<'a> {
| ChildExit::KilledExternal
| ChildExit::Failed => {
manager.stop().await;
let _summary = tracker.cancel();
callback.send(Err(StopExecution)).ok();
return;
}
Expand Down
7 changes: 4 additions & 3 deletions turborepo-tests/integration/tests/run-summary/monorepo.t
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,15 @@ Setup
$ cat $FIRST | jq '.execution.exitCode'
0
$ cat $FIRST | jq '.execution.attempted'
0
2
Note this differs from the Go codepath because we don't clear the cache so it is a cache hit
$ cat $FIRST | jq '.execution.cached'
0
2
$ cat $FIRST | jq '.execution.failed'
0
$ cat $FIRST | jq '.execution.success'
0
$ cat $FIRST | jq '.execution.startTime'
[0-9]+ (re)
$ cat $FIRST | jq '.execution.endTime'
[0-9]+ (re)
[0-9]+ (re)

0 comments on commit 9fd5cea

Please sign in to comment.