From 2e8483d5009a50d766394a51d15d0d01a17e51be Mon Sep 17 00:00:00 2001 From: nichmor Date: Tue, 3 Dec 2024 14:19:39 +0200 Subject: [PATCH] feat: add request coalescing for isolated tools (#2589) Co-authored-by: Bas Zalmstra --- crates/pixi_build_frontend/Cargo.toml | 5 + .../pixi_build_frontend/src/build_frontend.rs | 3 +- crates/pixi_build_frontend/src/tool/cache.rs | 631 +++++++++--------- .../pixi_build_frontend/src/tool/installer.rs | 300 +++++++++ crates/pixi_build_frontend/src/tool/mod.rs | 27 +- 5 files changed, 630 insertions(+), 336 deletions(-) create mode 100644 crates/pixi_build_frontend/src/tool/installer.rs diff --git a/crates/pixi_build_frontend/Cargo.toml b/crates/pixi_build_frontend/Cargo.toml index f5e186321..976b86429 100644 --- a/crates/pixi_build_frontend/Cargo.toml +++ b/crates/pixi_build_frontend/Cargo.toml @@ -47,3 +47,8 @@ pixi_build_types = { path = "../pixi_build_types" } insta = { workspace = true, features = ["yaml", "filters"] } rstest = { workspace = true } tempfile = { workspace = true } +tokio = { workspace = true, features = [ + "process", + "io-std", + "rt-multi-thread", +] } diff --git a/crates/pixi_build_frontend/src/build_frontend.rs b/crates/pixi_build_frontend/src/build_frontend.rs index aa58a6933..d025bfa16 100644 --- a/crates/pixi_build_frontend/src/build_frontend.rs +++ b/crates/pixi_build_frontend/src/build_frontend.rs @@ -7,8 +7,7 @@ use rattler_conda_types::ChannelConfig; use crate::{ protocol, protocol_builder::{EnabledProtocols, ProtocolBuilder}, - tool::ToolContext, - Protocol, SetupRequest, + Protocol, SetupRequest, ToolContext, }; /// The frontend for building packages. diff --git a/crates/pixi_build_frontend/src/tool/cache.rs b/crates/pixi_build_frontend/src/tool/cache.rs index fbd57e4f4..c16dd9960 100644 --- a/crates/pixi_build_frontend/src/tool/cache.rs +++ b/crates/pixi_build_frontend/src/tool/cache.rs @@ -1,302 +1,34 @@ -use std::{fmt::Debug, hash::Hash, path::PathBuf}; - -use dashmap::{DashMap, Entry}; -use miette::{miette, IntoDiagnostic, Result}; -use pixi_consts::consts::{CACHED_BUILD_ENVS_DIR, CONDA_REPODATA_CACHE_DIR}; -use pixi_progress::wrap_in_progress; -use pixi_utils::{EnvironmentHash, PrefixGuard}; -use rattler::{install::Installer, package_cache::PackageCache}; -use rattler_conda_types::{Channel, ChannelConfig, GenericVirtualPackage, Platform}; -use rattler_repodata_gateway::Gateway; -use rattler_shell::{ - activation::{ActivationVariables, Activator}, - shell::ShellEnum, -}; -use rattler_solve::{resolvo::Solver, SolverImpl, SolverTask}; -use rattler_virtual_packages::{VirtualPackage, VirtualPackageOverrides}; -use reqwest_middleware::ClientWithMiddleware; - -use super::IsolatedTool; -use crate::{ - tool::{SystemTool, Tool, ToolSpec}, - IsolatedToolSpec, SystemToolSpec, +use std::{ + fmt::Debug, + path::PathBuf, + sync::{Arc, Weak}, }; -pub struct ToolContextBuilder { - gateway: Option, - client: ClientWithMiddleware, - cache_dir: PathBuf, - cache: ToolCache, - platform: Platform, -} - -impl Default for ToolContextBuilder { - fn default() -> Self { - Self::new() - } -} - -impl ToolContextBuilder { - /// Create a new tool context builder. - pub fn new() -> Self { - Self { - gateway: None, - client: ClientWithMiddleware::default(), - cache_dir: pixi_config::get_cache_dir().expect("we should have a cache dir"), - cache: ToolCache::default(), - platform: Platform::current(), - } - } - - /// Set the platform to install tools for. This is usually the current - /// platform but could also be a compatible platform. For instance if the - /// current platform is win-arm64, the compatible platform could be win-64. - pub fn with_platform(mut self, platform: Platform) -> Self { - self.platform = platform; - self - } - - /// Set the gateway for the tool context. - pub fn with_gateway(mut self, gateway: Gateway) -> Self { - self.gateway = Some(gateway); - self - } - - /// Set the client for the tool context. - pub fn with_client(mut self, client: ClientWithMiddleware) -> Self { - self.client = client; - self - } - - /// Set the cache directory for the tool context. - pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self { - self.cache_dir = cache_dir; - self - } - - pub fn with_cache(mut self, cache: ToolCache) -> Self { - self.cache = cache; - self - } - - /// Build the `ToolContext` using builder configuration. - pub fn build(self) -> ToolContext { - let gateway = self.gateway.unwrap_or_else(|| { - Gateway::builder() - .with_client(self.client.clone()) - .with_cache_dir(self.cache_dir.join(CONDA_REPODATA_CACHE_DIR)) - .finish() - }); - - ToolContext { - cache_dir: self.cache_dir, - client: self.client, - cache: self.cache, - platform: self.platform, - gateway, - } - } -} - -/// The tool context, -/// containing client, channels and gateway configuration -/// that will be used to resolve and install tools. -pub struct ToolContext { - /// Authentication client to use for fetching repodata. - pub client: ClientWithMiddleware, - /// The cache directory to use for the tools. - pub cache_dir: PathBuf, - /// The gateway to use for fetching repodata. - pub gateway: Gateway, - /// The cache to use for the tools. - pub cache: ToolCache, - /// The platform to install tools for. This is usually the current platform - /// but could also be a compatible platform. For instance if the current - /// platform is win-arm64, the compatible platform could be win-64. - pub platform: Platform, -} - -impl Default for ToolContext { - fn default() -> Self { - Self::builder().build() - } -} - -impl Debug for ToolContext { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ToolContext") - .field("client", &self.client) - .field("cache_dir", &self.cache_dir) - .finish() - } -} - -impl ToolContext { - /// Create a new tool context builder with the given channels. - pub fn builder() -> ToolContextBuilder { - ToolContextBuilder::new() - } - - /// Instantiate a tool from a specification. - /// - /// If the tool is not already cached, it will be created, installed and cached. - pub async fn instantiate( - &self, - spec: ToolSpec, - channel_config: &ChannelConfig, - ) -> Result { - let spec = match spec { - ToolSpec::Io(ipc) => return Ok(Tool::Io(ipc)), - ToolSpec::Isolated(isolated) => CacheableToolSpec::Isolated(isolated), - ToolSpec::System(system) => CacheableToolSpec::System(system), - }; - - let cache_entry = match self.cache.cache.entry(spec.clone()) { - Entry::Occupied(entry) => return Ok(entry.get().clone().into()), - Entry::Vacant(entry) => entry, - }; - - let tool: CachedTool = match spec { - CacheableToolSpec::Isolated(spec) => CachedTool::Isolated(if spec.specs.is_empty() { - return Err(ToolCacheError::Install(miette!( - "No build match specs provided for '{}' command.", - spec.command - ))); - } else { - self.install(&spec, channel_config) - .await - .map_err(ToolCacheError::Install)? - }), - CacheableToolSpec::System(spec) => SystemTool::new(spec.command).into(), - }; - - cache_entry.insert(tool.clone()); - Ok(tool.into()) - } - - /// Installed the tool in the isolated environment. - pub async fn install( - &self, - spec: &IsolatedToolSpec, - channel_config: &ChannelConfig, - ) -> miette::Result { - let channels: Vec = spec - .channels - .iter() - .cloned() - .map(|channel| channel.into_channel(channel_config)) - .collect::, _>>() - .into_diagnostic()?; - - let repodata = self - .gateway - .query( - channels.clone(), - [self.platform, Platform::NoArch], - spec.specs.clone(), - ) - .recursive(true) - .execute() - .await - .into_diagnostic()?; - - // Determine virtual packages of the current platform - let virtual_packages = VirtualPackage::detect(&VirtualPackageOverrides::from_env()) - .unwrap() - .iter() - .cloned() - .map(GenericVirtualPackage::from) - .collect(); - - let solved_records = Solver - .solve(SolverTask { - specs: spec.specs.clone(), - virtual_packages, - ..SolverTask::from_iter(&repodata) - }) - .into_diagnostic()?; - - let cache = EnvironmentHash::new( - spec.command.clone(), - spec.specs.clone(), - channels.iter().map(|c| c.base_url.to_string()).collect(), - self.platform, - ); - - let cached_dir = self - .cache_dir - .join(CACHED_BUILD_ENVS_DIR) - .join(cache.name()); - - let mut prefix_guard = PrefixGuard::new(&cached_dir).into_diagnostic()?; - - let mut write_guard = - wrap_in_progress("acquiring write lock on prefix", || prefix_guard.write()) - .into_diagnostic()?; - - // If the environment already exists, we can return early. - if write_guard.is_ready() { - tracing::info!("reusing existing environment in {}", cached_dir.display()); - let _ = write_guard.finish(); - - // Get the activation scripts - let activator = - Activator::from_path(&cached_dir, ShellEnum::default(), Platform::current()) - .unwrap(); - - let activation_scripts = activator - .run_activation(ActivationVariables::from_env().unwrap_or_default(), None) - .unwrap(); - - return Ok(IsolatedTool::new( - spec.command.clone(), - cached_dir, - activation_scripts, - )); - } - - // Update the prefix to indicate that we are installing it. - write_guard.begin().into_diagnostic()?; - - // Install the environment - Installer::new() - .with_target_platform(self.platform) - .with_download_client(self.client.clone()) - .with_package_cache(PackageCache::new( - self.cache_dir - .join(pixi_consts::consts::CONDA_PACKAGE_CACHE_DIR), - )) - .install(&cached_dir, solved_records) - .await - .into_diagnostic()?; - - // Get the activation scripts - let activator = - Activator::from_path(&cached_dir, ShellEnum::default(), self.platform).unwrap(); - - let activation_scripts = activator - .run_activation(ActivationVariables::from_env().unwrap_or_default(), None) - .unwrap(); +use dashmap::{DashMap, Entry}; +use rattler_conda_types::ChannelConfig; +use tokio::sync::broadcast; - let _ = write_guard.finish(); +use super::{installer::ToolInstaller, IsolatedTool}; +use crate::IsolatedToolSpec; - Ok(IsolatedTool::new( - spec.command.clone(), - cached_dir, - activation_scripts, - )) - } +/// A entity that is either pending or has been fetched. +#[derive(Clone)] +enum PendingOrFetched { + Pending(Weak>), + Fetched(T), } -/// A [`ToolCache`] maintains a cache of environments for build tools. +/// A [`ToolCache`] maintains a cache of environments for isolated build tools. /// /// This is useful to ensure that if we need to build multiple packages that use /// the same tool, we can reuse their environments. -/// (nichita): it can also be seen as a way to create tools itself -#[derive(Default, Debug)] +/// Implementation for request coalescing is inspired by: +/// * https://github.com/conda/rattler/blob/main/crates/rattler_repodata_gateway/src/gateway/mod.rs#L180 +/// * https://github.com/prefix-dev/rip/blob/main/crates/rattler_installs_packages/src/wheel_builder/mod.rs#L39 +#[derive(Default)] pub struct ToolCache { /// The cache of tools. - pub cache: DashMap, + cache: DashMap>>, } #[derive(thiserror::Error, Debug)] @@ -309,42 +41,6 @@ pub enum ToolCacheError { CacheDir(miette::Report), } -/// Describes the specification of the tool. This can be used to cache tool -/// information. -#[derive(Debug, Clone, Hash, Eq, PartialEq)] -pub enum CacheableToolSpec { - Isolated(IsolatedToolSpec), - System(SystemToolSpec), -} - -/// A tool that can be invoked. -#[derive(Debug, Clone)] -pub enum CachedTool { - Isolated(IsolatedTool), - System(SystemTool), -} - -impl From for Tool { - fn from(value: CachedTool) -> Self { - match value { - CachedTool::Isolated(tool) => Tool::Isolated(tool), - CachedTool::System(tool) => Tool::System(tool), - } - } -} - -impl From for CachedTool { - fn from(value: IsolatedTool) -> Self { - Self::Isolated(value) - } -} - -impl From for CachedTool { - fn from(value: SystemTool) -> Self { - Self::System(value) - } -} - impl ToolCache { /// Construct a new tool cache. pub fn new() -> Self { @@ -352,20 +48,133 @@ impl ToolCache { cache: DashMap::default(), } } + + pub async fn get_or_install_tool( + &self, + spec: IsolatedToolSpec, + context: &impl ToolInstaller, + channel_config: &ChannelConfig, + ) -> miette::Result> { + let sender = match self.cache.entry(spec.clone()) { + Entry::Vacant(entry) => { + // Construct a sender so other tasks can subscribe + let (sender, _) = broadcast::channel(1); + let sender = Arc::new(sender); + + // modify the current entry to the pending entry. + // this is an atomic operation + // because who holds the entry holds mutable access. + entry.insert(PendingOrFetched::Pending(Arc::downgrade(&sender))); + + sender + } + Entry::Occupied(mut entry) => { + let tool = entry.get(); + match tool { + PendingOrFetched::Pending(sender) => { + let sender = sender.upgrade(); + if let Some(sender) = sender { + // Create a receiver before we drop the entry. While we hold on to + // the entry we have exclusive access to it, this means the task + // currently installing the tool will not be able to store a value + // until we drop the entry. + // By creating the receiver here we ensure that we are subscribed + // before the other tasks sends a value over the channel. + let mut receiver = sender.subscribe(); + + // Explicitly drop the entry, so we don't block any other tasks. + drop(entry); + // Drop the sender + drop(sender); + + return match receiver.recv().await { + Ok(tool) => Ok(tool), + Err(_) => miette::bail!( + "a coalesced tool {} request install failed", + spec.command + ), + }; + } else { + // Construct a sender so other tasks can subscribe + let (sender, _) = broadcast::channel(1); + let sender = Arc::new(sender); + + // Modify the current entry to the pending entry, this is an atomic + // operation because who holds the entry holds mutable access. + entry.insert(PendingOrFetched::Pending(Arc::downgrade(&sender))); + + sender + } + } + PendingOrFetched::Fetched(tool) => return Ok(tool.clone()), + } + } + }; + + // At this point we have exclusive write access to this specific entry. All + // other tasks will find a pending entry and will wait for the tool + // to become available. + // + // Let's start by installing tool. If an error occurs we immediately return + // the error. This will drop the sender and all other waiting tasks will + // receive an error. + // Installation happens outside the critical section + let tool = Arc::new(context.install(&spec, channel_config).await?); + + // Store the fetched files in the entry. + self.cache + .insert(spec, PendingOrFetched::Fetched(tool.clone())); + + // Send the tool to all waiting tasks. We don't care if there are no + // receivers, so we drop the error. + let _ = sender.send(tool.clone()); + + Ok(tool) + } } #[cfg(test)] mod tests { + use std::{collections::HashMap, path::PathBuf, sync::Arc}; + use pixi_config::Config; - use rattler_conda_types::{MatchSpec, ParseStrictness, Platform}; + use rattler_conda_types::{ + ChannelConfig, MatchSpec, NamedChannelOrUrl, ParseStrictness, Platform, + }; use reqwest_middleware::ClientWithMiddleware; + use tokio::sync::{Barrier, Mutex}; - // use super::ToolCache; use crate::{ - tool::{ToolContext, ToolSpec}, + tool::{ + installer::{ToolContext, ToolInstaller}, + IsolatedTool, ToolSpec, + }, IsolatedToolSpec, }; + /// A test installer that will count how many times a tool was installed. + /// This is used to verify that we only install a tool once. + #[derive(Default, Clone)] + struct TestInstaller { + count: Arc>>, + } + + impl ToolInstaller for TestInstaller { + async fn install( + &self, + spec: &IsolatedToolSpec, + _channel_config: &ChannelConfig, + ) -> miette::Result { + let mut count = self.count.lock().await; + let count = count.entry(spec.clone()).or_insert(0); + *count += 1; + + let isolated_tool = + IsolatedTool::new(spec.command.clone(), PathBuf::new(), HashMap::default()); + + Ok(isolated_tool) + } + } /// Returns the platform to use for the tool cache. Python is not yet /// available for win-arm64 so we use win-64. pub fn compatible_target_platform() -> Platform { @@ -403,4 +212,172 @@ mod tests { exec.command().arg("--version").spawn().unwrap(); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_installing_is_synced() { + // This test verifies that we only install a tool once even if multiple tasks + // request the same tool at the same time. + + let mut config = Config::default(); + config.default_channels = vec![NamedChannelOrUrl::Name("conda-forge".to_string())]; + + let auth_client = ClientWithMiddleware::default(); + let channel_config = ChannelConfig::default_with_root_dir(PathBuf::new()); + + let tool_context = Arc::new( + ToolContext::builder() + .with_client(auth_client.clone()) + .build(), + ); + + let tool_installer = TestInstaller::default(); + + let tool_spec = IsolatedToolSpec { + specs: vec![MatchSpec::from_str("cowpy", ParseStrictness::Strict).unwrap()], + command: "cowpy".into(), + channels: Vec::from([NamedChannelOrUrl::Name("conda-forge".to_string())]), + }; + + // Let's imitate that we have 4 requests to install a tool + // we will use a barrier to ensure all tasks start at the same time. + let num_tasks = 4; + let barrier = Arc::new(Barrier::new(num_tasks)); + let mut handles = Vec::new(); + + for _ in 0..num_tasks { + let barrier = barrier.clone(); + + let tool_context = tool_context.clone(); + + let tool_installer = tool_installer.clone(); + + let channel_config = channel_config.clone(); + let tool_spec = tool_spec.clone(); + + let handle = tokio::spawn(async move { + barrier.wait().await; + + tool_context + .cache + .get_or_install_tool(tool_spec, &tool_installer, &channel_config) + .await + }); + + handles.push(handle); + } + + // Wait for all tasks to complete + let tools = futures::future::join_all(handles) + .await + .into_iter() + .map(|tool| tool.unwrap()) + .collect::>(); + + // verify that we dont have any errors + let errors = tools.iter().filter(|tool| tool.is_err()).count(); + assert_eq!(errors, 0); + + // verify that only one was installed + let lock = tool_installer.count.lock().await; + let install_count = lock.get(&tool_spec).unwrap(); + assert_eq!(install_count, &1); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_handle_a_failure() { + // This test verifies that during the installation of a tool, if an error occurs + // the tool is not cached and the next request will try to install the tool again. + // A test installer that will fail on the first request. + #[derive(Default, Clone)] + struct TestInstaller { + count: Arc>>, + } + + impl ToolInstaller for TestInstaller { + async fn install( + &self, + spec: &IsolatedToolSpec, + _channel_config: &ChannelConfig, + ) -> miette::Result { + let mut count = self.count.lock().await; + let count = count.entry(spec.clone()).or_insert(0); + *count += 1; + + if count == &1 { + dbg!("a"); + miette::bail!("error on first request"); + // tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + // panic!("tim is right!"); + } + + dbg!("b"); + let isolated_tool = + IsolatedTool::new(spec.command.clone(), PathBuf::new(), HashMap::default()); + Ok(isolated_tool) + } + } + + let mut config = Config::default(); + config.default_channels = vec![NamedChannelOrUrl::Name("conda-forge".to_string())]; + + let auth_client = ClientWithMiddleware::default(); + let channel_config = ChannelConfig::default_with_root_dir(PathBuf::new()); + + let tool_context = Arc::new( + ToolContext::builder() + .with_client(auth_client.clone()) + .build(), + ); + + let tool_installer = TestInstaller::default(); + + let tool_spec = IsolatedToolSpec { + specs: vec![MatchSpec::from_str("cowpy", ParseStrictness::Strict).unwrap()], + command: "cowpy".into(), + channels: Vec::from([NamedChannelOrUrl::Name("conda-forge".to_string())]), + }; + + // Let's imitate that we have 4 requests to install a tool + // we will use a barrier to ensure all tasks start at the same time. + let num_tasks = 2; + let barrier = Arc::new(Barrier::new(num_tasks)); + let mut handles = Vec::new(); + + for _ in 0..num_tasks { + let barrier = barrier.clone(); + + let tool_context = tool_context.clone(); + + let tool_installer = tool_installer.clone(); + + let channel_config = channel_config.clone(); + let tool_spec = tool_spec.clone(); + + let handle = tokio::spawn(async move { + barrier.wait().await; + + tool_context + .cache + .get_or_install_tool(tool_spec, &tool_installer, &channel_config) + .await + }); + + handles.push(handle); + } + + // Wait for all tasks to complete + let tools = futures::future::join_all(handles) + .await + .into_iter() + .map(|tool| tool.unwrap()) + .collect::>(); + + // now we need to validate that exactly one install was errored out + let errors = tools.iter().filter(|tool| tool.is_err()).count(); + assert_eq!(errors, 1); + + let lock = tool_installer.count.lock().await; + let install_count = lock.get(&tool_spec).unwrap(); + assert_eq!(install_count, &2); + } } diff --git a/crates/pixi_build_frontend/src/tool/installer.rs b/crates/pixi_build_frontend/src/tool/installer.rs new file mode 100644 index 000000000..c69463c65 --- /dev/null +++ b/crates/pixi_build_frontend/src/tool/installer.rs @@ -0,0 +1,300 @@ +use std::fmt::Debug; +use std::future::Future; +use std::path::PathBuf; + +use pixi_consts::consts::{CACHED_BUILD_ENVS_DIR, CONDA_REPODATA_CACHE_DIR}; +use pixi_progress::wrap_in_progress; +use pixi_utils::{EnvironmentHash, PrefixGuard}; +use rattler::{install::Installer, package_cache::PackageCache}; +use rattler_conda_types::{Channel, ChannelConfig, GenericVirtualPackage, Platform}; +use rattler_repodata_gateway::Gateway; +use rattler_shell::{ + activation::{ActivationVariables, Activator}, + shell::ShellEnum, +}; +use rattler_solve::{resolvo::Solver, SolverImpl, SolverTask}; +use rattler_virtual_packages::{VirtualPackage, VirtualPackageOverrides}; +use reqwest_middleware::ClientWithMiddleware; + +use super::{ + cache::ToolCache, IsolatedTool, IsolatedToolSpec, SystemTool, Tool, ToolCacheError, ToolSpec, +}; + +use miette::{miette, IntoDiagnostic}; + +/// A trait that is responsible for installing tools. +pub trait ToolInstaller { + /// Install the tool. + fn install( + &self, + tool: &IsolatedToolSpec, + channel_config: &ChannelConfig, + ) -> impl Future> + Send; +} + +pub struct ToolContextBuilder { + gateway: Option, + client: ClientWithMiddleware, + cache_dir: PathBuf, + cache: ToolCache, + platform: Platform, +} + +impl Default for ToolContextBuilder { + fn default() -> Self { + Self::new() + } +} + +impl ToolContextBuilder { + /// Create a new tool context builder. + pub fn new() -> Self { + Self { + gateway: None, + client: ClientWithMiddleware::default(), + cache_dir: pixi_config::get_cache_dir().expect("we should have a cache dir"), + cache: ToolCache::default(), + platform: Platform::current(), + } + } + + /// Set the platform to install tools for. This is usually the current + /// platform but could also be a compatible platform. For instance if the + /// current platform is win-arm64, the compatible platform could be win-64. + pub fn with_platform(mut self, platform: Platform) -> Self { + self.platform = platform; + self + } + + /// Set the gateway for the tool context. + pub fn with_gateway(mut self, gateway: Gateway) -> Self { + self.gateway = Some(gateway); + self + } + + /// Set the client for the tool context. + pub fn with_client(mut self, client: ClientWithMiddleware) -> Self { + self.client = client; + self + } + + /// Set the cache directory for the tool context. + pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self { + self.cache_dir = cache_dir; + self + } + + pub fn with_cache(mut self, cache: ToolCache) -> Self { + self.cache = cache; + self + } + + /// Build the `ToolContext` using builder configuration. + pub fn build(self) -> ToolContext { + let gateway = self.gateway.unwrap_or_else(|| { + Gateway::builder() + .with_client(self.client.clone()) + .with_cache_dir(self.cache_dir.join(CONDA_REPODATA_CACHE_DIR)) + .finish() + }); + + ToolContext { + cache_dir: self.cache_dir, + client: self.client, + cache: self.cache, + platform: self.platform, + gateway, + } + } +} + +/// The tool context, +/// containing client, channels and gateway configuration +/// that will be used to resolve and install tools. +pub struct ToolContext { + // Authentication client to use for fetching repodata. + pub client: ClientWithMiddleware, + // The cache directory to use for the tools. + pub cache_dir: PathBuf, + // The gateway to use for fetching repodata. + pub gateway: Gateway, + // The cache to use for the tools. + pub cache: ToolCache, + /// The platform to install tools for. This is usually the current platform + /// but could also be a compatible platform. For instance if the current + /// platform is win-arm64, the compatible platform could be win-64. + pub platform: Platform, +} + +impl Default for ToolContext { + fn default() -> Self { + Self::builder().build() + } +} + +impl Debug for ToolContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ToolContext") + .field("client", &self.client) + .field("cache_dir", &self.cache_dir) + .field("platform", &self.platform) + .finish() + } +} + +impl ToolContext { + /// Create a new tool context builder with the given channels. + pub fn builder() -> ToolContextBuilder { + ToolContextBuilder::new() + } + + /// Instantiate a tool from a specification. + /// + /// If the tool is not already cached, it will be created, installed and cached. + pub async fn instantiate( + &self, + spec: ToolSpec, + channel_config: &ChannelConfig, + ) -> Result { + let spec = match spec { + ToolSpec::Io(ipc) => return Ok(Tool::Io(ipc)), + ToolSpec::Isolated(isolated) => { + if isolated.specs.is_empty() { + return Err(ToolCacheError::Install(miette!( + "No build match specs provided for '{}' command.", + isolated.command + ))); + } + + isolated + } + + // I think we cannot bypass caching SystemTool as it is a wrapper around a spec command + ToolSpec::System(system) => return Ok(Tool::System(SystemTool::new(system.command))), + }; + + let installed = self + .cache + .get_or_install_tool(spec, self, channel_config) + .await + .map_err(ToolCacheError::Install)?; + + // Return the installed tool as a non arc instance + Ok(installed.as_ref().clone().into()) + } +} + +impl ToolInstaller for ToolContext { + /// Installed the tool in the isolated environment. + async fn install( + &self, + spec: &IsolatedToolSpec, + channel_config: &ChannelConfig, + ) -> miette::Result { + let channels: Vec = spec + .channels + .iter() + .cloned() + .map(|channel| channel.into_channel(channel_config)) + .collect::, _>>() + .into_diagnostic()?; + + let repodata = self + .gateway + .query( + channels.clone(), + [self.platform, Platform::NoArch], + spec.specs.clone(), + ) + .recursive(true) + .execute() + .await + .into_diagnostic()?; + + // Determine virtual packages of the current platform + let virtual_packages = VirtualPackage::detect(&VirtualPackageOverrides::from_env()) + .unwrap() + .iter() + .cloned() + .map(GenericVirtualPackage::from) + .collect(); + + let solved_records = Solver + .solve(SolverTask { + specs: spec.specs.clone(), + virtual_packages, + ..SolverTask::from_iter(&repodata) + }) + .into_diagnostic()?; + + let cache = EnvironmentHash::new( + spec.command.clone(), + spec.specs.clone(), + channels.iter().map(|c| c.base_url.to_string()).collect(), + self.platform, + ); + + let cached_dir = self + .cache_dir + .join(CACHED_BUILD_ENVS_DIR) + .join(cache.name()); + + let mut prefix_guard = PrefixGuard::new(&cached_dir).into_diagnostic()?; + + let mut write_guard = + wrap_in_progress("acquiring write lock on prefix", || prefix_guard.write()) + .into_diagnostic()?; + + // If the environment already exists, we can return early. + if write_guard.is_ready() { + tracing::info!("reusing existing environment in {}", cached_dir.display()); + let _ = write_guard.finish(); + + // Get the activation scripts + let activator = + Activator::from_path(&cached_dir, ShellEnum::default(), Platform::current()) + .unwrap(); + + let activation_scripts = activator + .run_activation(ActivationVariables::from_env().unwrap_or_default(), None) + .unwrap(); + + return Ok(IsolatedTool::new( + spec.command.clone(), + cached_dir, + activation_scripts, + )); + } + + // Update the prefix to indicate that we are installing it. + write_guard.begin().into_diagnostic()?; + + // Install the environment + Installer::new() + .with_target_platform(self.platform) + .with_download_client(self.client.clone()) + .with_package_cache(PackageCache::new( + self.cache_dir + .join(pixi_consts::consts::CONDA_PACKAGE_CACHE_DIR), + )) + .install(&cached_dir, solved_records) + .await + .into_diagnostic()?; + + // Get the activation scripts + let activator = + Activator::from_path(&cached_dir, ShellEnum::default(), self.platform).unwrap(); + + let activation_scripts = activator + .run_activation(ActivationVariables::from_env().unwrap_or_default(), None) + .unwrap(); + + let _ = write_guard.finish(); + + Ok(IsolatedTool::new( + spec.command.clone(), + cached_dir, + activation_scripts, + )) + } +} diff --git a/crates/pixi_build_frontend/src/tool/mod.rs b/crates/pixi_build_frontend/src/tool/mod.rs index 10a552478..ca21f38e4 100644 --- a/crates/pixi_build_frontend/src/tool/mod.rs +++ b/crates/pixi_build_frontend/src/tool/mod.rs @@ -1,13 +1,16 @@ mod cache; +mod installer; mod spec; use std::{collections::HashMap, path::PathBuf}; -pub use cache::{ToolCacheError, ToolContext}; +pub use cache::ToolCacheError; pub use spec::{IsolatedToolSpec, SystemToolSpec, ToolSpec}; use crate::InProcessBackend; +pub use installer::ToolContext; + /// A tool that can be invoked. #[derive(Debug)] pub enum Tool { @@ -16,6 +19,16 @@ pub enum Tool { Io(InProcessBackend), } +impl Tool { + pub fn as_isolated(&self) -> Option<&IsolatedTool> { + match self { + Tool::Isolated(tool) => Some(tool), + Tool::System(_) => None, + Tool::Io(_) => None, + } + } +} + #[derive(Debug)] pub enum ExecutableTool { Isolated(IsolatedTool), @@ -43,6 +56,12 @@ impl From for Tool { } } +impl From for Tool { + fn from(value: IsolatedTool) -> Self { + Self::Isolated(value) + } +} + /// A tool that is installed in its own isolated environment. #[derive(Debug, Clone)] pub struct IsolatedTool { @@ -69,12 +88,6 @@ impl IsolatedTool { } } -impl From for Tool { - fn from(value: IsolatedTool) -> Self { - Self::Isolated(value) - } -} - impl Tool { pub fn as_executable(&self) -> Option { match self {