diff --git a/crates/rattler_installs_packages/src/index/http.rs b/crates/rattler_installs_packages/src/index/http.rs index 232c9b5b..80c37c2a 100644 --- a/crates/rattler_installs_packages/src/index/http.rs +++ b/crates/rattler_installs_packages/src/index/http.rs @@ -254,6 +254,7 @@ where { let data: CacheData = ciborium::de::from_reader(&mut f) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + let start = f.stream_position()?; let end = f.seek(SeekFrom::End(0))?; let mut body = SeekSlice::new(f, start, end)?; diff --git a/crates/rattler_installs_packages/src/resolve/dependency_provider.rs b/crates/rattler_installs_packages/src/resolve/dependency_provider.rs index 3def4270..9e48f2c2 100644 --- a/crates/rattler_installs_packages/src/resolve/dependency_provider.rs +++ b/crates/rattler_installs_packages/src/resolve/dependency_provider.rs @@ -26,6 +26,7 @@ use std::{ any::Any, borrow::Borrow, cmp::Ordering, collections::HashMap, rc::Rc, str::FromStr, sync::Arc, }; use thiserror::Error; +use tokio::sync::Semaphore; use url::Url; /// This is a [`DependencyProvider`] for PyPI packages @@ -44,6 +45,7 @@ pub(crate) struct PypiDependencyProvider { options: ResolveOptions, should_cancel_with_value: Mutex>, + concurrent_tasks: Arc, } impl PypiDependencyProvider { @@ -84,6 +86,7 @@ impl PypiDependencyProvider { name_to_url, options, should_cancel_with_value: Default::default(), + concurrent_tasks: Arc::new(Semaphore::new(30)), }) } @@ -213,6 +216,16 @@ impl PypiDependencyProvider { .iter() .any(|a| a.is::()) } + + /// Acquires a lease to be able to spawn a task + /// this is used to limit the amount of concurrent tasks + async fn aquire_lease_to_run(&self) -> tokio::sync::OwnedSemaphorePermit { + self.concurrent_tasks + .clone() + .acquire_owned() + .await + .expect("could not acquire semaphore") + } } #[derive(Debug, Error, Diagnostic, Clone)] @@ -306,9 +319,14 @@ impl<'p> DependencyProvider for &'p PypiDepende ArtifactRequest::FromIndex(package_name.base().clone()) }; + let lease = self.aquire_lease_to_run().await; let result: Result<_, miette::Report> = tokio::spawn({ let package_db = self.package_db.clone(); - async move { Ok(package_db.available_artifacts(request).await?.clone()) } + async move { + let result = package_db.available_artifacts(request).await?.clone(); + drop(lease); + Ok(result) + } }) .await .expect("cancelled"); @@ -502,13 +520,16 @@ impl<'p> DependencyProvider for &'p PypiDepende let package_db = self.package_db.clone(); let wheel_builder = self.wheel_builder.clone(); let artifacts = artifacts.to_vec(); + let lease = self.aquire_lease_to_run().await; async move { if let Some((ai, metadata)) = package_db .get_metadata(&artifacts, Some(&wheel_builder)) .await? { + drop(lease); Ok(Some((ai.clone(), metadata))) } else { + drop(lease); Ok(None) } }