From 7181f2c39b9b8b359b9e41f8af35f64edaa3baae Mon Sep 17 00:00:00 2001 From: nichmor Date: Thu, 5 Dec 2024 18:15:34 +0200 Subject: [PATCH] misc: fix tests --- crates/pixi_build_frontend/src/tool/cache.rs | 74 +++++++++++++------- 1 file changed, 48 insertions(+), 26 deletions(-) diff --git a/crates/pixi_build_frontend/src/tool/cache.rs b/crates/pixi_build_frontend/src/tool/cache.rs index 19d939437..8a917a316 100644 --- a/crates/pixi_build_frontend/src/tool/cache.rs +++ b/crates/pixi_build_frontend/src/tool/cache.rs @@ -143,16 +143,14 @@ impl ToolCache { // Explicitly drop the entry, so we don't block any other tasks. drop(entry); - // // // Drop the sender + // // Drop the sender drop(sender); - eprintln!("I wait"); return match receiver.recv().await { Ok(tool) => Ok(tool), Err(err) => miette::bail!( - "a coalesced tool {} request install failed reason {}", - spec.command, - err + "installing of {} tool failed. Reason: {err}", + spec.command ), }; } else { @@ -183,10 +181,7 @@ impl ToolCache { // the error. This will drop the sender and all other waiting tasks will // receive an error. // Installation happens outside the critical section - None => { - eprintln!("going to install tool"); - context.install(&spec, channel_config).await? - } + None => context.install(&spec, channel_config).await?, Some(tool) => tool, }; @@ -296,7 +291,7 @@ mod tests { ChannelConfig, MatchSpec, NamedChannelOrUrl, ParseStrictness, Platform, }; use reqwest_middleware::ClientWithMiddleware; - use tokio::sync::{Barrier, Mutex}; + use tokio::sync::{Barrier, Mutex, Semaphore}; use crate::{ tool::{ @@ -482,8 +477,7 @@ mod tests { assert_eq!(install_count, &1); } - // #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_handle_a_failure() { // This test verifies that during the installation of a tool, if an error occurs // the tool is not cached and the next request will try to install the tool again. @@ -503,12 +497,9 @@ mod tests { let count = count.entry(spec.clone()).or_insert(0); *count += 1; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - - // if count == &1 { - // tokio::time::sleep(std::time::Duration::from_millis(100)).await; - // miette::bail!("error on first request"); - // } + if count == &1 { + miette::bail!("error on first request"); + } let isolated_tool = IsolatedTool::new(spec.command.clone(), PathBuf::new(), HashMap::default()); @@ -536,24 +527,33 @@ mod tests { channels: Vec::from([NamedChannelOrUrl::Name("conda-forge".to_string())]), }; - // Let's imitate that we have 4 requests to install a tool - // we will use a barrier to ensure all tasks start at the same time. - let num_tasks = 4; - let barrier = Arc::new(Barrier::new(num_tasks)); let mut handles = Vec::new(); - for _ in 0..num_tasks { - let barrier = barrier.clone(); + // We need to test that failure of one task will not block other tasks + // to execute. + // To test it we want to synchronize the installation of the tool + // in the following way + // first task will fail, and set the semaphore to true + // so other task can proceed to execute. + // in this way we can verify that we handle a task failure correctly + // and other tasks can proceed to install the tool. - let tool_context = tool_context.clone(); + // It is is necessary to do it in this way because + // without synchronization, all tasks will be blocked on the waiting stage + // and failure of one task will be propagated to all other tasks. + + let semaphore = Arc::new(Semaphore::new(1)); + { + let semaphore = semaphore.clone(); + let tool_context = tool_context.clone(); let tool_installer = tool_installer.clone(); let channel_config = channel_config.clone(); let tool_spec = tool_spec.clone(); let handle = tokio::spawn(async move { - barrier.wait().await; + let _sem = semaphore.acquire().await.unwrap(); tool_context .cache @@ -565,7 +565,29 @@ mod tests { ) .await }); + handles.push(handle); + } + { + let semaphore = semaphore.clone(); + let tool_context = tool_context.clone(); + let tool_installer = tool_installer.clone(); + + let channel_config = channel_config.clone(); + let tool_spec = tool_spec.clone(); + + let handle = tokio::spawn(async move { + let _sem = semaphore.acquire().await.unwrap(); + tool_context + .cache + .get_or_install_tool( + tool_spec, + &tool_installer, + &tool_context.cache_dir, + &channel_config, + ) + .await + }); handles.push(handle); }