diff --git a/crates/rattler_installs_packages/src/utils/streaming_or_local.rs b/crates/rattler_installs_packages/src/utils/streaming_or_local.rs
index e72d14ce..c120d563 100644
--- a/crates/rattler_installs_packages/src/utils/streaming_or_local.rs
+++ b/crates/rattler_installs_packages/src/utils/streaming_or_local.rs
@@ -57,12 +57,14 @@ impl StreamingOrLocal {
         match self {
             StreamingOrLocal::Streaming(mut streaming) => streaming.read_to_end(bytes).await,
             StreamingOrLocal::Local(mut local) => {
-                match tokio::task::spawn_blocking(move || {
+                let read_to_end = move || {
                     let mut bytes = Vec::new();
                     local.read_to_end(&mut bytes).map(|_| bytes)
-                })
-                .map_err(JoinError::try_into_panic)
-                .await
+                };
+
+                match tokio::task::spawn_blocking(read_to_end)
+                    .map_err(JoinError::try_into_panic)
+                    .await
                 {
                     Ok(Ok(result)) => {
                         *bytes = result;
@@ -71,7 +73,7 @@ impl StreamingOrLocal {
                     Ok(Err(err)) => Err(err),
                     // Resume the panic on the main task
                     Err(Ok(panic)) => std::panic::resume_unwind(panic),
-                    Err(Err(_)) => Err(std::io::ErrorKind::Interrupted.into()),
+                    Err(Err(_)) => Err(io::ErrorKind::Interrupted.into()),
                 }
             }
         }
diff --git a/crates/rattler_installs_packages/src/wheel_builder/build_environment.rs b/crates/rattler_installs_packages/src/wheel_builder/build_environment.rs
index 28503e97..26463307 100644
--- a/crates/rattler_installs_packages/src/wheel_builder/build_environment.rs
+++ b/crates/rattler_installs_packages/src/wheel_builder/build_environment.rs
@@ -183,8 +183,11 @@ impl BuildEnvironment {
     /// This uses the `GetRequiresForBuildWheel` entry point of the build backend.
     /// this might not be available for all build backends.
     /// and it can also return an empty list of requirements.
-    fn get_extra_requirements(&self) -> Result, WheelBuildError> {
-        let output = self.run_command("GetRequiresForBuildWheel")?;
+    fn get_extra_requirements(
+        &self,
+        output_dir: &Path,
+    ) -> Result, WheelBuildError> {
+        let output = self.run_command("GetRequiresForBuildWheel", output_dir)?;
         if !output.status.success() {
             let stderr = String::from_utf8_lossy(&output.stderr);
             return Err(WheelBuildError::Error(stderr.to_string()));
@@ -217,7 +220,10 @@ impl BuildEnvironment {
         wheel_builder: &WheelBuilder,
     ) -> Result<(), WheelBuildError> {
         // Get extra requirements if any
-        let extra_requirements = self.get_extra_requirements()?;
+        // Because we use the build environment to get the extra requirements,
+        // and we only do this once,
+        // it's fine to use the work_dir as the output_dir
+        let extra_requirements = self.get_extra_requirements(&self.work_dir())?;
 
         // Combine previous requirements with extra requirements
         let combined_requirements = HashSet::from_iter(self.build_requirements.iter().cloned())
@@ -281,7 +287,11 @@ impl BuildEnvironment {
     }
 
     /// Run a command in the build environment
-    pub(crate) fn run_command(&self, stage: &str) -> Result {
+    pub(crate) fn run_command(
+        &self,
+        stage: &str,
+        output_dir: &Path,
+    ) -> Result {
         // We modify the environment of the user
         // so that we can use the scripts directory to run the build frontend
         // e.g maturin depends on an executable in the scripts directory
@@ -315,7 +325,6 @@ impl BuildEnvironment {
         if self.clean_env {
             base_command.env_clear();
         }
-        let work_dir = self.work_dir.path();
         base_command
             .current_dir(&self.package_dir)
             // pass all env variables defined by user
@@ -324,10 +333,10 @@ impl BuildEnvironment {
             // it will overwritten by more actual one
             .env("PATH", path_var)
             // Script to run
-            .arg(work_dir.join("build_frontend.py"))
+            .arg(self.work_dir().join("build_frontend.py"))
             // The working directory to use
             // will contain the output of the build
-            .arg(work_dir.as_path())
+            .arg(output_dir)
             // Build system entry point
             .arg(&self.entry_point)
             // Building Wheel or Metadata
diff --git a/crates/rattler_installs_packages/src/wheel_builder/mod.rs b/crates/rattler_installs_packages/src/wheel_builder/mod.rs
index 471a5466..937960a4 100644
--- a/crates/rattler_installs_packages/src/wheel_builder/mod.rs
+++ b/crates/rattler_installs_packages/src/wheel_builder/mod.rs
@@ -9,7 +9,7 @@ use fs_err as fs;
 
 use std::collections::HashSet;
 use std::str::FromStr;
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 use std::{collections::HashMap, path::PathBuf};
 
 use parking_lot::Mutex;
@@ -23,14 +23,21 @@ use crate::wheel_builder::build_environment::BuildEnvironment;
 
 pub use crate::wheel_builder::wheel_cache::{WheelCache, WheelCacheKey};
 use crate::{artifacts::Wheel, index::PackageDb, python_env::WheelTags, types::WheelCoreMetadata};
 pub use error::WheelBuildError;
+use tokio::sync::broadcast;
 
 type BuildCache = Mutex>>;
+type OptionalBuildEnv = Option>;
+type BuildEnvironmentSender = broadcast::Sender;
+type BuildEnvironmentReceiver = broadcast::Receiver;
 
 /// A builder for wheels
 pub struct WheelBuilder {
     /// A cache for virtualenvs that might be reused later in the process
     venv_cache: BuildCache,
 
+    /// A cache for in-flight virtualenvs
+    in_setup_venv: Mutex>>,
+
     /// The package database to use
     package_db: Arc,
@@ -72,6 +79,7 @@ impl WheelBuilder {
 
         Ok(Self {
             venv_cache: Mutex::new(HashMap::new()),
+            in_setup_venv: Mutex::new(HashMap::new()),
             package_db,
             env_markers,
             wheel_tags,
@@ -93,7 +101,9 @@ impl WheelBuilder {
         &self,
         sdist: &impl ArtifactFromSource,
     ) -> Result, WheelBuildError> {
-        if let Some(venv) = self.venv_cache.lock().get(&sdist.artifact_name()) {
+        // Either the venv is already cached, or we have to set it up below
+        let name = sdist.artifact_name();
+        if let Some(venv) = self.venv_cache.lock().get(&name) {
             tracing::debug!(
                 "using cached virtual env for: {:?}",
                 sdist.distribution_name()
@@ -101,27 +111,103 @@ impl WheelBuilder {
             return Ok(venv.clone());
         }
 
-        tracing::debug!("creating virtual env for: {:?}", sdist.distribution_name());
+        // There is no cached build environment yet.
+        // Check if another task is already setting up the build environment;
+        // if so, wait for it to finish.
+        enum BuildEnvState {
+            // No build environment yet
+            New(Arc),
+            // Currently setting up the build environment
+            SettingUp(BuildEnvironmentReceiver),
+        }
+
+        // Check if a setup for this artifact is already in flight
+        let state = {
+            let mut lock = self.in_setup_venv.lock();
+            match lock.get(&name) {
+                // A setup may already be in flight, so let's wait for the broadcast
+                Some(notify) => {
+                    // If the sender is still alive, the setup is still in progress
+                    if let Some(sender) = notify.upgrade() {
+                        BuildEnvState::SettingUp(sender.subscribe())
+                    } else {
+                        // Otherwise a panic happened, so we need to redo the setup
+                        let (tx, _) = broadcast::channel(1);
+                        let tx = Arc::new(tx);
+                        lock.insert(name.clone(), Arc::downgrade(&tx));
+                        BuildEnvState::New(tx)
+                    }
+                }
+                // We are the first one here, so let's tell the other tasks to wait
+                None => {
+                    let (tx, _) = broadcast::channel(1);
+                    let tx = Arc::new(tx);
+                    lock.insert(name.clone(), Arc::downgrade(&tx));
+                    BuildEnvState::New(tx)
+                }
+            }
+        };
+        // The lock is dropped here, allowing other tasks to reach this point
+
+        // If we are in the SettingUp state, wait for the result
+        let tx = match state {
+            BuildEnvState::SettingUp(mut rx) => {
+                tracing::debug!(
+                    "waiting for in-flight virtual env for: {:?}",
+                    sdist.distribution_name()
+                );
+                // Wait for a value to be broadcast.
+                // If .recv() returns an error, all senders have been dropped,
+                // which implies that the setup has panicked.
+                return if let Some(build_env) = rx.recv().await.map_err(|_| {
+                    WheelBuildError::Error(
+                        "panic during setup of original build environment".to_string(),
+                    )
+                })? {
+                    Ok(build_env)
+                } else {
+                    // An error occurred while setting up the build env,
+                    // but it was not a panic.
+                    Err(WheelBuildError::Error(
+                        "error during setup of original build environment".to_string(),
+                    ))
+                };
+            }
+            BuildEnvState::New(notify) => notify,
+        };
 
-        let mut build_environment = BuildEnvironment::setup(sdist, self).await?;
+        // Otherwise we need to do the work
+        tracing::debug!("creating virtual env for: {:?}", sdist.distribution_name());
 
-        build_environment.install_build_files(sdist)?;
+        // Wrap this in a future to capture the result
+        let future = || async {
+            let mut build_environment = BuildEnvironment::setup(sdist, self).await?;
+            build_environment.install_build_files(sdist)?;
+            // Install extra requirements if any
+            build_environment.install_extra_requirements(self).await?;
+            Ok(build_environment)
+        };
 
-        // Install extra requirements if any
-        build_environment.install_extra_requirements(self).await?;
+        match future().await {
+            Ok(build_environment) => {
+                let build_environment = Arc::new(build_environment);
+                // Insert into the venv cache
+                self.venv_cache
+                    .lock()
+                    .insert(sdist.artifact_name().clone(), build_environment.clone());
 
-        // Insert into the venv cache
-        self.venv_cache
-            .lock()
-            .insert(sdist.artifact_name().clone(), Arc::new(build_environment));
+                // Notify others that a result is available
+                let _ = tx.send(Some(build_environment.clone()));
 
-        // Return the cached values
-        return self
-            .venv_cache
-            .lock()
-            .get(&sdist.artifact_name())
-            .cloned()
-            .ok_or_else(|| WheelBuildError::Error("Could not get venv from cache".to_string()));
+                Ok(build_environment)
+            }
+            Err(e) => {
+                // Notify others that no build environment is available.
+                // It's fine that this is None because the error is also propagated.
+                let _ = tx.send(None);
+                Err(e)
+            }
+        }
     }
 
     /// Get the paths to the saved build environments
@@ -156,8 +242,7 @@ impl WheelBuilder {
 
     /// Get the metadata for a given sdist by using the build_backend in a virtual env
     /// This function uses the `prepare_metadata_for_build_wheel` entry point of the build backend.
-
-    #[tracing::instrument(skip_all, fields(name = %sdist.distribution_name(), version = %sdist.version()))]
+    #[tracing::instrument(skip_all, fields(name = % sdist.distribution_name(), version = % sdist.version()))]
     pub async fn get_sdist_metadata(
         &self,
         sdist: &S,
@@ -186,7 +271,8 @@ impl WheelBuilder {
         build_environment: &BuildEnvironment,
         sdist: &S,
     ) -> Result<(Vec, WheelCoreMetadata), WheelBuildError> {
-        let output = build_environment.run_command("WheelMetadata")?;
+        let output_dir = tempfile::tempdir()?;
+        let output = build_environment.run_command("WheelMetadata", output_dir.path())?;
         if !output.status.success() {
             if output.status.code() == Some(50) {
                 tracing::warn!("SDist build backend does not support metadata generation");
@@ -200,10 +286,12 @@ impl WheelBuilder {
             return Err(WheelBuildError::Error(stdout.to_string()));
         }
 
-        let result = fs::read_to_string(build_environment.work_dir().join("metadata_result"))?;
+        // Read the output file
+        let result = fs::read_to_string(output_dir.path().join("metadata_result"))?;
         let folder = PathBuf::from(result.trim());
         let path = folder.join("METADATA");
 
+        // Read the metadata
         let metadata = fs::read(path)?;
         let wheel_metadata = WheelCoreMetadata::try_from(metadata.as_slice())?;
         Ok((metadata, wheel_metadata))
@@ -211,7 +299,7 @@ impl WheelBuilder {
 
     /// Build a wheel from an sdist by using the build_backend in a virtual env.
     /// This function uses the `build_wheel` entry point of the build backend.
-    #[tracing::instrument(skip_all, fields(name = %sdist.distribution_name(), version = %sdist.version()))]
+    #[tracing::instrument(skip_all, fields(name = % sdist.distribution_name(), version = % sdist.version()))]
     pub async fn build_wheel(
         &self,
         sdist: &S,
@@ -236,8 +324,9 @@ impl WheelBuilder {
         build_environment: &BuildEnvironment,
         sdist: &S,
     ) -> Result {
+        let output_dir = tempfile::tempdir()?;
         // Run the wheel stage
-        let output = build_environment.run_command("Wheel")?;
+        let output = build_environment.run_command("Wheel", output_dir.path())?;
 
         // Check for success
         if !output.status.success() {
@@ -246,10 +335,9 @@ impl WheelBuilder {
         }
 
         // This is where the wheel file is located
-        let wheel_file: PathBuf =
-            fs::read_to_string(build_environment.work_dir().join("wheel_result"))?
-                .trim()
-                .into();
+        let wheel_file: PathBuf = fs::read_to_string(output_dir.path().join("wheel_result"))?
+            .trim()
+            .into();
 
         // Get the name of the package
         let package_name: NormalizedPackageName = PackageName::from_str(&sdist.distribution_name())
@@ -291,7 +379,7 @@ mod tests {
     use crate::artifacts::SDist;
     use crate::index::{PackageDb, PackageSourcesBuilder};
     use crate::python_env::{Pep508EnvMakers, PythonInterpreterVersion};
-    use crate::resolve::solve_options::ResolveOptions;
+    use crate::resolve::solve_options::{OnWheelBuildFailure, ResolveOptions};
     use crate::wheel_builder::wheel_cache::WheelCacheKey;
     use crate::wheel_builder::WheelBuilder;
     use futures::future::TryJoinAll;
@@ -301,6 +389,7 @@ mod tests {
     use std::path::Path;
     use std::sync::Arc;
     use tempfile::TempDir;
+    use tokio_util::either::Either;
 
     fn get_package_db() -> (Arc, TempDir) {
         let tempdir = tempfile::tempdir().unwrap();
@@ -315,6 +404,26 @@ mod tests {
         )
     }
 
+    // Setup the test environment
+    pub async fn setup(resolve_options: ResolveOptions) -> (Arc, TempDir) {
+        let (package_db, tempdir) = get_package_db();
+        let env_markers = Arc::new(Pep508EnvMakers::from_env().await.unwrap().0);
+
+        (
+            Arc::new(
+                WheelBuilder::new(
+                    package_db.clone(),
+                    env_markers.clone(),
+                    None,
+                    resolve_options,
+                    HashMap::default(),
+                )
+                .unwrap(),
+            ),
+            tempdir,
+        )
+    }
+
     #[tokio::test(flavor = "multi_thread")]
     pub async fn build_with_cache() {
         let path =
@@ -322,16 +431,7 @@ mod tests {
 
         let sdist = SDist::from_path(&path, &"rich".parse().unwrap()).unwrap();
 
-        let package_db = get_package_db();
-        let env_markers = Arc::new(Pep508EnvMakers::from_env().await.unwrap().0);
-        let wheel_builder = WheelBuilder::new(
-            package_db.0,
-            env_markers,
-            None,
-            ResolveOptions::default(),
-            HashMap::default(),
-        )
-        .unwrap();
+        let (wheel_builder, _temp) = setup(ResolveOptions::default()).await;
 
         // Build the wheel
         wheel_builder.build_wheel(&sdist).await.unwrap();
@@ -362,23 +462,11 @@ mod tests {
             .join("../../test-data/sdists/tampered-rich-13.6.0.tar.gz");
 
         let sdist = SDist::from_path(&path, &"tampered-rich".parse().unwrap()).unwrap();
-
-        let package_db = get_package_db();
-        let env_markers = Arc::new(Pep508EnvMakers::from_env().await.unwrap().0);
-        let resolve_options = ResolveOptions {
-            on_wheel_build_failure:
-                crate::resolve::solve_options::OnWheelBuildFailure::SaveBuildEnv,
+        let (wheel_builder, _temp) = setup(ResolveOptions {
+            on_wheel_build_failure: OnWheelBuildFailure::SaveBuildEnv,
             ..Default::default()
-        };
-
-        let wheel_builder = WheelBuilder::new(
-            package_db.0,
-            env_markers,
-            None,
-            resolve_options,
-            Default::default(),
-        )
-        .unwrap();
+        })
+        .await;
 
         // Build the wheel
         // this should fail because we don't have the right environment
@@ -394,34 +482,54 @@ mod tests {
 
         assert!(path.exists());
     }
 
-    // Skipped for now will fix this in a later PR
+    // Enable this if you need to know what's going on
+    // #[traced_test]
     #[tokio::test(flavor = "multi_thread")]
-    #[ignore]
     pub async fn build_sdist_metadata_concurrently() {
         let path =
             Path::new(env!("CARGO_MANIFEST_DIR")).join("../../test-data/sdists/rich-13.6.0.tar.gz");
 
-        let package_db = get_package_db();
-        let env_markers = Arc::new(Pep508EnvMakers::from_env().await.unwrap().0);
+        let (wheel_builder, _temp) = setup(ResolveOptions::default()).await;
+        let mut handles = vec![];
 
-        let wheel_builder = Arc::new(
-            WheelBuilder::new(
-                package_db.0,
-                env_markers,
-                None,
-                ResolveOptions::default(),
-                Default::default(),
-            )
-            .unwrap(),
-        );
+        for _ in 0..10 {
+            let sdist = SDist::from_path(&path, &"rich".parse().unwrap()).unwrap();
+            let wheel_builder = wheel_builder.clone();
+            handles.push(tokio::spawn(async move {
+                wheel_builder.get_sdist_metadata(&sdist).await
+            }));
+        }
+        let result = handles.into_iter().collect::>().await;
+        match result {
+            Ok(results) => {
+                for result in results {
+                    assert!(
+                        result.is_ok(),
+                        "error during concurrent wheel build: {:?}",
+                        result.err()
+                    );
+                }
+            }
+            Err(e) => {
+                panic!("Failed to build sdists concurrently: {}", e);
+            }
+        }
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    pub async fn build_wheels_concurrently() {
+        let path =
+            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../test-data/sdists/rich-13.6.0.tar.gz");
+
+        let (wheel_builder, _temp) = setup(ResolveOptions::default()).await;
 
         let mut handles = vec![];
 
         for _ in 0..10 {
             let sdist = SDist::from_path(&path, &"rich".parse().unwrap()).unwrap();
             let wheel_builder = wheel_builder.clone();
             handles.push(tokio::spawn(async move {
-                wheel_builder.get_sdist_metadata(&sdist).await
+                wheel_builder.build_wheel(&sdist).await
             }));
         }
@@ -437,7 +545,56 @@ mod tests {
                 }
             }
             Err(e) => {
-                panic!("Failed to build wheels concurrently: {}", e);
+                panic!("Failed to build sdists concurrently: {}", e);
+            }
+        }
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    pub async fn build_wheels_interleaved() {
+        let path =
+            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../test-data/sdists/rich-13.6.0.tar.gz");
+
+        let (wheel_builder, _temp) = setup(ResolveOptions::default()).await;
+
+        let mut handles = vec![];
+
+        for x in 0..10 {
+            let sdist = SDist::from_path(&path, &"rich".parse().unwrap()).unwrap();
+            let wheel_builder = wheel_builder.clone();
+            handles.push(tokio::spawn(async move {
+                if x % 2 == 0 {
+                    Either::Left(wheel_builder.build_wheel(&sdist).await)
+                } else {
+                    Either::Right(wheel_builder.get_sdist_metadata(&sdist).await)
+                }
+            }));
+        }
+
+        let result = handles.into_iter().collect::>().await;
+        match result {
+            Ok(results) => {
+                for result in results {
+                    match result {
+                        Either::Left(result) => {
+                            assert!(
+                                result.is_ok(),
+                                "error during concurrent wheel build: {:?}",
+                                result.err()
+                            );
+                        }
+                        Either::Right(result) => {
+                            assert!(
+                                result.is_ok(),
+                                "error during concurrent metadata build: {:?}",
+                                result.err()
+                            );
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                panic!("Failed to build sdists concurrently: {}", e);
             }
         }
     }
diff --git a/rust-toolchain b/rust-toolchain
index 07cde984..9242d8e7 100644
--- a/rust-toolchain
+++ b/rust-toolchain
@@ -1 +1 @@
-1.75
+1.76
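
Note on the streaming_or_local.rs hunks: the change only moves the blocking read into a named closure, but the surrounding pattern is worth spelling out: the closure runs on tokio's blocking pool, `JoinError::try_into_panic` separates "the task panicked" from "the task was cancelled", and a panic is resumed on the calling task instead of being swallowed. Below is a minimal, self-contained sketch of that pattern outside the crate; `read_all_blocking` is an illustrative name, and the only assumed dependencies are `tokio` (rt + macros features) and `futures` for `TryFutureExt::map_err`, both of which the crate already uses.

use std::io::{self, Read};

use futures::TryFutureExt; // provides `map_err` on the JoinHandle future
use tokio::task::JoinError;

// Illustrative helper: drain a blocking reader on the blocking thread pool,
// forwarding a panic from the blocking task to the caller.
async fn read_all_blocking<R>(mut reader: R) -> io::Result<Vec<u8>>
where
    R: Read + Send + 'static,
{
    let read_to_end = move || {
        let mut bytes = Vec::new();
        reader.read_to_end(&mut bytes).map(|_| bytes)
    };

    match tokio::task::spawn_blocking(read_to_end)
        .map_err(JoinError::try_into_panic)
        .await
    {
        // The closure ran to completion and returned its io::Result.
        Ok(Ok(bytes)) => Ok(bytes),
        Ok(Err(err)) => Err(err),
        // The closure panicked: re-raise that panic on this task.
        Err(Ok(panic)) => std::panic::resume_unwind(panic),
        // The blocking task was cancelled rather than panicking.
        Err(Err(_)) => Err(io::ErrorKind::Interrupted.into()),
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let bytes = read_all_blocking(&b"hello"[..]).await?;
    assert_eq!(bytes, b"hello".to_vec());
    Ok(())
}

The distinction between the two Err arms is the point: `try_into_panic` only yields the panic payload when the task actually panicked, so cancellation still maps to an ordinary io::Error.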
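Note on the wheel_builder/mod.rs change to the venv setup: the broadcast channel plus `Weak` sender is a single-flight (request-coalescing) pattern. The first task to miss the cache builds the environment, every concurrent task for the same sdist subscribes and waits, and because only a `Weak` handle is stored, a panicking builder drops the last `Arc` so the next caller re-creates the channel instead of waiting forever. The two-phase structure (decide under the lock, then await outside it) also keeps the mutex guard from being held across an await. Below is a minimal, self-contained sketch of that pattern under assumed names: `Coalescer`, `Resource`, and `get_or_create` are illustrative, std `Mutex` stands in for `parking_lot::Mutex`, and the expensive async setup is stubbed out.

use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

use tokio::sync::broadcast;

// Stand-in for the expensive thing being built (a BuildEnvironment in the diff).
struct Resource;

type ResourceSender = broadcast::Sender<Option<Arc<Resource>>>;

#[derive(Default)]
struct Coalescer {
    cache: Mutex<HashMap<String, Arc<Resource>>>,
    in_flight: Mutex<HashMap<String, Weak<ResourceSender>>>,
}

enum State {
    // Another task is building; wait on its broadcast channel.
    Wait(broadcast::Receiver<Option<Arc<Resource>>>),
    // We won the race: build, then notify through this sender.
    Build(Arc<ResourceSender>),
}

impl Coalescer {
    async fn get_or_create(&self, key: &str) -> Result<Arc<Resource>, String> {
        if let Some(hit) = self.cache.lock().unwrap().get(key) {
            return Ok(hit.clone());
        }

        // Decide in one short critical section whether we build or wait.
        // The guard is confined to this block, so it is never held across an await.
        let state = {
            let mut in_flight = self.in_flight.lock().unwrap();
            let existing = in_flight.get(key).and_then(Weak::upgrade);
            match existing {
                Some(sender) => State::Wait(sender.subscribe()),
                None => {
                    let (tx, _) = broadcast::channel(1);
                    let tx = Arc::new(tx);
                    // Only a Weak is stored: if the builder panics, the Arc is
                    // dropped and the next caller re-creates the channel.
                    in_flight.insert(key.to_string(), Arc::downgrade(&tx));
                    State::Build(tx)
                }
            }
        };

        let tx = match state {
            State::Wait(mut rx) => {
                return match rx.recv().await {
                    Ok(Some(resource)) => Ok(resource),
                    Ok(None) => Err("setup failed in the building task".to_string()),
                    Err(_) => Err("setup panicked in the building task".to_string()),
                };
            }
            State::Build(tx) => tx,
        };

        // Stub for the expensive async setup (BuildEnvironment::setup in the diff).
        let built: Result<Arc<Resource>, String> = Ok(Arc::new(Resource));
        match built {
            Ok(resource) => {
                self.cache
                    .lock()
                    .unwrap()
                    .insert(key.to_string(), resource.clone());
                // Wake every waiting task with the shared result.
                let _ = tx.send(Some(resource.clone()));
                Ok(resource)
            }
            Err(e) => {
                // Waiters see a plain failure; the error itself is returned here.
                let _ = tx.send(None);
                Err(e)
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let coalescer = Arc::new(Coalescer::default());
    let tasks: Vec<_> = (0..10)
        .map(|_| {
            let coalescer = coalescer.clone();
            tokio::spawn(async move { coalescer.get_or_create("rich-13.6.0").await })
        })
        .collect();
    for task in tasks {
        assert!(task.await.unwrap().is_ok());
    }
}

In the diff the same roles are played by `in_setup_venv`, the local `BuildEnvState` enum, and `venv_cache`; the sketch just strips away the sdist-specific types.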