From 1b9fca8ccb3f02e2985e4be6e453596f5e98675c Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 1 Nov 2023 10:43:25 -0700 Subject: [PATCH] fix: task definition serialization (#6306) ### Description A bunch of tiny changes that bring the JSON serialization to be closer to Go. We go for a similar approach to Go of having a new structure for serialization in order to match Go's distinction between `nil` and `[]string{}`. ### Testing Instructions More rust codepath integration tests pass. Many failing ones now have smaller diffs e.g. `dry_json/single_package*.t` only differ by `hashOfExternalDeps` Closes TURBO-1548 --------- Co-authored-by: Chris Olszewski --- crates/turborepo-env/src/lib.rs | 4 +- crates/turborepo-lib/src/config/turbo.rs | 19 ++- crates/turborepo-lib/src/package_graph/mod.rs | 12 +- .../src/run/summary/execution.rs | 2 +- crates/turborepo-lib/src/run/summary/mod.rs | 36 +++-- crates/turborepo-lib/src/run/summary/task.rs | 145 +++++++++++++++--- .../src/run/summary/task_factory.rs | 28 +++- crates/turborepo-lib/src/task_graph/mod.rs | 8 +- .../turborepo-lib/src/task_graph/visitor.rs | 4 +- crates/turborepo-lib/src/task_hash.rs | 31 ++-- .../integration/tests/dry_json/monorepo.t | 49 ------ 11 files changed, 217 insertions(+), 121 deletions(-) diff --git a/crates/turborepo-env/src/lib.rs b/crates/turborepo-env/src/lib.rs index 8379b0c93ef98..43fc5485569b1 100644 --- a/crates/turborepo-env/src/lib.rs +++ b/crates/turborepo-env/src/lib.rs @@ -63,9 +63,9 @@ impl EnvironmentVariableMap { hasher.update(v.as_bytes()); let hash = hasher.finalize(); let hexed_hash = hex::encode(hash); - format!("{}=", hexed_hash) + format!("{k}={hexed_hash}") } else { - format!("{}=", k) + format!("{k}=") } }) .collect() diff --git a/crates/turborepo-lib/src/config/turbo.rs b/crates/turborepo-lib/src/config/turbo.rs index fb3e8b3e11d79..e334fcc2b96f7 100644 --- a/crates/turborepo-lib/src/config/turbo.rs +++ b/crates/turborepo-lib/src/config/turbo.rs @@ -249,8 +249,7 @@ impl TryFrom for BookkeepingTaskDefinition { Ok(dot_env) }) - .transpose()? - .unwrap_or_default(); + .transpose()?; if raw_task.output_mode.is_some() { defined_fields.insert("OutputMode".to_string()); @@ -709,6 +708,20 @@ mod tests { task_definition: TaskDefinitionStable::default() } )] + #[test_case( + r#"{ "dotEnv": [] }"#, + RawTaskDefinition { + dot_env: Some(Vec::new()), + ..RawTaskDefinition::default() + }, + BookkeepingTaskDefinition { + defined_fields: ["DotEnv".to_string()].into_iter().collect(), + experimental_fields: HashSet::new(), + experimental: TaskDefinitionExperiments::default(), + task_definition: TaskDefinitionStable { dot_env: Some(Vec::new()), ..Default::default() } + } + ; "empty dotenv" + )] #[test_case( r#"{ "dependsOn": ["cli#build"], @@ -747,7 +760,7 @@ mod tests { experimental_fields: HashSet::new(), experimental: TaskDefinitionExperiments {}, task_definition: TaskDefinitionStable { - dot_env: vec![RelativeUnixPathBuf::new("package/a/.env").unwrap()], + dot_env: Some(vec![RelativeUnixPathBuf::new("package/a/.env").unwrap()]), env: vec!["OS".to_string()], outputs: TaskOutputs { inclusions: vec!["package/a/dist".to_string()], diff --git a/crates/turborepo-lib/src/package_graph/mod.rs b/crates/turborepo-lib/src/package_graph/mod.rs index 33db5664c2b68..810d5b67e8271 100644 --- a/crates/turborepo-lib/src/package_graph/mod.rs +++ b/crates/turborepo-lib/src/package_graph/mod.rs @@ -54,13 +54,13 @@ impl WorkspaceInfo { } pub fn get_external_deps_hash(&self) -> String { - let mut transitive_deps = Vec::with_capacity( - self.transitive_dependencies - .as_ref() - .map_or(0, |deps| deps.len()), - ); + let Some(transitive_dependencies) = &self.transitive_dependencies else { + return "".into(); + }; + + let mut transitive_deps = Vec::with_capacity(transitive_dependencies.len()); - for dependency in self.transitive_dependencies.iter().flatten() { + for dependency in transitive_dependencies.iter() { transitive_deps.push(dependency.clone()); } diff --git a/crates/turborepo-lib/src/run/summary/execution.rs b/crates/turborepo-lib/src/run/summary/execution.rs index cf85ec3b7ec2b..520900c566327 100644 --- a/crates/turborepo-lib/src/run/summary/execution.rs +++ b/crates/turborepo-lib/src/run/summary/execution.rs @@ -196,7 +196,7 @@ impl<'a> ExecutionSummary<'a> { } fn successful(&self) -> usize { - self.success + self.attempted + self.success + self.cached } } diff --git a/crates/turborepo-lib/src/run/summary/mod.rs b/crates/turborepo-lib/src/run/summary/mod.rs index c4556b59d4241..493dfdb87a975 100644 --- a/crates/turborepo-lib/src/run/summary/mod.rs +++ b/crates/turborepo-lib/src/run/summary/mod.rs @@ -111,7 +111,7 @@ pub struct RunSummary<'a> { global_hash_summary: GlobalHashSummary<'a>, #[serde(skip_serializing_if = "Option::is_none")] execution: Option>, - packages: HashSet, + packages: Vec, env_mode: EnvMode, framework_inference: bool, tasks: Vec, @@ -236,7 +236,7 @@ impl RunTracker { id: Ksuid::new(None, None), version: RUN_SUMMARY_SCHEMA_VERSION.to_string(), turbo_version: self.version, - packages, + packages: packages.into_iter().sorted().collect(), execution: Some(execution_summary), env_mode: run_opts.env_mode.into(), framework_inference: run_opts.framework_inference, @@ -540,7 +540,10 @@ impl<'a> RunSummary<'a> { ui, GREY, " Outputs\t=\t{}", - task.shared.outputs.join(", ") + task.shared + .outputs + .as_ref() + .map_or_else(String::new, |outputs| outputs.join(", ")) )?; cwriteln!( tab_writer, @@ -575,7 +578,10 @@ impl<'a> RunSummary<'a> { ui, GREY, " .env Files Considered\t=\t{}", - task.shared.dot_env.len() + task.shared + .dot_env + .as_ref() + .map_or(0, |dot_env| dot_env.len()) )?; cwriteln!( @@ -609,14 +615,19 @@ impl<'a> RunSummary<'a> { .environment_variables .specified .pass_through_env - .join(", ") + .as_ref() + .map_or_else(String::new, |pass_through_env| pass_through_env.join(", ")) )?; cwriteln!( tab_writer, ui, GREY, " Passed Through Env Vars Values\t=\t{}", - task.shared.environment_variables.pass_through.join(", ") + task.shared + .environment_variables + .pass_through + .as_ref() + .map_or_else(String::new, |vars| vars.join(", ")) )?; // If there's an error, we can silently ignore it, we don't need to block the @@ -640,13 +651,16 @@ impl<'a> RunSummary<'a> { fn format_json(&mut self) -> Result { self.normalize(); - if self.monorepo { - Ok(serde_json::to_string_pretty(&self)?) + let mut rendered_json = if self.monorepo { + serde_json::to_string_pretty(&self) } else { // Deref coercion used to get an immutable reference from the mutable reference. let monorepo_rsm = SinglePackageRunSummary::from(&*self); - Ok(serde_json::to_string_pretty(&monorepo_rsm)?) - } + serde_json::to_string_pretty(&monorepo_rsm) + }?; + // Go produces an extra newline at the end of the JSON + rendered_json.push('\n'); + Ok(rendered_json) } fn normalize(&mut self) { @@ -658,7 +672,7 @@ impl<'a> RunSummary<'a> { // For single packages, we don't need the packages // and each task summary needs some cleaning if !self.monorepo { - self.packages.drain(); + self.packages.clear(); } self.tasks.sort_by(|a, b| a.task_id.cmp(&b.task_id)); diff --git a/crates/turborepo-lib/src/run/summary/task.rs b/crates/turborepo-lib/src/run/summary/task.rs index 6360aa28baf6d..3973db90abb22 100644 --- a/crates/turborepo-lib/src/run/summary/task.rs +++ b/crates/turborepo-lib/src/run/summary/task.rs @@ -7,7 +7,11 @@ use turborepo_cache::CacheResponse; use turborepo_env::{DetailedMap, EnvironmentVariableMap}; use super::{execution::TaskExecutionSummary, EnvMode}; -use crate::{run::task_id::TaskId, task_graph::TaskDefinition}; +use crate::{ + cli::OutputLogsMode, + run::task_id::TaskId, + task_graph::{TaskDefinition, TaskOutputs}, +}; #[derive(Debug, Serialize, Clone)] #[serde(rename_all = "camelCase")] @@ -42,7 +46,7 @@ enum CacheSource { #[serde(rename_all = "camelCase")] pub(crate) struct TaskSummary { pub task_id: TaskId<'static>, - pub dir: String, + pub task: String, pub package: String, #[serde(flatten)] pub shared: SharedTaskSummary>, @@ -66,27 +70,42 @@ pub(crate) struct SharedTaskSummary { pub cache: TaskCacheSummary, pub command: String, pub cli_arguments: Vec, - pub outputs: Vec, - pub excluded_outputs: Vec, + pub outputs: Option>, + pub excluded_outputs: Option>, pub log_file: String, - pub expanded_outputs: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub directory: Option, pub dependencies: Vec, pub dependents: Vec, - pub resolved_task_definition: TaskDefinition, + pub resolved_task_definition: TaskSummaryTaskDefinition, + pub expanded_outputs: Vec, pub framework: String, #[serde(skip_serializing_if = "Option::is_none")] pub execution: Option, pub env_mode: EnvMode, pub environment_variables: TaskEnvVarSummary, - pub dot_env: Vec, + pub dot_env: Option>, } #[derive(Debug, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct TaskEnvConfiguration { pub env: Vec, - // TODO: we most likely want this to be optional - pub pass_through_env: Vec, + pub pass_through_env: Option>, +} + +#[derive(Debug, Serialize, Clone, Default)] +#[serde(rename_all = "camelCase")] +pub struct TaskSummaryTaskDefinition { + outputs: Vec, + cache: bool, + depends_on: Vec, + inputs: Vec, + output_mode: OutputLogsMode, + persistent: bool, + env: Vec, + pass_through_env: Option>, + dot_env: Option>, } #[derive(Debug, Serialize, Clone)] @@ -96,7 +115,8 @@ pub struct TaskEnvVarSummary { pub configured: Vec, pub inferred: Vec, - pub pass_through: Vec, + #[serde(rename = "passthrough")] + pub pass_through: Option>, } impl TaskCacheSummary { @@ -153,24 +173,28 @@ impl TaskEnvVarSummary { env_vars: DetailedMap, env_at_execution_start: &EnvironmentVariableMap, ) -> Result { + // TODO: this operation differs from the actual env that gets passed in during + // task execution it should be unified, but first we should copy Go's + // behavior as we try to match the implementations + let pass_through = env_at_execution_start + .from_wildcards( + task_definition + .pass_through_env + .as_deref() + .unwrap_or_default(), + )? + .to_secret_hashable(); Ok(Self { specified: TaskEnvConfiguration { env: task_definition.env.clone(), - pass_through_env: task_definition.pass_through_env.clone().unwrap_or_default(), + pass_through_env: task_definition.pass_through_env.clone(), }, configured: env_vars.by_source.explicit.to_secret_hashable(), inferred: env_vars.by_source.matching.to_secret_hashable(), - // TODO: this operation differs from the actual env that gets passed in during task - // execution it should be unified, but first we should copy Go's behavior as - // we try to match the implementations - pass_through: env_at_execution_start - .from_wildcards( - task_definition - .pass_through_env - .as_deref() - .unwrap_or_default(), - )? - .to_secret_hashable(), + pass_through: match pass_through.is_empty() { + false => Some(pass_through), + true => None, + }, }) } } @@ -209,6 +233,7 @@ impl From>> for SharedTaskSummary { env_mode, environment_variables, dot_env, + .. } = value; Self { hash, @@ -220,6 +245,7 @@ impl From>> for SharedTaskSummary { outputs, excluded_outputs, log_file, + directory: None, expanded_outputs, dependencies: dependencies .into_iter() @@ -241,6 +267,62 @@ impl From>> for SharedTaskSummary { } } +impl From for TaskSummaryTaskDefinition { + fn from(value: TaskDefinition) -> Self { + let TaskDefinition { + outputs: + TaskOutputs { + inclusions, + exclusions, + }, + cache, + mut env, + pass_through_env, + dot_env, + topological_dependencies, + task_dependencies, + mut inputs, + output_mode, + persistent, + } = value; + + let mut outputs = inclusions; + for exclusion in exclusions { + outputs.push(format!("!{exclusion}")); + } + + let mut depends_on = + Vec::with_capacity(task_dependencies.len() + topological_dependencies.len()); + for task_dependency in task_dependencies { + depends_on.push(task_dependency.to_string()); + } + for topological_dependency in topological_dependencies { + depends_on.push(format!("^{topological_dependency}")); + } + + // These _should_ already be sorted when the TaskDefinition struct was + // unmarshaled, but we want to ensure they're sorted on the way out + // also, just in case something in the middle mutates the items. + depends_on.sort(); + outputs.sort(); + env.sort(); + inputs.sort(); + + Self { + outputs, + cache, + depends_on, + inputs, + output_mode, + persistent, + env, + pass_through_env, + // This should _not_ be sorted. + dot_env, + } + } +} + #[cfg(test)] mod test { use serde_json::json; @@ -279,6 +361,25 @@ mod test { }) ; "local cache hit" )] + #[test_case( + TaskSummaryTaskDefinition { + outputs: vec!["foo".into()], + cache: true, + ..Default::default() + }, + json!({ + "outputs": ["foo"], + "cache": true, + "dependsOn": [], + "inputs": [], + "outputMode": "full", + "persistent": false, + "env": [], + "passThroughEnv": null, + "dotEnv": null, + }) + ; "resolved task definition" + )] fn test_serialization(value: impl serde::Serialize, expected: serde_json::Value) { assert_eq!(serde_json::to_value(value).unwrap(), expected); } diff --git a/crates/turborepo-lib/src/run/summary/task_factory.rs b/crates/turborepo-lib/src/run/summary/task_factory.rs index 8cf16463c6657..ae7b102592281 100644 --- a/crates/turborepo-lib/src/run/summary/task_factory.rs +++ b/crates/turborepo-lib/src/run/summary/task_factory.rs @@ -17,6 +17,10 @@ use crate::{ task_hash::TaskHashTracker, }; +const NO_FRAMEWORK_DETECTED: &str = ""; + +const NO_FRAMEWORK_INFERENCE: &str = ""; + pub struct TaskSummaryFactory<'a> { package_graph: &'a PackageGraph, engine: &'a Engine, @@ -69,10 +73,11 @@ impl<'a> TaskSummaryFactory<'a> { }, )?; let package = task_id.package().to_string(); + let task = task_id.task().to_string(); Ok(TaskSummary { task_id, - dir: workspace_info.package_path().to_string(), + task, package, shared, }) @@ -123,7 +128,13 @@ impl<'a> TaskSummaryFactory<'a> { .expanded_outputs(task_id) .unwrap_or_default(); - let framework = self.hash_tracker.framework(task_id).unwrap_or_default(); + let framework = self.hash_tracker.framework(task_id).unwrap_or_else(|| { + match self.run_opts.framework_inference { + true => NO_FRAMEWORK_DETECTED, + false => NO_FRAMEWORK_INFERENCE, + } + .to_string() + }); let hash = self .hash_tracker .hash(task_id) @@ -157,10 +168,17 @@ impl<'a> TaskSummaryFactory<'a> { cache: cache_summary, command, cli_arguments: self.run_opts.pass_through_args.to_vec(), - outputs: task_definition.outputs.inclusions.clone(), - excluded_outputs: task_definition.outputs.exclusions.clone(), + outputs: match task_definition.outputs.inclusions.is_empty() { + false => Some(task_definition.outputs.inclusions.clone()), + true => None, + }, + excluded_outputs: match task_definition.outputs.exclusions.is_empty() { + true => None, + false => Some(task_definition.outputs.exclusions.clone()), + }, log_file, - resolved_task_definition: task_definition.clone(), + directory: Some(workspace_info.package_path().to_string()), + resolved_task_definition: task_definition.clone().into(), expanded_outputs, framework, dependencies, diff --git a/crates/turborepo-lib/src/task_graph/mod.rs b/crates/turborepo-lib/src/task_graph/mod.rs index 18d244924b039..e2520cf507682 100644 --- a/crates/turborepo-lib/src/task_graph/mod.rs +++ b/crates/turborepo-lib/src/task_graph/mod.rs @@ -49,7 +49,7 @@ pub struct TaskDefinitionStable { pub(crate) persistent: bool, pub(crate) env: Vec, pub(crate) pass_through_env: Option>, - pub(crate) dot_env: Vec, + pub(crate) dot_env: Option>, } impl Default for TaskDefinitionStable { @@ -64,13 +64,13 @@ impl Default for TaskDefinitionStable { persistent: false, env: Vec::new(), pass_through_env: None, - dot_env: Vec::new(), + dot_env: None, } } } // Constructed from a RawTaskDefinition -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[derive(Debug, Deserialize, PartialEq, Clone)] pub struct TaskDefinition { pub outputs: TaskOutputs, pub(crate) cache: bool, @@ -80,7 +80,7 @@ pub struct TaskDefinition { pub(crate) pass_through_env: Option>, - pub(crate) dot_env: Vec, + pub(crate) dot_env: Option>, // TopologicalDependencies are tasks from package dependencies. // E.g. "build" is a topological dependency in: diff --git a/crates/turborepo-lib/src/task_graph/visitor.rs b/crates/turborepo-lib/src/task_graph/visitor.rs index 848c200b81756..091a2d01f7b59 100644 --- a/crates/turborepo-lib/src/task_graph/visitor.rs +++ b/crates/turborepo-lib/src/task_graph/visitor.rs @@ -297,7 +297,7 @@ impl<'a> Visitor<'a> { .lock() .expect("lock poisoned") .push(TaskError::from_spawn(task_id_for_display.clone(), e)); - let _summary = tracker.spawn_failed(error_string).await; + tracker.spawn_failed(error_string).await; callback .send(if continue_on_error { Ok(()) @@ -343,7 +343,7 @@ impl<'a> Visitor<'a> { match exit_status { // The task was successful, nothing special needs to happen. ChildExit::Finished(Some(0)) => { - let _summary = tracker.build_succeeded(0); + tracker.build_succeeded(0).await; } ChildExit::Finished(Some(code)) => { // If there was an error, flush the buffered output diff --git a/crates/turborepo-lib/src/task_hash.rs b/crates/turborepo-lib/src/task_hash.rs index a6bf7698f0393..a2e0f335aa8e3 100644 --- a/crates/turborepo-lib/src/task_hash.rs +++ b/crates/turborepo-lib/src/task_hash.rs @@ -115,21 +115,20 @@ impl PackageInputsHashes { Err(err) => return Some(Err(err.into())), }; - if !task_definition.dot_env.is_empty() { - let absolute_package_path = repo_root.resolve(package_path); - let dot_env_object = match scm.hash_existing_of( - &absolute_package_path, - task_definition - .dot_env - .iter() - .map(|p| p.to_anchored_system_path_buf()), - ) { - Ok(dot_env_object) => dot_env_object, - Err(err) => return Some(Err(err.into())), - }; - - for (key, value) in dot_env_object { - hash_object.insert(key, value); + if let Some(dot_env) = &task_definition.dot_env { + if !dot_env.is_empty() { + let absolute_package_path = repo_root.resolve(package_path); + let dot_env_object = match scm.hash_existing_of( + &absolute_package_path, + dot_env.iter().map(|p| p.to_anchored_system_path_buf()), + ) { + Ok(dot_env_object) => dot_env_object, + Err(err) => return Some(Err(err.into())), + }; + + for (key, value) in dot_env_object { + hash_object.insert(key, value); + } } } @@ -321,7 +320,7 @@ impl<'a> TaskHasher<'a> { .as_deref() .unwrap_or_default(), env_mode: task_env_mode, - dot_env: &task_definition.dot_env, + dot_env: task_definition.dot_env.as_deref().unwrap_or_default(), }; let task_hash = task_hashable.calculate_task_hash(); diff --git a/turborepo-tests/integration/tests/dry_json/monorepo.t b/turborepo-tests/integration/tests/dry_json/monorepo.t index fbd62476086db..cdad6f046dd6b 100644 --- a/turborepo-tests/integration/tests/dry_json/monorepo.t +++ b/turborepo-tests/integration/tests/dry_json/monorepo.t @@ -182,52 +182,3 @@ Tasks that don't exist throw an error $ ${TURBO} run doesnotexist --dry=json Error: Could not find the following tasks in project: doesnotexist [1] - - -# Save JSON to tmp file so we don't need to keep re-running the build - $ ${TURBO} run build --experimental-rust-codepath --dry=json > tmpjson.log - -# test with a regex that captures what release we usually have (1.x.y or 1.a.b-canary.c) - $ cat tmpjson.log | jq .turboVersion - "[a-z0-9\.-]+" (re) - - $ cat tmpjson.log | jq .globalCacheInputs - { - "rootKey": "HEY STELLLLLLLAAAAAAAAAAAAA", - "files": { - "foo.txt": "eebae5f3ca7b5831e429e947b7d61edd0de69236" - }, - "hashOfExternalDependencies": "459c029558afe716", - "globalDotEnv": null, - "environmentVariables": { - "specified": { - "env": [ - "SOME_ENV_VAR" - ], - "passThroughEnv": null - }, - "configured": [], - "inferred": [], - "passthrough": null - } - } - - $ cat tmpjson.log | jq 'keys' - [ - "envMode", - "frameworkInference", - "globalCacheInputs", - "id", - "monorepo", - "packages", - "scm", - "tasks", - "turboVersion", - "user", - "version" - ] - -Tasks that don't exist throw an error - $ ${TURBO} run doesnotexist --dry=json - Error: Could not find the following tasks in project: doesnotexist - [1]