Skip to content

Commit

Permalink
misc: fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nichmor committed Dec 5, 2024
1 parent 03c9525 commit 7181f2c
Showing 1 changed file with 48 additions and 26 deletions.
74 changes: 48 additions & 26 deletions crates/pixi_build_frontend/src/tool/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,14 @@ impl ToolCache {

// Explicitly drop the entry, so we don't block any other tasks.
drop(entry);
// // // Drop the sender
// // Drop the sender
drop(sender);

eprintln!("I wait");
return match receiver.recv().await {
Ok(tool) => Ok(tool),
Err(err) => miette::bail!(
"a coalesced tool {} request install failed reason {}",
spec.command,
err
"installing of {} tool failed. Reason: {err}",
spec.command
),
};
} else {
Expand Down Expand Up @@ -183,10 +181,7 @@ impl ToolCache {
// the error. This will drop the sender and all other waiting tasks will
// receive an error.
// Installation happens outside the critical section
None => {
eprintln!("going to install tool");
context.install(&spec, channel_config).await?
}
None => context.install(&spec, channel_config).await?,

Some(tool) => tool,
};
Expand Down Expand Up @@ -296,7 +291,7 @@ mod tests {
ChannelConfig, MatchSpec, NamedChannelOrUrl, ParseStrictness, Platform,
};
use reqwest_middleware::ClientWithMiddleware;
use tokio::sync::{Barrier, Mutex};
use tokio::sync::{Barrier, Mutex, Semaphore};

use crate::{
tool::{
Expand Down Expand Up @@ -482,8 +477,7 @@ mod tests {
assert_eq!(install_count, &1);
}

// #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_handle_a_failure() {
// This test verifies that during the installation of a tool, if an error occurs
// the tool is not cached and the next request will try to install the tool again.
Expand All @@ -503,12 +497,9 @@ mod tests {
let count = count.entry(spec.clone()).or_insert(0);
*count += 1;

tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// if count == &1 {
// tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// miette::bail!("error on first request");
// }
if count == &1 {
miette::bail!("error on first request");
}

let isolated_tool =
IsolatedTool::new(spec.command.clone(), PathBuf::new(), HashMap::default());
Expand Down Expand Up @@ -536,24 +527,33 @@ mod tests {
channels: Vec::from([NamedChannelOrUrl::Name("conda-forge".to_string())]),
};

// Let's imitate that we have 4 requests to install a tool
// we will use a barrier to ensure all tasks start at the same time.
let num_tasks = 4;
let barrier = Arc::new(Barrier::new(num_tasks));
let mut handles = Vec::new();

for _ in 0..num_tasks {
let barrier = barrier.clone();
// We need to test that failure of one task will not block other tasks
// to execute.
// To test it we want to synchronize the installation of the tool
// in the following way
// first task will fail, and set the semaphore to true
// so other task can proceed to execute.
// in this way we can verify that we handle a task failure correctly
// and other tasks can proceed to install the tool.

let tool_context = tool_context.clone();
// It is is necessary to do it in this way because
// without synchronization, all tasks will be blocked on the waiting stage
// and failure of one task will be propagated to all other tasks.

let semaphore = Arc::new(Semaphore::new(1));
{
let semaphore = semaphore.clone();

let tool_context = tool_context.clone();
let tool_installer = tool_installer.clone();

let channel_config = channel_config.clone();
let tool_spec = tool_spec.clone();

let handle = tokio::spawn(async move {
barrier.wait().await;
let _sem = semaphore.acquire().await.unwrap();

tool_context
.cache
Expand All @@ -565,7 +565,29 @@ mod tests {
)
.await
});
handles.push(handle);
}
{
let semaphore = semaphore.clone();

let tool_context = tool_context.clone();
let tool_installer = tool_installer.clone();

let channel_config = channel_config.clone();
let tool_spec = tool_spec.clone();

let handle = tokio::spawn(async move {
let _sem = semaphore.acquire().await.unwrap();
tool_context
.cache
.get_or_install_tool(
tool_spec,
&tool_installer,
&tool_context.cache_dir,
&channel_config,
)
.await
});
handles.push(handle);
}

Expand Down

0 comments on commit 7181f2c

Please sign in to comment.