Skip to content

Commit

Permalink
fix: limits the amount of concurrent tasks with the new async resolvo
Browse files Browse the repository at this point in the history
code

Otherwise, `apache-airflow[all]` would just start failing randomly.
  • Loading branch information
tdejager committed Feb 9, 2024
1 parent 427f57c commit 08ae427
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
1 change: 1 addition & 0 deletions crates/rattler_installs_packages/src/index/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ where
{
let data: CacheData = ciborium::de::from_reader(&mut f)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;

let start = f.stream_position()?;
let end = f.seek(SeekFrom::End(0))?;
let mut body = SeekSlice::new(f, start, end)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::{
any::Any, borrow::Borrow, cmp::Ordering, collections::HashMap, rc::Rc, str::FromStr, sync::Arc,
};
use thiserror::Error;
use tokio::sync::Semaphore;
use url::Url;

/// This is a [`DependencyProvider`] for PyPI packages
Expand All @@ -44,6 +45,7 @@ pub(crate) struct PypiDependencyProvider {

options: ResolveOptions,
should_cancel_with_value: Mutex<Option<MetadataError>>,
concurrent_tasks: Arc<Semaphore>,
}

impl PypiDependencyProvider {
Expand Down Expand Up @@ -84,6 +86,7 @@ impl PypiDependencyProvider {
name_to_url,
options,
should_cancel_with_value: Default::default(),
concurrent_tasks: Arc::new(Semaphore::new(30)),
})
}

Expand Down Expand Up @@ -213,6 +216,16 @@ impl PypiDependencyProvider {
.iter()
.any(|a| a.is::<S>())
}

/// Acquires a lease to be able to spawn a task
/// this is used to limit the amount of concurrent tasks
async fn aquire_lease_to_run(&self) -> tokio::sync::OwnedSemaphorePermit {
self.concurrent_tasks
.clone()
.acquire_owned()
.await
.expect("could not acquire semaphore")
}
}

#[derive(Debug, Error, Diagnostic, Clone)]
Expand Down Expand Up @@ -306,9 +319,14 @@ impl<'p> DependencyProvider<PypiVersionSet, PypiPackageName> for &'p PypiDepende
ArtifactRequest::FromIndex(package_name.base().clone())
};

let lease = self.aquire_lease_to_run().await;
let result: Result<_, miette::Report> = tokio::spawn({
let package_db = self.package_db.clone();
async move { Ok(package_db.available_artifacts(request).await?.clone()) }
async move {
let result = package_db.available_artifacts(request).await?.clone();
drop(lease);
Ok(result)
}
})
.await
.expect("cancelled");
Expand Down Expand Up @@ -502,13 +520,16 @@ impl<'p> DependencyProvider<PypiVersionSet, PypiPackageName> for &'p PypiDepende
let package_db = self.package_db.clone();
let wheel_builder = self.wheel_builder.clone();
let artifacts = artifacts.to_vec();
let lease = self.aquire_lease_to_run().await;
async move {
if let Some((ai, metadata)) = package_db
.get_metadata(&artifacts, Some(&wheel_builder))
.await?
{
drop(lease);
Ok(Some((ai.clone(), metadata)))
} else {
drop(lease);
Ok(None)
}
}
Expand Down

0 comments on commit 08ae427

Please sign in to comment.