
wip: concurrent metadata fetching #190

Closed · wants to merge 5 commits
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -21,3 +21,4 @@ consolidate-commits = true
tag-prefix = ""

[patch.crates-io]
Contributor:

This now requires at least Rust 1.75. It would be good to add a rust-version field to the Cargo.toml to avoid confusion.

https://doc.rust-lang.org/cargo/reference/manifest.html#the-rust-version-field
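
For reference, the field lives in the [package] table; a minimal sketch, with the surrounding values assumed rather than taken from this repository:

[package]
name = "rattler_installs_packages" # hypothetical; the real table has more fields
rust-version = "1.75"              # Cargo errors out on older toolchains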

Contributor:

Ah, this should be added to resolvo as well, of course :)

resolvo = { git = "https://github.com/aochagavia/resolvo.git", branch = "concurrent-metadata-fetching" }
15 changes: 10 additions & 5 deletions crates/rattler_installs_packages/src/index/package_database.rs
@@ -22,6 +22,7 @@ use elsa::sync::FrozenMap;
use futures::{pin_mut, stream, StreamExt};
use http::{header::CONTENT_TYPE, HeaderMap, HeaderValue, Method};
use indexmap::IndexMap;
use itertools::Itertools;
use miette::{self, Diagnostic, IntoDiagnostic};
use parking_lot::Mutex;
use rattler_digest::{compute_bytes_digest, Sha256};
@@ -99,11 +100,15 @@ impl PackageDb {
} else {
// Start downloading the information for each url.
let http = self.http.clone();
let request_iter = stream::iter(self.index_urls.iter())
.map(|url| url.join(&format!("{}/", p.as_str())).expect("invalid url"))
.map(|url| fetch_simple_api(&http, url))
.buffer_unordered(10)
.filter_map(|result| async { result.transpose() });
let request_iter = stream::iter(
self.index_urls
.iter()
.map(|url| url.join(&format!("{}/", p.as_str())).expect("invalid url"))
.collect_vec(),
)
.map(|url| fetch_simple_api(&http, url))
.buffer_unordered(10)
.filter_map(|result| async { result.transpose() });

pin_mut!(request_iter);

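As a standalone illustration of the bounded-concurrency pattern in the hunk above (turn each URL into a request future, keep at most 10 in flight), here is a minimal sketch; fetch is a hypothetical stand-in for fetch_simple_api:

use futures::{pin_mut, stream, StreamExt};

// Hypothetical stand-in for fetch_simple_api: one request per URL.
async fn fetch(url: String) -> Result<String, String> {
    Ok(format!("response from {url}"))
}

#[tokio::main]
async fn main() {
    let urls: Vec<String> = vec!["https://a.example/".into(), "https://b.example/".into()];
    let responses = stream::iter(urls)
        .map(fetch)               // one future per URL, not yet polled
        .buffer_unordered(10)     // poll at most 10 requests concurrently
        .filter_map(|r| async { r.ok() }); // drop failures, keep successful bodies
    pin_mut!(responses);          // pin on the stack so .next() can be called
    while let Some(body) = responses.next().await {
        println!("{body}");
    }
}

In the hunk above, collect_vec presumably serves to make the stream own its URLs up front rather than borrow from self while requests are in flight.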
98 changes: 67 additions & 31 deletions crates/rattler_installs_packages/src/resolve/dependency_provider.rs
@@ -26,11 +26,11 @@ use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::task;
use url::Url;

#[derive(Clone, Debug, Hash, Eq, PartialEq)]
@@ -201,7 +201,7 @@ impl Display for PypiPackageName {

/// This is a [`DependencyProvider`] for PyPI packages
pub(crate) struct PypiDependencyProvider {
pub pool: Pool<PypiVersionSet, PypiPackageName>,
pub pool: Rc<Pool<PypiVersionSet, PypiPackageName>>,
package_db: Arc<PackageDb>,
wheel_builder: Arc<WheelBuilder>,
markers: Arc<MarkerEnvironment>,
@@ -215,6 +215,9 @@ pub(crate) struct PypiDependencyProvider {

options: ResolveOptions,
should_cancel_with_value: Mutex<Option<MetadataError>>,

concurrent_metadata_fetches: AtomicUsize,
concurrent_candidate_fetches: AtomicUsize,
}

impl PypiDependencyProvider {
@@ -244,7 +247,7 @@ impl PypiDependencyProvider {
);

Ok(Self {
pool,
pool: Rc::new(pool),
package_db,
wheel_builder,
markers,
@@ -255,6 +258,8 @@
name_to_url,
options,
should_cancel_with_value: Default::default(),
concurrent_metadata_fetches: AtomicUsize::new(0),
concurrent_candidate_fetches: AtomicUsize::new(0),
})
}

Expand Down Expand Up @@ -400,8 +405,8 @@ pub(crate) enum MetadataError {
}

impl<'p> DependencyProvider<PypiVersionSet, PypiPackageName> for &'p PypiDependencyProvider {
fn pool(&self) -> &Pool<PypiVersionSet, PypiPackageName> {
&self.pool
fn pool(&self) -> Rc<Pool<PypiVersionSet, PypiPackageName>> {
self.pool.clone()
}

fn should_cancel_with_value(&self) -> Option<Box<dyn Any>> {
@@ -414,7 +419,7 @@ impl<'p> DependencyProvider<PypiVersionSet, PypiPackageName> for &'p PypiDependencyProvider {

fn sort_candidates(
&self,
solver: &SolverCache<PypiVersionSet, PypiPackageName, Self>,
_: &SolverCache<PypiVersionSet, PypiPackageName, Self>,
solvables: &mut [SolvableId],
) {
solvables.sort_by(|&a, &b| {
@@ -439,8 +444,8 @@ impl<'p> DependencyProvider<PypiVersionSet, PypiPackageName> for &'p PypiDependencyProvider {
}
}

let solvable_a = solver.pool().resolve_solvable(a);
let solvable_b = solver.pool().resolve_solvable(b);
let solvable_a = self.pool.resolve_solvable(a);
let solvable_b = self.pool.resolve_solvable(b);

match (&solvable_a.inner(), &solvable_b.inner()) {
// Sort Urls alphabetically
@@ -460,32 +465,46 @@ impl<'p> DependencyProvider<PypiVersionSet, PypiPackageName> for &'p PypiDependencyProvider {
})
}

fn get_candidates(&self, name: NameId) -> Option<Candidates> {
async fn get_candidates(&self, name: NameId) -> Option<Candidates> {
let package_name = self.pool.resolve_package_name(name);
tracing::info!("collecting {}", package_name);

// check if we have URL variant for this name
let url_version = self.name_to_url.get(package_name.base());

let result = if let Some(url) = url_version {
task::block_in_place(move || {
let concurrency_count = self
.concurrent_candidate_fetches
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
tracing::info!("requesting candidates #{}", concurrency_count);

let base_name = package_name.base().clone();
let result: miette::Result<_> = if let Some(url) = url_version {
tokio::spawn({
let url = Url::from_str(url).expect("cannot parse back url");
Handle::current().block_on(self.package_db.get_artifact_by_direct_url(
package_name.base().clone(),
url,
&self.wheel_builder,
))
let package_db = self.package_db.clone();
let wheel_builder = self.wheel_builder.clone();
async move {
Ok(package_db
.get_artifact_by_direct_url(base_name, url, &wheel_builder)
.await?
.clone())
}
})
.await
.expect("cancelled")
} else {
// Get all the metadata for this package
task::block_in_place(move || {
Handle::current().block_on(
self.package_db
.available_artifacts(package_name.base().clone()),
)
tokio::spawn({
let package_db = self.package_db.clone();
async move { Ok(package_db.available_artifacts(base_name).await?.clone()) }
})
.await
.expect("cancelled")
};

tracing::info!("DONE requesting candidates #{}", concurrency_count);
self.concurrent_candidate_fetches
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);

let artifacts = match result {
Ok(artifacts) => artifacts,
Err(err) => {
@@ -600,7 +619,7 @@ impl<'p> DependencyProvider<PypiVersionSet, PypiPackageName> for &'p PypiDependencyProvider {
Some(candidates)
}

fn get_dependencies(&self, solvable_id: SolvableId) -> Dependencies {
async fn get_dependencies(&self, solvable_id: SolvableId) -> Dependencies {
let solvable = self.pool.resolve_solvable(solvable_id);
let package_name = self.pool.resolve_package_name(solvable.name_id());
let package_version = solvable.inner();
@@ -671,13 +690,30 @@ impl<'p> DependencyProvider<PypiVersionSet, PypiPackageName> for &'p PypiDependencyProvider {
return Dependencies::Unknown(error);
}

let result = task::block_in_place(|| {
// First try getting wheels
Handle::current().block_on(
self.package_db
.get_metadata(artifacts, Some(&self.wheel_builder)),
)
});
let concurrency_count = self
.concurrent_metadata_fetches
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
tracing::info!("fetching metadata #{}", concurrency_count);
let result: miette::Result<_> = tokio::spawn({
let package_db = self.package_db.clone();
let wheel_builder = self.wheel_builder.clone();
let artifacts = artifacts.to_vec();
async move {
if let Some((ai, metadata)) = package_db
.get_metadata(&artifacts, Some(&wheel_builder))
.await?
{
Ok(Some((ai.clone(), metadata)))
} else {
Ok(None)
}
}
})
.await
.expect("cancelled");
tracing::info!("DONE fetching metadata #{}", concurrency_count);
self.concurrent_metadata_fetches
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);

let metadata = match result {
// We have retrieved a value without error
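The recurring shape in this file is: clone the Arc handles, move them into a spawned task, and await the JoinHandle. A minimal standalone sketch, in which Db and load_metadata are hypothetical stand-ins for PackageDb and its methods:

use std::sync::Arc;

// Hypothetical stand-in for PackageDb.
struct Db;

impl Db {
    async fn load_metadata(&self, name: String) -> Result<String, String> {
        Ok(format!("metadata for {name}"))
    }
}

async fn get_metadata(db: &Arc<Db>, name: &str) -> Result<String, String> {
    let db = db.clone();        // cheap Arc clone; the spawned task needs owned, 'static data
    let name = name.to_owned();
    tokio::spawn(async move { db.load_metadata(name).await })
        .await
        // Joining only fails if the task panicked or was cancelled.
        .expect("cancelled")
}

#[tokio::main]
async fn main() {
    let db = Arc::new(Db);
    println!("{:?}", get_metadata(&db, "numpy").await);
}

Spawning instead of blocking in place lets the runtime drive several fetches at once, which is what the concurrent_candidate_fetches and concurrent_metadata_fetches counters above are logging.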
54 changes: 49 additions & 5 deletions crates/rattler_installs_packages/src/resolve/solve.rs
@@ -14,6 +14,7 @@ use std::str::FromStr;
use url::Url;

use std::collections::HashSet;
use std::convert::identity;
use std::ops::Deref;
use std::sync::Arc;

@@ -264,6 +265,40 @@ pub async fn resolve(
favored_packages: HashMap<NormalizedPackageName, PinnedPackage>,
options: ResolveOptions,
env_variables: HashMap<String, String>,
) -> miette::Result<Vec<PinnedPackage>> {
let requirements: Vec<_> = requirements.into_iter().cloned().collect();
tokio::task::spawn_blocking(move || {
resolve_inner(
package_db,
&requirements,
env_markers,
compatible_tags,
locked_packages,
favored_packages,
options,
env_variables,
)
})
.await
.map_or_else(
|e| match e.try_into_panic() {
Ok(panic) => std::panic::resume_unwind(panic),
Err(_) => Err(miette::miette!("the operation was cancelled")),
},
identity,
)
}

#[allow(clippy::too_many_arguments)]
fn resolve_inner<'r>(
package_db: Arc<PackageDb>,
requirements: impl IntoIterator<Item = &'r Requirement>,
env_markers: Arc<MarkerEnvironment>,
compatible_tags: Option<Arc<WheelTags>>,
locked_packages: HashMap<NormalizedPackageName, PinnedPackage>,
favored_packages: HashMap<NormalizedPackageName, PinnedPackage>,
options: ResolveOptions,
env_variables: HashMap<String, String>,
) -> miette::Result<Vec<PinnedPackage>> {
// Construct the pool
let pool = Pool::new();
@@ -323,15 +358,25 @@ pub async fn resolve(
)?;

// Invoke the solver to get a solution to the requirements
let mut solver = Solver::new(&provider);
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.enable_io()
.build()
.unwrap();

let mut solver = Solver::new(&provider, runtime);
let solvables = match solver.solve(root_requirements) {
Ok(solvables) => solvables,
Err(e) => {
return match e {
UnsolvableOrCancelled::Unsolvable(problem) => Err(miette::miette!(
"{}",
problem
.display_user_friendly(&solver, &DefaultSolvableDisplay)
.display_user_friendly(
&solver,
solver.pool.clone(),
&DefaultSolvableDisplay
)
.to_string()
.trim()
)),
@@ -345,9 +390,8 @@
};
let mut result: HashMap<NormalizedPackageName, PinnedPackage> = HashMap::new();
for solvable_id in solvables {
let pool = solver.pool();
let solvable = pool.resolve_solvable(solvable_id);
let name = pool.resolve_package_name(solvable.name_id());
let solvable = solver.pool.resolve_solvable(solvable_id);
let name = solver.pool.resolve_package_name(solvable.name_id());
let version = solvable.inner();

let artifacts: Vec<_> = provider
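The error handling around spawn_blocking above distills into a reusable shape: re-raise a panic from the blocking task on the calling thread, and report cancellation as an ordinary error. A minimal sketch, with heavy_work as a placeholder for resolve_inner:

use std::convert::identity;

// Placeholder for the synchronous resolver.
fn heavy_work() -> Result<u64, String> {
    Ok((0..1_000u64).sum())
}

async fn run() -> Result<u64, String> {
    tokio::task::spawn_blocking(heavy_work).await.map_or_else(
        |e| match e.try_into_panic() {
            // The task panicked: resume it here so the caller observes the
            // panic exactly as if the work had run inline.
            Ok(panic) => std::panic::resume_unwind(panic),
            // Otherwise the runtime cancelled the task.
            Err(_) => Err("the operation was cancelled".to_string()),
        },
        identity, // the task finished: pass its Result through unchanged
    )
}

#[tokio::main]
async fn main() {
    println!("{:?}", run().await);
}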
2 changes: 1 addition & 1 deletion rust-toolchain
@@ -1 +1 @@
1.72
1.75