diff --git a/src/cli/global/install.rs b/src/cli/global/install.rs index e8c268611..a196c05ad 100644 --- a/src/cli/global/install.rs +++ b/src/cli/global/install.rs @@ -1,6 +1,5 @@ use std::ffi::OsStr; use std::path::{Path, PathBuf}; -use std::sync::Arc; use crate::cli::has_specs::HasSpecs; use crate::config::{Config, ConfigCli}; @@ -348,7 +347,7 @@ pub(super) async fn globally_install_package( // Execute the transaction if there is work to do if has_transactions { - let package_cache = Arc::new(PackageCache::new(config::get_cache_dir()?.join("pkgs"))); + let package_cache = PackageCache::new(config::get_cache_dir()?.join("pkgs")); // Execute the operations that are returned by the solver. await_in_progress("creating virtual environment", |pb| { diff --git a/src/environment.rs b/src/environment.rs index f738faeaa..5509db305 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -27,7 +27,7 @@ use rattler_conda_types::{Platform, PrefixRecord, RepoDataRecord}; use rattler_lock::{PypiPackageData, PypiPackageEnvironmentData}; use reqwest_middleware::ClientWithMiddleware; use std::convert::identity; -use std::{collections::HashMap, io::ErrorKind, path::Path, sync::Arc}; +use std::{collections::HashMap, io::ErrorKind, path::Path}; /// Verify the location of the prefix folder is not changed so the applied prefix path is still valid. /// Errors when there is a file system error or the path does not align with the defined prefix. @@ -262,7 +262,7 @@ pub async fn update_prefix_pypi( pypi_records: &[(PypiPackageData, PypiPackageEnvironmentData)], status: &PythonStatus, system_requirements: &SystemRequirements, - uv_context: UvResolutionContext, + uv_context: &UvResolutionContext, pypi_options: &PypiOptions, environment_variables: &HashMap, lock_file_dir: &Path, @@ -441,7 +441,7 @@ impl PythonStatus { pub async fn update_prefix_conda( environment_name: GroupedEnvironmentName, prefix: &Prefix, - package_cache: Arc, + package_cache: PackageCache, authenticated_client: ClientWithMiddleware, installed_packages: Vec, repodata_records: &[RepoDataRecord], diff --git a/src/install.rs b/src/install.rs index ca567b289..8ccb10669 100644 --- a/src/install.rs +++ b/src/install.rs @@ -17,12 +17,11 @@ use rattler_conda_types::{PrefixRecord, RepoDataRecord}; use reqwest_middleware::ClientWithMiddleware; use std::cmp::Ordering; use std::path::{Path, PathBuf}; -use std::sync::Arc; use std::time::Duration; /// Executes the transaction on the given environment. 
pub async fn execute_transaction( - package_cache: Arc, + package_cache: PackageCache, transaction: &Transaction, prefix_records: &[PrefixRecord], target_prefix: PathBuf, diff --git a/src/install_pypi.rs b/src/install_pypi.rs index 9416db210..208890739 100644 --- a/src/install_pypi.rs +++ b/src/install_pypi.rs @@ -691,7 +691,7 @@ pub async fn update_python_distributions( python_packages: &[CombinedPypiPackageData], python_interpreter_path: &Path, system_requirements: &SystemRequirements, - uv_context: UvResolutionContext, + uv_context: &UvResolutionContext, pypi_options: &PypiOptions, environment_variables: &HashMap, platform: Platform, @@ -774,7 +774,7 @@ pub async fn update_python_distributions( lock_file_dir, editables, &site_packages, - &uv_context, + uv_context, &tags, ®istry_client, &build_dispatch, diff --git a/src/lock_file/update.rs b/src/lock_file/update.rs index 625fb6f63..6a75af764 100644 --- a/src/lock_file/update.rs +++ b/src/lock_file/update.rs @@ -1,22 +1,17 @@ -use crate::lock_file::{PypiRecord, UvResolutionContext}; -use crate::project::grouped_environment::GroupedEnvironmentName; -use crate::project::has_features::HasFeatures; -use crate::pypi_mapping::{self, Reporter}; -use crate::pypi_marker_env::determine_marker_environment; -use crate::pypi_tags::is_python_record; -use crate::{ - config, consts, - environment::{ - self, LockFileUsage, PerEnvironmentAndPlatform, PerGroup, PerGroupAndPlatform, PythonStatus, +use std::{ + borrow::Cow, + collections::{HashMap, HashSet, VecDeque}, + fmt::Write, + future::{ready, Future}, + iter, + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, }, - load_lock_file, - lock_file::{self, update, OutdatedEnvironments, PypiRecordsByName, RepoDataRecordsByName}, - prefix::Prefix, - progress::global_multi_progress, - project::{grouped_environment::GroupedEnvironment, Environment}, - utils::BarrierCell, - EnvironmentName, Project, + time::{Duration, Instant}, }; + use futures::{future::Either, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; use indexmap::IndexSet; use indicatif::{HumanBytes, ProgressBar, ProgressState}; @@ -27,28 +22,40 @@ use rattler::package_cache::PackageCache; use rattler_conda_types::{Arch, MatchSpec, Platform, RepoDataRecord}; use rattler_lock::{LockFile, PypiPackageData, PypiPackageEnvironmentData}; use rattler_repodata_gateway::{Gateway, RepoData}; -use std::collections::VecDeque; -use std::fmt::Write; -use std::{ - borrow::Cow, - collections::{HashMap, HashSet}, - future::{ready, Future}, - iter, - path::PathBuf, - sync::atomic::{AtomicBool, Ordering}, - sync::Arc, - time::{Duration, Instant}, -}; use tokio::sync::Semaphore; use tracing::Instrument; use url::Url; use uv_normalize::ExtraName; +use crate::{ + config, consts, + environment::{ + self, LockFileUsage, PerEnvironmentAndPlatform, PerGroup, PerGroupAndPlatform, PythonStatus, + }, + load_lock_file, + lock_file::{ + self, update, OutdatedEnvironments, PypiRecord, PypiRecordsByName, RepoDataRecordsByName, + UvResolutionContext, + }, + prefix::Prefix, + progress::global_multi_progress, + project::{ + grouped_environment::{GroupedEnvironment, GroupedEnvironmentName}, + has_features::HasFeatures, + Environment, + }, + pypi_mapping::{self, Reporter}, + pypi_marker_env::determine_marker_environment, + pypi_tags::is_python_record, + utils::BarrierCell, + EnvironmentName, Project, +}; + impl Project { /// Ensures that the lock-file is up-to-date with the project information. 
/// - /// Returns the lock-file and any potential derived data that was computed as part of this - /// operation. + /// Returns the lock-file and any potential derived data that was computed + /// as part of this operation. pub async fn up_to_date_lock_file( &self, options: UpdateLockFileOptions, @@ -66,13 +73,14 @@ pub struct UpdateLockFileOptions { /// Don't install anything to disk. pub no_install: bool, - /// The maximum number of concurrent solves that are allowed to run. If this value is None - /// a heuristic is used based on the number of cores available from the system. + /// The maximum number of concurrent solves that are allowed to run. If this + /// value is None a heuristic is used based on the number of cores + /// available from the system. pub max_concurrent_solves: Option, } -/// A struct that holds the lock-file and any potential derived data that was computed when calling -/// `ensure_up_to_date_lock_file`. +/// A struct that holds the lock-file and any potential derived data that was +/// computed when calling `ensure_up_to_date_lock_file`. pub struct LockFileDerivedData<'p> { pub project: &'p Project, @@ -80,12 +88,13 @@ pub struct LockFileDerivedData<'p> { pub lock_file: LockFile, /// The package cache - pub package_cache: Arc, + pub package_cache: PackageCache, /// A list of prefixes that are up-to-date with the latest conda packages. pub updated_conda_prefixes: HashMap, (Prefix, PythonStatus)>, - /// A list of prefixes that have been updated while resolving all dependencies. + /// A list of prefixes that have been updated while resolving all + /// dependencies. pub updated_pypi_prefixes: HashMap, Prefix>, /// The cached uv context @@ -93,6 +102,15 @@ pub struct LockFileDerivedData<'p> { } impl<'p> LockFileDerivedData<'p> { + /// Write the lock-file to disk. + pub fn write_to_disk(&self) -> miette::Result<()> { + let lock_file_path = self.project.lock_file_path(); + self.lock_file + .to_path(&lock_file_path) + .into_diagnostic() + .context("failed to write lock-file to disk") + } + /// Returns the up-to-date prefix for the given environment. pub async fn prefix(&mut self, environment: &Environment<'p>) -> miette::Result { if let Some(prefix) = self.updated_pypi_prefixes.get(environment) { @@ -131,7 +149,7 @@ impl<'p> LockFileDerivedData<'p> { &pypi_records, &python_status, &environment.system_requirements(), - uv_context, + &uv_context, &environment.pypi_options(), env_variables, self.project.root(), @@ -218,24 +236,30 @@ impl<'p> LockFileDerivedData<'p> { } } -#[derive(Default)] struct UpdateContext<'p> { - /// Repodata records from the lock-file. This contains the records that actually exist in the - /// lock-file. If the lock-file is missing or partially missing then the data also won't exist - /// in this field. + project: &'p Project, + + /// Repodata records from the lock-file. This contains the records that + /// actually exist in the lock-file. If the lock-file is missing or + /// partially missing then the data also won't exist in this field. locked_repodata_records: PerEnvironmentAndPlatform<'p, Arc>, /// Repodata records from the lock-file grouped by solve-group. locked_grouped_repodata_records: PerGroupAndPlatform<'p, Arc>, - /// Repodata records from the lock-file. This contains the records that actually exist in the - /// lock-file. If the lock-file is missing or partially missing then the data also won't exist - /// in this field. + /// Repodata records from the lock-file. This contains the records that + /// actually exist in the lock-file. 
If the lock-file is missing or + /// partially missing then the data also won't exist in this field. locked_pypi_records: PerEnvironmentAndPlatform<'p, Arc>, - /// Keeps track of all pending conda targets that are being solved. The mapping contains a - /// [`BarrierCell`] that will eventually contain the solved records computed by another task. - /// This allows tasks to wait for the records to be solved before proceeding. + /// Information about environments that are considered out of date. Only + /// these environments are updated. + outdated_envs: OutdatedEnvironments<'p>, + + /// Keeps track of all pending conda targets that are being solved. The + /// mapping contains a [`BarrierCell`] that will eventually contain the + /// solved records computed by another task. This allows tasks to wait + /// for the records to be solved before proceeding. solved_repodata_records: PerEnvironmentAndPlatform<'p, Arc>>>, @@ -243,23 +267,39 @@ struct UpdateContext<'p> { grouped_solved_repodata_records: PerGroupAndPlatform<'p, Arc>>>, - /// Keeps track of all pending prefix updates. This only tracks the conda updates to a prefix, - /// not whether the pypi packages have also been updated. + /// Keeps track of all pending prefix updates. This only tracks the conda + /// updates to a prefix, not whether the pypi packages have also been + /// updated. instantiated_conda_prefixes: PerGroup<'p, Arc>>, - /// Keeps track of all pending conda targets that are being solved. The mapping contains a - /// [`BarrierCell`] that will eventually contain the solved records computed by another task. - /// This allows tasks to wait for the records to be solved before proceeding. + /// Keeps track of all pending conda targets that are being solved. The + /// mapping contains a [`BarrierCell`] that will eventually contain the + /// solved records computed by another task. This allows tasks to wait + /// for the records to be solved before proceeding. solved_pypi_records: PerEnvironmentAndPlatform<'p, Arc>>>, /// Keeps track of all pending grouped pypi targets that are being solved. grouped_solved_pypi_records: PerGroupAndPlatform<'p, Arc>>>, + + /// The package cache to use when instantiating prefixes. + package_cache: PackageCache, + + /// A semaphore to limit the number of concurrent solves. + conda_solve_semaphore: Arc, + + /// A semaphore to limit the number of concurrent pypi solves. + /// TODO(tim): we need this semaphore, to limit the number of concurrent + /// solves. This is a problem when using source dependencies + pypi_solve_semaphore: Arc, + + /// Whether it is allowed to instantiate any prefix. + no_install: bool, } impl<'p> UpdateContext<'p> { - /// Returns a future that will resolve to the solved repodata records for the given environment - /// group or `None` if the records do not exist and are also not in the process of being - /// updated. + /// Returns a future that will resolve to the solved repodata records for + /// the given environment group or `None` if the records do not exist + /// and are also not in the process of being updated. pub fn get_latest_group_repodata_records( &self, group: &GroupedEnvironment<'p>, @@ -285,9 +325,9 @@ impl<'p> UpdateContext<'p> { Some(ready(locked_records).right_future()) } - /// Returns a future that will resolve to the solved pypi records for the given environment - /// group or `None` if the records do not exist and are also not in the process of being - /// updated. 
+ /// Returns a future that will resolve to the solved pypi records for the + /// given environment group or `None` if the records do not exist and + /// are also not in the process of being updated. pub fn get_latest_group_pypi_records( &self, group: &GroupedEnvironment<'p>, @@ -306,8 +346,9 @@ impl<'p> UpdateContext<'p> { None } - /// Takes the latest repodata records for the given environment and platform. Returns `None` if - /// neither the records exist nor are in the process of being updated. + /// Takes the latest repodata records for the given environment and + /// platform. Returns `None` if neither the records exist nor are in the + /// process of being updated. /// /// This function panics if the repodata records are still pending. pub fn take_latest_repodata_records( @@ -332,8 +373,9 @@ impl<'p> UpdateContext<'p> { .map(|records| Arc::try_unwrap(records).unwrap_or_else(|arc| (*arc).clone())) } - /// Takes the latest pypi records for the given environment and platform. Returns `None` if - /// neither the records exist nor are in the process of being updated. + /// Takes the latest pypi records for the given environment and platform. + /// Returns `None` if neither the records exist nor are in the process + /// of being updated. /// /// This function panics if the repodata records are still pending. pub fn take_latest_pypi_records( @@ -377,8 +419,9 @@ impl<'p> UpdateContext<'p> { .collect() } - /// Returns a future that will resolve to the solved repodata records for the given environment - /// or `None` if no task was spawned to instantiate the prefix. + /// Returns a future that will resolve to the solved repodata records for + /// the given environment or `None` if no task was spawned to + /// instantiate the prefix. pub fn get_conda_prefix( &self, environment: &GroupedEnvironment<'p>, @@ -393,8 +436,8 @@ fn default_max_concurrent_solves() -> usize { std::thread::available_parallelism().map_or(1, |n| n.get()) } -/// If the project has any source dependencies, like `git` or `path` dependencies. -/// for pypi dependencies, we need to limit the solve to 1, +/// If the project has any source dependencies, like `git` or `path` +/// dependencies. for pypi dependencies, we need to limit the solve to 1, /// because of uv internals fn determine_pypi_solve_permits(project: &Project) -> usize { // Get all environments @@ -414,29 +457,21 @@ fn determine_pypi_solve_permits(project: &Project) -> usize { /// Ensures that the lock-file is up-to-date with the project. /// -/// This function will return a [`LockFileDerivedData`] struct that contains the lock-file and any -/// potential derived data that was computed as part of this function. The derived data might be -/// usable by other functions to avoid recomputing the same data. +/// This function will return a [`LockFileDerivedData`] struct that contains the +/// lock-file and any potential derived data that was computed as part of this +/// function. The derived data might be usable by other functions to avoid +/// recomputing the same data. /// -/// This function starts by checking if the lock-file is up-to-date. If it is not up-to-date it will -/// construct a task graph of all the work that needs to be done to update the lock-file. The tasks -/// are awaited in a specific order to make sure that we can start instantiating prefixes as soon as -/// possible. +/// This function starts by checking if the lock-file is up-to-date. 
If it is +/// not up-to-date it will construct a task graph of all the work that needs to +/// be done to update the lock-file. The tasks are awaited in a specific order +/// to make sure that we can start instantiating prefixes as soon as possible. pub async fn ensure_up_to_date_lock_file( project: &Project, options: UpdateLockFileOptions, ) -> miette::Result> { let lock_file = load_lock_file(project).await?; - let current_platform = Platform::current(); - let package_cache = Arc::new(PackageCache::new(config::get_cache_dir()?.join("pkgs"))); - let max_concurrent_solves = options - .max_concurrent_solves - .unwrap_or_else(default_max_concurrent_solves); - - let solve_semaphore = Arc::new(Semaphore::new(max_concurrent_solves)); - - // TODO(tim): we need this semaphore, to limit the number of concurrent solves. This is a problem when using source dependencies - let pypi_solve_semaphore = Arc::new(Semaphore::new(determine_pypi_solve_permits(project))); + let package_cache = PackageCache::new(config::get_cache_dir()?.join("pkgs")); // should we check the lock-file in the first place? if !options.lock_file_usage.should_check_if_out_of_date() { @@ -468,605 +503,755 @@ pub async fn ensure_up_to_date_lock_file( }); } - // If the lock-file is out of date, but we're not allowed to update it, we should exit. + // If the lock-file is out of date, but we're not allowed to update it, we + // should exit. if !options.lock_file_usage.allows_lock_file_updates() { miette::bail!("lock-file not up-to-date with the project"); } - // Extract the current conda records from the lock-file - // TODO: Should we parallelize this? Measure please. - let locked_repodata_records = project - .environments() - .into_iter() - .flat_map(|env| { - lock_file - .environment(env.name().as_str()) - .into_iter() - .map(move |locked_env| { - locked_env.conda_repodata_records().map(|records| { + let max_concurrent_solves = options + .max_concurrent_solves + .unwrap_or_else(default_max_concurrent_solves); + + // Construct an update context and perform the actual update. + let lock_file_derived_data = UpdateContext::builder(project) + .with_max_concurrent_solves(max_concurrent_solves) + .with_package_cache(package_cache) + .with_no_install(options.no_install) + .with_outdated_environments(outdated) + .with_lock_file(lock_file) + .finish()? + .update() + .await?; + + // Write the lock-file to disk + lock_file_derived_data.write_to_disk()?; + + Ok(lock_file_derived_data) +} + +struct UpdateContextBuilder<'p> { + /// The project + project: &'p Project, + + /// The current lock-file. + lock_file: LockFile, + + /// The environments that are considered outdated. These are the + /// environments that will be updated in the lock-file. If this value is + /// `None` it will be computed from the project and the lock-file. + outdated_environments: Option>, + + /// Defines if during the update-process it is allowed to create prefixes. + /// This might be required to solve pypi dependencies because those require + /// a python interpreter. + no_install: bool, + + /// The package cache to use during the update process. + package_cache: Option, + + /// The maximum number of concurrent solves that are allowed to run. If this + /// value is `None` a heuristic is used based on the number of cores + /// available from the system. + max_concurrent_solves: Option, +} + +impl<'p> UpdateContextBuilder<'p> { + /// The package cache to use during the update process. Prefixes might need + /// to be instantiated to be able to solve pypi dependencies. 
+ pub fn with_package_cache(self, package_cache: PackageCache) -> Self { + Self { + package_cache: Some(package_cache), + ..self + } + } + + /// Defines if during the update-process it is allowed to create prefixes. + /// This might be required to solve pypi dependencies because those require + /// a python interpreter. + pub fn with_no_install(self, no_install: bool) -> Self { + Self { no_install, ..self } + } + + /// Sets the current lock-file that should be used to determine the + /// previously locked packages. + pub fn with_lock_file(self, lock_file: LockFile) -> Self { + Self { lock_file, ..self } + } + + /// Explicitly set the environments that are considered out-of-date. Only + /// these environments will be updated during the update process. + pub fn with_outdated_environments( + self, + outdated_environments: OutdatedEnvironments<'p>, + ) -> Self { + Self { + outdated_environments: Some(outdated_environments), + ..self + } + } + + /// Sets the maximum number of solves that are allowed to run concurrently. + pub fn with_max_concurrent_solves(self, max_concurrent_solves: usize) -> Self { + Self { + max_concurrent_solves: Some(max_concurrent_solves), + ..self + } + } + + /// Construct the context. + pub fn finish(self) -> miette::Result> { + let project = self.project; + let package_cache = match self.package_cache { + Some(package_cache) => package_cache, + None => PackageCache::new(config::get_cache_dir()?.join("pkgs")), + }; + let lock_file = self.lock_file; + let outdated = self.outdated_environments.unwrap_or_else(|| { + OutdatedEnvironments::from_project_and_lock_file(project, &lock_file) + }); + + // Extract the current conda records from the lock-file + // TODO: Should we parallelize this? Measure please. + let locked_repodata_records = project + .environments() + .into_iter() + .flat_map(|env| { + lock_file + .environment(env.name().as_str()) + .into_iter() + .map(move |locked_env| { + locked_env.conda_repodata_records().map(|records| { + ( + env.clone(), + records + .into_iter() + .map(|(platform, records)| { + ( + platform, + Arc::new(RepoDataRecordsByName::from_iter(records)), + ) + }) + .collect(), + ) + }) + }) + }) + .collect::>, _>>() + .into_diagnostic()?; + + let locked_pypi_records = project + .environments() + .into_iter() + .flat_map(|env| { + lock_file + .environment(env.name().as_str()) + .into_iter() + .map(move |locked_env| { ( env.clone(), - records + locked_env + .pypi_packages() .into_iter() .map(|(platform, records)| { - ( - platform, - Arc::new(RepoDataRecordsByName::from_iter(records)), - ) + (platform, Arc::new(PypiRecordsByName::from_iter(records))) }) .collect(), ) }) - }) - }) - .collect::>, _>>() - .into_diagnostic()?; - - let locked_pypi_records = project - .environments() - .into_iter() - .flat_map(|env| { - lock_file - .environment(env.name().as_str()) - .into_iter() - .map(move |locked_env| { - ( - env.clone(), - locked_env - .pypi_packages() + }) + .collect::>>(); + + // Create a collection of all the [`GroupedEnvironments`] involved in the solve. + let all_grouped_environments = project + .environments() + .into_iter() + .map(GroupedEnvironment::from) + .unique() + .collect_vec(); + + // For every grouped environment extract the data from the lock-file. If + // multiple environments in a single solve-group have different versions for + // a single package name than the record with the highest version is used. + // This logic is implemented in `RepoDataRecordsByName::from_iter`. 
This can + // happen if previously two environments did not share the same solve-group. + let locked_grouped_repodata_records = all_grouped_environments + .iter() + .filter_map(|group| { + // If any content of the environments in the group are outdated we need to + // disregard the locked content. + if group + .environments() + .any(|e| outdated.disregard_locked_content.contains(&e)) + { + return None; + } + + let records = match group { + GroupedEnvironment::Environment(env) => { + locked_repodata_records.get(env)?.clone() + } + GroupedEnvironment::Group(group) => { + let mut by_platform = HashMap::new(); + for env in group.environments() { + let Some(records) = locked_repodata_records.get(&env) else { + continue; + }; + + for (platform, records) in records.iter() { + by_platform + .entry(*platform) + .or_insert_with(Vec::new) + .extend(records.records.iter().cloned()); + } + } + + by_platform .into_iter() .map(|(platform, records)| { - (platform, Arc::new(PypiRecordsByName::from_iter(records))) + ( + platform, + Arc::new(RepoDataRecordsByName::from_iter(records)), + ) }) - .collect(), - ) - }) - }) - .collect::>>(); - - // Create a collection of all the [`GroupedEnvironments`] involved in the solve. - let all_grouped_environments = project - .environments() - .into_iter() - .map(GroupedEnvironment::from) - .unique() - .collect_vec(); - - // For every grouped environment extract the data from the lock-file. If multiple environments in a single - // solve-group have different versions for a single package name than the record with the highest version is used. - // This logic is implemented in `RepoDataRecordsByName::from_iter`. This can happen if previously two environments - // did not share the same solve-group. - let locked_grouped_repodata_records = all_grouped_environments - .iter() - .filter_map(|group| { - // If any content of the environments in the group are outdated we need to disregard the locked content. 
- if group - .environments() - .any(|e| outdated.disregard_locked_content.contains(&e)) - { - return None; - } + .collect() + } + }; + Some((group.clone(), records)) + }) + .collect(); - let records = match group { - GroupedEnvironment::Environment(env) => locked_repodata_records.get(env)?.clone(), - GroupedEnvironment::Group(group) => { - let mut by_platform = HashMap::new(); - for env in group.environments() { - let Some(records) = locked_repodata_records.get(&env) else { - continue; - }; + let max_concurrent_solves = self + .max_concurrent_solves + .unwrap_or_else(default_max_concurrent_solves); - for (platform, records) in records.iter() { - by_platform - .entry(*platform) - .or_insert_with(Vec::new) - .extend(records.records.iter().cloned()); - } - } + Ok(UpdateContext { + project, - by_platform - .into_iter() - .map(|(platform, records)| { - ( - platform, - Arc::new(RepoDataRecordsByName::from_iter(records)), - ) - }) - .collect() - } - }; - Some((group.clone(), records)) + locked_repodata_records, + locked_grouped_repodata_records, + locked_pypi_records, + outdated_envs: outdated, + + solved_repodata_records: HashMap::new(), + instantiated_conda_prefixes: HashMap::new(), + solved_pypi_records: HashMap::new(), + grouped_solved_repodata_records: HashMap::new(), + grouped_solved_pypi_records: HashMap::new(), + + package_cache, + conda_solve_semaphore: Arc::new(Semaphore::new(max_concurrent_solves)), + pypi_solve_semaphore: Arc::new(Semaphore::new(determine_pypi_solve_permits(project))), + + no_install: self.no_install, }) - .collect(); - - // Create a mapping that iterators over all outdated environments and their platforms for both - // and pypi. - let all_outdated_envs = itertools::chain(outdated.conda.iter(), outdated.pypi.iter()).fold( - HashMap::, HashSet>::new(), - |mut acc, (env, platforms)| { - acc.entry(env.clone()) - .or_default() - .extend(platforms.iter().cloned()); - acc - }, - ); - - let mut context = UpdateContext { - locked_repodata_records, - locked_grouped_repodata_records, - locked_pypi_records, - - solved_repodata_records: HashMap::new(), - instantiated_conda_prefixes: HashMap::new(), - solved_pypi_records: HashMap::new(), - grouped_solved_repodata_records: HashMap::new(), - grouped_solved_pypi_records: HashMap::new(), - }; + } +} - // This will keep track of all outstanding tasks that we need to wait for. All tasks are added - // to this list after they are spawned. This function blocks until all pending tasks have either - // completed or errored. - let mut pending_futures = FuturesUnordered::new(); - - // Spawn tasks for all the conda targets that are out of date. - for (environment, platforms) in outdated.conda { - // Turn the platforms into an IndexSet, so we have a little control over the order in which - // we solve the platforms. We want to solve the current platform first, so we can start - // instantiating prefixes if we have to. - let mut ordered_platforms = platforms.into_iter().collect::>(); - if let Some(current_platform_index) = - ordered_platforms.get_index_of(&environment.best_platform()) - { - ordered_platforms.move_index(current_platform_index, 0); +impl<'p> UpdateContext<'p> { + /// Construct a new builder for the update context. 
+ pub fn builder(project: &'p Project) -> UpdateContextBuilder<'p> { + UpdateContextBuilder { + project, + lock_file: LockFile::default(), + outdated_environments: None, + no_install: false, + package_cache: None, + max_concurrent_solves: None, } + } - // Determine the source of the solve information - let source = GroupedEnvironment::from(environment.clone()); + pub async fn update(mut self) -> miette::Result> { + let project = self.project; + let current_platform = Platform::current(); - for platform in ordered_platforms { - // Is there an existing pending task to solve the group? - if context - .grouped_solved_repodata_records - .get(&source) - .and_then(|platforms| platforms.get(&platform)) - .is_some() + // Create a mapping that iterators over all outdated environments and their + // platforms for both and pypi. + let all_outdated_envs = itertools::chain( + self.outdated_envs.conda.iter(), + self.outdated_envs.pypi.iter(), + ) + .fold( + HashMap::, HashSet>::new(), + |mut acc, (env, platforms)| { + acc.entry(env.clone()) + .or_default() + .extend(platforms.iter().cloned()); + acc + }, + ); + + // This will keep track of all outstanding tasks that we need to wait for. All + // tasks are added to this list after they are spawned. This function blocks + // until all pending tasks have either completed or errored. + let mut pending_futures = FuturesUnordered::new(); + + // Spawn tasks for all the conda targets that are out of date. + for (environment, platforms) in self.outdated_envs.conda.iter() { + // Turn the platforms into an IndexSet, so we have a little control over the + // order in which we solve the platforms. We want to solve the current + // platform first, so we can start instantiating prefixes if we have to. + let mut ordered_platforms = platforms.iter().copied().collect::>(); + if let Some(current_platform_index) = + ordered_platforms.get_index_of(&environment.best_platform()) { - // Yes, we can reuse the existing cell. - continue; + ordered_platforms.move_index(current_platform_index, 0); } - // No, we need to spawn a task to update for the entire solve group. - let locked_group_records = context - .locked_grouped_repodata_records - .get(&source) - .and_then(|records| records.get(&platform)) - .cloned() - .unwrap_or_default(); - - // Spawn a task to solve the group. - let group_solve_task = spawn_solve_conda_environment_task( - source.clone(), - locked_group_records, - project.repodata_gateway().clone(), - platform, - solve_semaphore.clone(), - project.client().clone(), - ) - .boxed_local(); - // Store the task so we can poll it later. - pending_futures.push(group_solve_task); + // Determine the source of the solve information + let source = GroupedEnvironment::from(environment.clone()); - // Create an entry that can be used by other tasks to wait for the result. - let previous_cell = context - .grouped_solved_repodata_records - .entry(source.clone()) - .or_default() - .insert(platform, Arc::default()); - assert!( - previous_cell.is_none(), - "a cell has already been added to update conda records" - ); - } - } + for platform in ordered_platforms { + // Is there an existing pending task to solve the group? + if self + .grouped_solved_repodata_records + .get(&source) + .and_then(|platforms| platforms.get(&platform)) + .is_some() + { + // Yes, we can reuse the existing cell. + continue; + } + // No, we need to spawn a task to update for the entire solve group. 
+ let locked_group_records = self + .locked_grouped_repodata_records + .get(&source) + .and_then(|records| records.get(&platform)) + .cloned() + .unwrap_or_default(); + + // Spawn a task to solve the group. + let group_solve_task = spawn_solve_conda_environment_task( + source.clone(), + locked_group_records, + project.repodata_gateway().clone(), + platform, + self.conda_solve_semaphore.clone(), + project.client().clone(), + ) + .boxed_local(); - // Spawn tasks to instantiate prefixes that we need to be able to solve Pypi packages. - // - // Solving Pypi packages requires a python interpreter to be present in the prefix, therefore we - // first need to make sure we have conda packages available, then we can instantiate the - // prefix with at least the required conda packages (including a python interpreter) and then - // we can solve the Pypi packages using the installed interpreter. - // - // We only need to instantiate the prefix for the current platform. - for (environment, platforms) in outdated.pypi.iter() { - // Only instantiate a prefix if any of the platforms actually contain pypi dependencies. If - // there are no pypi-dependencies than solving is also not required and thus a prefix is - // also not required. - if !platforms - .iter() - .any(|p| !environment.pypi_dependencies(Some(*p)).is_empty()) - { - continue; - } + // Store the task so we can poll it later. + pending_futures.push(group_solve_task); - // If we are not allowed to install, we can't instantiate a prefix. - if options.no_install { - miette::bail!("Cannot update pypi dependencies without first installing a conda prefix that includes python."); + // Create an entry that can be used by other tasks to wait for the result. + let previous_cell = self + .grouped_solved_repodata_records + .entry(source.clone()) + .or_default() + .insert(platform, Arc::default()); + assert!( + previous_cell.is_none(), + "a cell has already been added to update conda records" + ); + } } - // Check if the group is already being instantiated - let group = GroupedEnvironment::from(environment.clone()); - if context.instantiated_conda_prefixes.contains_key(&group) { - continue; - } + // Spawn tasks to instantiate prefixes that we need to be able to solve Pypi + // packages. + // + // Solving Pypi packages requires a python interpreter to be present in the + // prefix, therefore we first need to make sure we have conda packages + // available, then we can instantiate the prefix with at least the required + // conda packages (including a python interpreter) and then we can solve the + // Pypi packages using the installed interpreter. + // + // We only need to instantiate the prefix for the current platform. + for (environment, platforms) in self.outdated_envs.pypi.iter() { + // Only instantiate a prefix if any of the platforms actually contain pypi + // dependencies. If there are no pypi-dependencies than solving is also + // not required and thus a prefix is also not required. + if !platforms + .iter() + .any(|p| !environment.pypi_dependencies(Some(*p)).is_empty()) + { + continue; + } - // Construct a future that will resolve when we have the repodata available for the current - // platform for this group. 
- let records_future = context - .get_latest_group_repodata_records(&group, environment.best_platform()) - .ok_or_else(|| make_unsupported_pypi_platform_error(environment, current_platform))?; - - // Spawn a task to instantiate the environment - let environment_name = environment.name().clone(); - let pypi_env_task = - spawn_create_prefix_task(group.clone(), package_cache.clone(), records_future) - .map_err(move |e| { - e.context(format!( - "failed to instantiate a prefix for '{}'", - environment_name - )) - }) - .boxed_local(); + // If we are not allowed to install, we can't instantiate a prefix. + if self.no_install { + miette::bail!("Cannot update pypi dependencies without first installing a conda prefix that includes python."); + } - pending_futures.push(pypi_env_task); - let previous_cell = context - .instantiated_conda_prefixes - .insert(group, Arc::new(BarrierCell::new())); - assert!( - previous_cell.is_none(), - "cannot update the same group twice" - ) - } + // Check if the group is already being instantiated + let group = GroupedEnvironment::from(environment.clone()); + if self.instantiated_conda_prefixes.contains_key(&group) { + continue; + } - // Spawn tasks to update the pypi packages. - let mut uv_context = None; - for (environment, platform) in outdated - .pypi - .into_iter() - .flat_map(|(env, platforms)| platforms.into_iter().map(move |p| (env.clone(), p))) - { - let group = GroupedEnvironment::from(environment.clone()); - - // If the environment does not have any pypi dependencies we can skip it. - if environment.pypi_dependencies(Some(platform)).is_empty() { - continue; - } + // Construct a future that will resolve when we have the repodata available for + // the current platform for this group. + let records_future = self + .get_latest_group_repodata_records(&group, environment.best_platform()) + .ok_or_else(|| { + make_unsupported_pypi_platform_error(environment, current_platform) + })?; + + // Spawn a task to instantiate the environment + let environment_name = environment.name().clone(); + let pypi_env_task = + spawn_create_prefix_task(group.clone(), self.package_cache.clone(), records_future) + .map_err(move |e| { + e.context(format!( + "failed to instantiate a prefix for '{}'", + environment_name + )) + }) + .boxed_local(); - // Solve all the pypi records in the solve group together. - if context - .grouped_solved_pypi_records - .get(&group) - .and_then(|records| records.get(&platform)) - .is_some() - { - // There is already a task to solve the pypi records for the group. - continue; + pending_futures.push(pypi_env_task); + let previous_cell = self + .instantiated_conda_prefixes + .insert(group, Arc::new(BarrierCell::new())); + assert!( + previous_cell.is_none(), + "cannot update the same group twice" + ) } - // Construct a future that will resolve when we have the repodata available - let repodata_future = context - .get_latest_group_repodata_records(&group, platform) - .expect("conda records should be available now or in the future"); + // Spawn tasks to update the pypi packages. 
+ let mut uv_context = None; + for (environment, platform) in self + .outdated_envs + .pypi + .iter() + .flat_map(|(env, platforms)| platforms.iter().map(move |p| (env.clone(), *p))) + { + let group = GroupedEnvironment::from(environment.clone()); - // Construct a future that will resolve when we have the conda prefix available - let prefix_future = context - .get_conda_prefix(&group) - .expect("prefix should be available now or in the future"); + // If the environment does not have any pypi dependencies we can skip it. + if environment.pypi_dependencies(Some(platform)).is_empty() { + continue; + } - // Get the uv context - let uv_context = match &uv_context { - None => { - let context = UvResolutionContext::from_project(project)?; - uv_context = Some(context.clone()); - context + // Solve all the pypi records in the solve group together. + if self + .grouped_solved_pypi_records + .get(&group) + .and_then(|records| records.get(&platform)) + .is_some() + { + // There is already a task to solve the pypi records for the group. + continue; } - Some(context) => context.clone(), - }; - // Get environment variables from the activation - let env_variables = project.get_env_variables(&environment).await?; + // Construct a future that will resolve when we have the repodata available + let repodata_future = self + .get_latest_group_repodata_records(&group, platform) + .expect("conda records should be available now or in the future"); + + // Construct a future that will resolve when we have the conda prefix available + let prefix_future = self + .get_conda_prefix(&group) + .expect("prefix should be available now or in the future"); + + // Get the uv context + let uv_context = match uv_context.as_ref() { + None => uv_context + .insert(UvResolutionContext::from_project(project)?) + .clone(), + Some(context) => context.clone(), + }; - // Get the previously locked pypi records - let locked_pypi_records = Arc::new( - context - .locked_pypi_records - .get(&environment) - .and_then(|records| records.get(&platform)) - .map(|records| records.records.clone()) - .unwrap_or_default(), - ); + // Get environment variables from the activation + let env_variables = project.get_env_variables(&environment).await?; - // Spawn a task to solve the pypi environment - let pypi_solve_future = spawn_solve_pypi_task( - uv_context, - group.clone(), - platform, - repodata_future, - prefix_future, - env_variables, - pypi_solve_semaphore.clone(), - project.root().to_path_buf(), - locked_pypi_records, - ); + // Get the previously locked pypi records + let locked_pypi_records = Arc::new( + self.locked_pypi_records + .get(&environment) + .and_then(|records| records.get(&platform)) + .map(|records| records.records.clone()) + .unwrap_or_default(), + ); - pending_futures.push(pypi_solve_future.boxed_local()); + // Spawn a task to solve the pypi environment + let pypi_solve_future = spawn_solve_pypi_task( + uv_context, + group.clone(), + platform, + repodata_future, + prefix_future, + env_variables, + self.pypi_solve_semaphore.clone(), + project.root().to_path_buf(), + locked_pypi_records, + ); - let previous_cell = context - .grouped_solved_pypi_records - .entry(group) - .or_default() - .insert(platform, Arc::default()); - assert!( - previous_cell.is_none(), - "a cell has already been added to update pypi records" - ); - } + pending_futures.push(pypi_solve_future.boxed_local()); - // Iteratate over all outdated environments and their platforms and extract the corresponding records from them. 
- for (environment, platform) in all_outdated_envs.iter().flat_map(|(env, platforms)| { - iter::once(env.clone()).cartesian_product(platforms.iter().cloned()) - }) { - let grouped_environment = GroupedEnvironment::from(environment.clone()); - - // Get futures that will resolve when the conda and pypi records become available. - let grouped_repodata_records = context - .get_latest_group_repodata_records(&grouped_environment, platform) - .expect("conda records should be available now or in the future"); - let grouped_pypi_records = context - .get_latest_group_pypi_records(&grouped_environment, platform) - .map(Either::Left) - .unwrap_or_else(|| Either::Right(ready(Arc::default()))); - - // Spawn a task to extract a subset of the resolution. - let extract_resolution_task = spawn_extract_environment_task( - environment.clone(), - platform, - grouped_repodata_records, - grouped_pypi_records, - ); - pending_futures.push(extract_resolution_task.boxed_local()); - - // Create a cell that will be used to store the result of the extraction. - let previous_cell = context - .solved_repodata_records - .entry(environment.clone()) - .or_default() - .insert(platform, Arc::default()); - assert!( - previous_cell.is_none(), - "a cell has already been added to update conda records" - ); + let previous_cell = self + .grouped_solved_pypi_records + .entry(group) + .or_default() + .insert(platform, Arc::default()); + assert!( + previous_cell.is_none(), + "a cell has already been added to update pypi records" + ); + } - let previous_cell = context - .solved_pypi_records - .entry(environment.clone()) - .or_default() - .insert(platform, Arc::default()); - assert!( - previous_cell.is_none(), - "a cell has already been added to update pypi records" - ); - } + // Iteratate over all outdated environments and their platforms and extract the + // corresponding records from them. + for (environment, platform) in all_outdated_envs.iter().flat_map(|(env, platforms)| { + iter::once(env.clone()).cartesian_product(platforms.iter().cloned()) + }) { + let grouped_environment = GroupedEnvironment::from(environment.clone()); + + // Get futures that will resolve when the conda and pypi records become + // available. + let grouped_repodata_records = self + .get_latest_group_repodata_records(&grouped_environment, platform) + .expect("conda records should be available now or in the future"); + let grouped_pypi_records = self + .get_latest_group_pypi_records(&grouped_environment, platform) + .map(Either::Left) + .unwrap_or_else(|| Either::Right(ready(Arc::default()))); + + // Spawn a task to extract a subset of the resolution. + let extract_resolution_task = spawn_extract_environment_task( + environment.clone(), + platform, + grouped_repodata_records, + grouped_pypi_records, + ); + pending_futures.push(extract_resolution_task.boxed_local()); - let top_level_progress = - global_multi_progress().add(ProgressBar::new(pending_futures.len() as u64)); - top_level_progress.set_style(indicatif::ProgressStyle::default_bar() - .template("{spinner:.cyan} {prefix:20!} [{elapsed_precise}] [{bar:40!.bright.yellow/dim.white}] {pos:>4}/{len:4} {wide_msg:.dim}").unwrap() - .progress_chars("━━╾─")); - top_level_progress.enable_steady_tick(Duration::from_millis(50)); - top_level_progress.set_prefix("updating lock-file"); - - // Iterate over all the futures we spawned and wait for them to complete. - // - // The spawned futures each result either in an error or in a `TaskResult`. The `TaskResult` - // contains the result of the task. 
The results are stored into [`BarrierCell`]s which allows - // other tasks to respond to the data becoming available. - // - // A loop on the main task is used versus individually spawning all tasks for two reasons: - // - // 1. This provides some control over when data is polled and broadcasted to other tasks. No - // data is broadcasted until we start polling futures here. This reduces the risk of - // race-conditions where data has already been broadcasted before a task subscribes to it. - // 2. The futures stored in `pending_futures` do not necessarily have to be `'static`. Which - // makes them easier to work with. - while let Some(result) = pending_futures.next().await { - top_level_progress.inc(1); - match result? { - TaskResult::CondaGroupSolved(group_name, platform, records, duration) => { - let group = GroupedEnvironment::from_name(project, &group_name) - .expect("group should exist"); + // Create a cell that will be used to store the result of the extraction. + let previous_cell = self + .solved_repodata_records + .entry(environment.clone()) + .or_default() + .insert(platform, Arc::default()); + assert!( + previous_cell.is_none(), + "a cell has already been added to update conda records" + ); - context - .grouped_solved_repodata_records - .get_mut(&group) - .expect("the entry for this environment should exist") - .get_mut(&platform) - .expect("the entry for this platform should exist") - .set(Arc::new(records)) - .expect("records should not be solved twice"); - - match group_name { - GroupedEnvironmentName::Group(_) => { - tracing::info!( - "resolved conda environment for solve group '{}' '{}' in {}", - group_name.fancy_display(), - consts::PLATFORM_STYLE.apply_to(platform), - humantime::format_duration(duration) - ); - } - GroupedEnvironmentName::Environment(env_name) => { - tracing::info!( - "resolved conda environment for environment '{}' '{}' in {}", - env_name.fancy_display(), - consts::PLATFORM_STYLE.apply_to(platform), - humantime::format_duration(duration) - ); + let previous_cell = self + .solved_pypi_records + .entry(environment.clone()) + .or_default() + .insert(platform, Arc::default()); + assert!( + previous_cell.is_none(), + "a cell has already been added to update pypi records" + ); + } + + let top_level_progress = + global_multi_progress().add(ProgressBar::new(pending_futures.len() as u64)); + top_level_progress.set_style(indicatif::ProgressStyle::default_bar() + .template("{spinner:.cyan} {prefix:20!} [{elapsed_precise}] [{bar:40!.bright.yellow/dim.white}] {pos:>4}/{len:4} {wide_msg:.dim}").unwrap() + .progress_chars("━━╾─")); + top_level_progress.enable_steady_tick(Duration::from_millis(50)); + top_level_progress.set_prefix("updating lock-file"); + + // Iterate over all the futures we spawned and wait for them to complete. + // + // The spawned futures each result either in an error or in a `TaskResult`. The + // `TaskResult` contains the result of the task. The results are stored into + // [`BarrierCell`]s which allows other tasks to respond to the data becoming + // available. + // + // A loop on the main task is used versus individually spawning all tasks for + // two reasons: + // + // 1. This provides some control over when data is polled and broadcasted to + // other tasks. No data is broadcasted until we start polling futures here. + // This reduces the risk of race-conditions where data has already been + // broadcasted before a task subscribes to it. + // 2. The futures stored in `pending_futures` do not necessarily have to be + // `'static`. 
Which makes them easier to work with. + while let Some(result) = pending_futures.next().await { + top_level_progress.inc(1); + match result? { + TaskResult::CondaGroupSolved(group_name, platform, records, duration) => { + let group = GroupedEnvironment::from_name(project, &group_name) + .expect("group should exist"); + + self.grouped_solved_repodata_records + .get_mut(&group) + .expect("the entry for this environment should exist") + .get_mut(&platform) + .expect("the entry for this platform should exist") + .set(Arc::new(records)) + .expect("records should not be solved twice"); + + match group_name { + GroupedEnvironmentName::Group(_) => { + tracing::info!( + "resolved conda environment for solve group '{}' '{}' in {}", + group_name.fancy_display(), + consts::PLATFORM_STYLE.apply_to(platform), + humantime::format_duration(duration) + ); + } + GroupedEnvironmentName::Environment(env_name) => { + tracing::info!( + "resolved conda environment for environment '{}' '{}' in {}", + env_name.fancy_display(), + consts::PLATFORM_STYLE.apply_to(platform), + humantime::format_duration(duration) + ); + } } } - } - TaskResult::CondaPrefixUpdated(group_name, prefix, python_status, duration) => { - let group = GroupedEnvironment::from_name(project, &group_name) - .expect("grouped environment should exist"); + TaskResult::CondaPrefixUpdated(group_name, prefix, python_status, duration) => { + let group = GroupedEnvironment::from_name(project, &group_name) + .expect("grouped environment should exist"); - context - .instantiated_conda_prefixes - .get_mut(&group) - .expect("the entry for this environment should exists") - .set((prefix, *python_status)) - .expect("prefix should not be instantiated twice"); - - tracing::info!( - "updated conda packages in the '{}' prefix in {}", - group.name().fancy_display(), - humantime::format_duration(duration) - ); - } - TaskResult::PypiGroupSolved(group_name, platform, records, duration) => { - let group = GroupedEnvironment::from_name(project, &group_name) - .expect("group should exist"); + self.instantiated_conda_prefixes + .get_mut(&group) + .expect("the entry for this environment should exists") + .set((prefix, *python_status)) + .expect("prefix should not be instantiated twice"); - context - .grouped_solved_pypi_records - .get_mut(&group) - .expect("the entry for this environment should exist") - .get_mut(&platform) - .expect("the entry for this platform should exist") - .set(Arc::new(records)) - .expect("records should not be solved twice"); - - match group_name { - GroupedEnvironmentName::Group(_) => { - tracing::info!( - "resolved pypi packages for solve group '{}' '{}' in {}", - group_name.fancy_display(), - consts::PLATFORM_STYLE.apply_to(platform), - humantime::format_duration(duration), - ); + tracing::info!( + "updated conda packages in the '{}' prefix in {}", + group.name().fancy_display(), + humantime::format_duration(duration) + ); + } + TaskResult::PypiGroupSolved(group_name, platform, records, duration) => { + let group = GroupedEnvironment::from_name(project, &group_name) + .expect("group should exist"); + + self.grouped_solved_pypi_records + .get_mut(&group) + .expect("the entry for this environment should exist") + .get_mut(&platform) + .expect("the entry for this platform should exist") + .set(Arc::new(records)) + .expect("records should not be solved twice"); + + match group_name { + GroupedEnvironmentName::Group(_) => { + tracing::info!( + "resolved pypi packages for solve group '{}' '{}' in {}", + group_name.fancy_display(), + 
consts::PLATFORM_STYLE.apply_to(platform), + humantime::format_duration(duration), + ); + } + GroupedEnvironmentName::Environment(env_name) => { + tracing::info!( + "resolved pypi packages for environment '{}' '{}' in {}", + env_name.fancy_display(), + consts::PLATFORM_STYLE.apply_to(platform), + humantime::format_duration(duration), + ); + } } - GroupedEnvironmentName::Environment(env_name) => { + } + TaskResult::ExtractedRecordsSubset( + environment, + platform, + repodata_records, + pypi_records, + ) => { + let environment = project + .environment(&environment) + .expect("environment should exist"); + + self.solved_pypi_records + .get_mut(&environment) + .expect("the entry for this environment should exist") + .get_mut(&platform) + .expect("the entry for this platform should exist") + .set(pypi_records) + .expect("records should not be solved twice"); + + self.solved_repodata_records + .get_mut(&environment) + .expect("the entry for this environment should exist") + .get_mut(&platform) + .expect("the entry for this platform should exist") + .set(repodata_records) + .expect("records should not be solved twice"); + + let group = GroupedEnvironment::from(environment.clone()); + if matches!(group, GroupedEnvironment::Group(_)) { tracing::info!( - "resolved pypi packages for environment '{}' '{}' in {}", - env_name.fancy_display(), + "extracted subset of records for '{}' '{}' from the '{}' group", + environment.name().fancy_display(), consts::PLATFORM_STYLE.apply_to(platform), - humantime::format_duration(duration), + group.name().fancy_display(), ); } } } - TaskResult::ExtractedRecordsSubset( - environment, - platform, - repodata_records, - pypi_records, - ) => { - let environment = project - .environment(&environment) - .expect("environment should exist"); - - context - .solved_pypi_records - .get_mut(&environment) - .expect("the entry for this environment should exist") - .get_mut(&platform) - .expect("the entry for this platform should exist") - .set(pypi_records) - .expect("records should not be solved twice"); - - context - .solved_repodata_records - .get_mut(&environment) - .expect("the entry for this environment should exist") - .get_mut(&platform) - .expect("the entry for this platform should exist") - .set(repodata_records) - .expect("records should not be solved twice"); - - let group = GroupedEnvironment::from(environment.clone()); - if matches!(group, GroupedEnvironment::Group(_)) { - tracing::info!( - "extracted subset of records for '{}' '{}' from the '{}' group", - environment.name().fancy_display(), - consts::PLATFORM_STYLE.apply_to(platform), - group.name().fancy_display(), - ); - } - } } - } - // Construct a new lock-file containing all the updated or old records. - let mut builder = LockFile::builder(); + // Construct a new lock-file containing all the updated or old records. + let mut builder = LockFile::builder(); - // Iterate over all environments and add their records to the lock-file. - for environment in project.environments() { - let environment_name = environment.name().to_string(); - let grouped_env = GroupedEnvironment::from(environment.clone()); + // Iterate over all environments and add their records to the lock-file. 
+ for environment in project.environments() { + let environment_name = environment.name().to_string(); + let grouped_env = GroupedEnvironment::from(environment.clone()); - builder.set_channels( - &environment_name, - grouped_env - .channels() - .into_iter() - .map(|channel| rattler_lock::Channel::from(channel.base_url().to_string())), - ); + builder.set_channels( + &environment_name, + grouped_env + .channels() + .into_iter() + .map(|channel| rattler_lock::Channel::from(channel.base_url().to_string())), + ); - let mut has_pypi_records = false; - for platform in environment.platforms() { - if let Some(records) = context.take_latest_repodata_records(&environment, platform) { - for record in records.into_inner() { - builder.add_conda_package(&environment_name, platform, record.into()); + let mut has_pypi_records = false; + for platform in environment.platforms() { + if let Some(records) = self.take_latest_repodata_records(&environment, platform) { + for record in records.into_inner() { + builder.add_conda_package(&environment_name, platform, record.into()); + } } - } - if let Some(records) = context.take_latest_pypi_records(&environment, platform) { - for (pkg_data, pkg_env_data) in records.into_inner() { - builder.add_pypi_package(&environment_name, platform, pkg_data, pkg_env_data); - has_pypi_records = true; + if let Some(records) = self.take_latest_pypi_records(&environment, platform) { + for (pkg_data, pkg_env_data) in records.into_inner() { + builder.add_pypi_package( + &environment_name, + platform, + pkg_data, + pkg_env_data, + ); + has_pypi_records = true; + } } } - } - // Store the indexes that were used to solve the environment. But only if there are pypi packages. - if has_pypi_records { - builder.set_pypi_indexes(&environment_name, grouped_env.pypi_options().into()); + // Store the indexes that were used to solve the environment. But only if there + // are pypi packages. + if has_pypi_records { + builder.set_pypi_indexes(&environment_name, grouped_env.pypi_options().into()); + } } - } - // Store the lock file - let lock_file = builder.finish(); - lock_file - .to_path(&project.lock_file_path()) - .into_diagnostic() - .context("failed to write lock-file to disk")?; - - top_level_progress.finish_and_clear(); - - Ok(LockFileDerivedData { - project, - lock_file, - package_cache, - updated_conda_prefixes: context.take_instantiated_conda_prefixes(), - updated_pypi_prefixes: HashMap::default(), - uv_context, - }) + // Store the lock file + let lock_file = builder.finish(); + top_level_progress.finish_and_clear(); + + Ok(LockFileDerivedData { + project, + lock_file, + updated_conda_prefixes: self.take_instantiated_conda_prefixes(), + package_cache: self.package_cache, + updated_pypi_prefixes: HashMap::default(), + uv_context, + }) + } } -/// Constructs an error that indicates that the current platform cannot solve pypi dependencies because there is no python interpreter available for the current platform. +/// Constructs an error that indicates that the current platform cannot solve +/// pypi dependencies because there is no python interpreter available for the +/// current platform. fn make_unsupported_pypi_platform_error( environment: &Environment<'_>, current_platform: Platform, ) -> miette::Report { let grouped_environment = GroupedEnvironment::from(environment.clone()); - // Construct a diagnostic that explains that the current platform is not supported. + // Construct a diagnostic that explains that the current platform is not + // supported. 
let mut diag = MietteDiagnostic::new(format!("Unable to solve pypi dependencies for the {} {} because no compatible python interpreter can be installed for the current platform", grouped_environment.name().fancy_display(), match &grouped_environment { GroupedEnvironment::Group(_) => "solve group", GroupedEnvironment::Environment(_) => "environment" @@ -1113,8 +1298,9 @@ fn make_unsupported_pypi_platform_error( miette::Report::new(diag).with_source_code(environment.project().manifest.contents.clone()) } -/// Represents data that is sent back from a task. This is used to communicate the result of a task -/// back to the main task which will forward the information to other tasks waiting for results. +/// Represents data that is sent back from a task. This is used to communicate +/// the result of a task back to the main task which will forward the +/// information to other tasks waiting for results. enum TaskResult { /// The conda dependencies for a grouped environment have been solved. CondaGroupSolved( @@ -1135,7 +1321,8 @@ enum TaskResult { Duration, ), - /// The records for a specific environment have been extracted from a grouped solve. + /// The records for a specific environment have been extracted from a + /// grouped solve. ExtractedRecordsSubset( EnvironmentName, Platform, @@ -1231,7 +1418,8 @@ async fn spawn_solve_conda_environment_task( ) })?; - // Add purl's for the conda packages that are also available as pypi packages if we need them. + // Add purl's for the conda packages that are also available as pypi packages if + // we need them. if has_pypi_dependencies { pb.set_message("extracting pypi packages"); pypi_mapping::amend_pypi_purls( @@ -1271,7 +1459,8 @@ async fn spawn_solve_conda_environment_task( }) } -/// Distill the repodata that is applicable for the given `environment` from the repodata of an entire solve group. +/// Distill the repodata that is applicable for the given `environment` from the +/// repodata of an entire solve group. async fn spawn_extract_environment_task( environment: Environment<'_>, platform: Platform, @@ -1284,7 +1473,8 @@ async fn spawn_extract_environment_task( let (grouped_repodata_records, grouped_pypi_records) = tokio::join!(grouped_repodata_records, grouped_pypi_records); - // If the group is just the environment on its own we can immediately return the records. + // If the group is just the environment on its own we can immediately return the + // records. if let GroupedEnvironment::Environment(_) = group { return Ok(TaskResult::ExtractedRecordsSubset( environment.name().clone(), @@ -1330,7 +1520,8 @@ async fn spawn_extract_environment_task( pypi_package_names.insert(PackageName::Pypi((name, None))); } - // Compute the Pypi marker environment. Only do this if we have pypi dependencies. + // Compute the Pypi marker environment. Only do this if we have pypi + // dependencies. let marker_environment = if has_pypi_dependencies { grouped_repodata_records .records @@ -1542,10 +1733,11 @@ async fn spawn_solve_pypi_task( /// Updates the prefix for the given environment. /// -/// This function will wait until the conda records for the prefix are available. +/// This function will wait until the conda records for the prefix are +/// available. 
async fn spawn_create_prefix_task( group: GroupedEnvironment<'_>, - package_cache: Arc, + package_cache: PackageCache, conda_records: impl Future>, ) -> miette::Result { let group_name = group.name().clone(); @@ -1562,8 +1754,8 @@ async fn spawn_create_prefix_task( Err(_err) => Err(miette::miette!("the operation was cancelled")), }); - // Wait until the conda records are available and until the installed packages for this prefix - // are available. + // Wait until the conda records are available and until the installed packages + // for this prefix are available. let (conda_records, installed_packages) = tokio::try_join!(conda_records.map(Ok), installed_packages_future)?;
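The hunks above replace the single monolithic body of `ensure_up_to_date_lock_file` with an `UpdateContextBuilder` / `UpdateContext::update()` pair and add a `LockFileDerivedData::write_to_disk` helper. A minimal sketch of the resulting call flow, using only names introduced in this diff and assuming `project: &Project`, `options: UpdateLockFileOptions`, `outdated: OutdatedEnvironments` and `lock_file: LockFile` are in scope exactly as they are inside `ensure_up_to_date_lock_file`:

    // All `with_*` calls are optional; `finish()` falls back to the defaults set in
    // `UpdateContext::builder` (an empty LockFile, outdated environments computed from
    // the project and lock-file, no_install = false, the pixi package cache, and a
    // CPU-count based limit on concurrent conda solves).
    let derived = UpdateContext::builder(project)
        .with_lock_file(lock_file)
        .with_outdated_environments(outdated)
        .with_no_install(options.no_install)
        .with_package_cache(PackageCache::new(config::get_cache_dir()?.join("pkgs")))
        .with_max_concurrent_solves(
            options
                .max_concurrent_solves
                .unwrap_or_else(default_max_concurrent_solves),
        )
        .finish()? // builds the UpdateContext (locked records, semaphores, package cache)
        .update()  // runs the solve / prefix-instantiation / extract task graph
        .await?;

    // Persist the freshly built lock-file via the new helper on LockFileDerivedData.
    derived.write_to_disk()?;

This mirrors the call site added to `ensure_up_to_date_lock_file` in this diff; the explicit `with_*` arguments are shown here only to illustrate which knobs the builder exposes.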