Skip to content

Commit

Permalink
Always pre-fetch distribution metadata (#744)
Browse files Browse the repository at this point in the history
This PR fixes our prefetching logic to ensure that we always attempt to
prefetch the "best-guess" distribution for all dependencies. This logic
already existed, but because we only attempted to prefetch when package
metadata was available, it almost never triggered. Now, we wait for the
package metadata to become available, _then_ kick off the "best-guess"
prefetch (for every package).

In my testing, this dramatically improves performance (like 2x). I'm
wondering if this regressed at some point?

Closes #743.

Co-authored-by: konsti <[email protected]>
  • Loading branch information
charliermarsh and konstin authored Jan 3, 2024
1 parent ba23115 commit 5a98add
Showing 1 changed file with 57 additions and 33 deletions.
90 changes: 57 additions & 33 deletions crates/puffin-resolver/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
state.unit_propagation(next)?;

// Pre-visit all candidate packages, to allow metadata to be fetched in parallel.
self.pre_visit(state.partial_solution.prioritized_packages(), request_sink)?;
Self::pre_visit(state.partial_solution.prioritized_packages(), request_sink)?;

// Choose a package version.
let Some(highest_priority_pkg) =
Expand Down Expand Up @@ -461,9 +461,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {

/// Visit the set of [`PubGrubPackage`] candidates prior to selection. This allows us to fetch
/// metadata for all of the packages in parallel.
fn pre_visit(
&self,
packages: impl Iterator<Item = (&'a PubGrubPackage, &'a Range<PubGrubVersion>)>,
fn pre_visit<'data>(
packages: impl Iterator<Item = (&'data PubGrubPackage, &'data Range<PubGrubVersion>)>,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
) -> Result<(), ResolveError> {
// Iterate over the potential packages, and fetch file metadata for any of them. These
Expand All @@ -472,29 +471,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
let PubGrubPackage::Package(package_name, _extra, None) = package else {
continue;
};

// If we don't have metadata for this package, we can't make an early decision.
let Some(entry) = self.index.packages.get(package_name) else {
continue;
};
let (index, base, version_map) = entry.value();

// Try to find a compatible version. If there aren't any compatible versions,
// short-circuit and return `None`.
let Some(candidate) = self.selector.select(package_name, range, version_map) else {
// Short-circuit: we couldn't find _any_ compatible versions for a package.
return Ok(());
};

// Emit a request to fetch the metadata for this version.
if self
.index
.distributions
.register_owned(candidate.package_id())
{
let distribution = candidate.into_distribution(index.clone(), base.clone());
request_sink.unbounded_send(Request::Dist(distribution))?;
}
request_sink.unbounded_send(Request::Prefetch(package_name.clone(), range.clone()))?;
}
Ok(())
}
Expand Down Expand Up @@ -704,19 +681,19 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {

while let Some(response) = response_stream.next().await {
match response? {
Response::Package(package_name, index, base, version_map) => {
Some(Response::Package(package_name, index, base, version_map)) => {
trace!("Received package metadata for: {package_name}");
self.index
.packages
.done(package_name, (index, base, version_map));
}
Response::Dist(Dist::Built(distribution), metadata, ..) => {
Some(Response::Dist(Dist::Built(distribution), metadata, ..)) => {
trace!("Received built distribution metadata for: {distribution}");
self.index
.distributions
.done(distribution.package_id(), metadata);
}
Response::Dist(Dist::Source(distribution), metadata, precise) => {
Some(Response::Dist(Dist::Source(distribution), metadata, precise)) => {
trace!("Received source distribution metadata for: {distribution}");
self.index
.distributions
Expand All @@ -736,13 +713,14 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
}
}
}
None => {}
}
}

Ok::<(), ResolveError>(())
}

async fn process_request(&self, request: Request) -> Result<Response, ResolveError> {
async fn process_request(&self, request: Request) -> Result<Option<Response>, ResolveError> {
match request {
// Fetch package metadata from the registry.
Request::Package(package_name) => {
Expand All @@ -751,9 +729,10 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
.get_version_map(&package_name)
.await
.map_err(ResolveError::Client)?;
Ok(Response::Package(package_name, index, base, metadata))
Ok(Some(Response::Package(package_name, index, base, metadata)))
}

// Fetch distribution metadata from the distribution database.
Request::Dist(dist) => {
let (metadata, precise) = self
.provider
Expand All @@ -771,7 +750,50 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
ResolveError::FetchAndBuild(Box::new(source_dist), err)
}
})?;
Ok(Response::Dist(dist, metadata, precise))
Ok(Some(Response::Dist(dist, metadata, precise)))
}

// Pre-fetch the package and distribution metadata.
Request::Prefetch(package_name, range) => {
// Wait for the package metadata to become available.
let entry = self.index.packages.wait(&package_name).await;
let (index, base, version_map) = entry.value();

// Try to find a compatible version. If there aren't any compatible versions,
// short-circuit and return `None`.
let Some(candidate) = self.selector.select(&package_name, &range, version_map)
else {
return Ok(None);
};

// Emit a request to fetch the metadata for this version.
if self.index.distributions.register(&candidate.package_id()) {
let dist = candidate.into_distribution(index.clone(), base.clone());
drop(entry);

let (metadata, precise) = self
.provider
.get_or_build_wheel_metadata(&dist)
.await
.map_err(|err| match dist.clone() {
Dist::Built(BuiltDist::Path(built_dist)) => {
ResolveError::Read(Box::new(built_dist), err)
}
Dist::Source(SourceDist::Path(source_dist)) => {
ResolveError::Build(Box::new(source_dist), err)
}
Dist::Built(built_dist) => {
ResolveError::Fetch(Box::new(built_dist), err)
}
Dist::Source(source_dist) => {
ResolveError::FetchAndBuild(Box::new(source_dist), err)
}
})?;

Ok(Some(Response::Dist(dist, metadata, precise)))
} else {
Ok(None)
}
}
}
}
Expand Down Expand Up @@ -850,6 +872,8 @@ enum Request {
Package(PackageName),
/// A request to fetch the metadata for a built or source distribution.
Dist(Dist),
/// A request to pre-fetch the metadata for a package and the best-guess distribution.
Prefetch(PackageName, Range<PubGrubVersion>),
}

#[derive(Debug)]
Expand Down

0 comments on commit 5a98add

Please sign in to comment.