Skip to content

Commit

Permalink
clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
baszalmstra committed Dec 13, 2024
1 parent 77c7f35 commit 9a5c439
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 99 deletions.
1 change: 0 additions & 1 deletion crates/rattler-bin/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ pub async fn create(opt: Opt) -> anyhow::Result<()> {
)]
.into_iter()
.collect(),
..rattler_repodata_gateway::ChannelConfig::default()
})
.finish();

Expand Down
4 changes: 2 additions & 2 deletions crates/rattler/src/install/installer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ async fn link_package(
let prefix_record = PrefixRecord {
repodata_record: record.clone(),
package_tarball_full_path: None,
extracted_package_dir: Some(cached_package_dir.to_path_buf()),
extracted_package_dir: Some(cached_package_dir.clone()),
files: paths
.iter()
.map(|entry| entry.relative_path.clone())
Expand All @@ -494,7 +494,7 @@ async fn link_package(
requested_spec: None,

link: Some(Link {
source: cached_package_dir.to_path_buf(),
source: cached_package_dir,
// TODO: compute the right value here based on the options and `can_hard_link` ...
link_type: Some(LinkType::HardLink),
}),
Expand Down
268 changes: 184 additions & 84 deletions crates/rattler/src/install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ pub use crate::install::entry_point::{get_windows_launcher, python_entry_point_t
pub use apple_codesign::AppleCodeSignBehavior;
pub use driver::InstallDriver;
use fs_err::tokio as tokio_fs;
use futures::FutureExt;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
#[cfg(feature = "indicatif")]
pub use installer::{
DefaultProgressFormatter, IndicatifReporter, IndicatifReporterBuilder, Placement,
Expand All @@ -55,6 +56,9 @@ use rattler_conda_types::{
use rayon::iter::Either;
use rayon::prelude::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use simple_spawn_blocking::Cancelled;
use std::cmp::Ordering;
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use std::sync::Mutex;
use std::{
collections::HashSet,
Expand Down Expand Up @@ -267,7 +271,9 @@ pub async fn link_package(
.to_owned();

// Ensure target directory exists
fs_err::create_dir_all(&target_dir).map_err(InstallError::FailedToCreateTargetDirectory)?;
tokio_fs::create_dir_all(&target_dir)
.await
.map_err(InstallError::FailedToCreateTargetDirectory)?;

// Reuse or read the `paths.json` and `index.json` files from the package
// directory
Expand Down Expand Up @@ -360,37 +366,52 @@ pub async fn link_package(
// tasks.
let python_info = options.python_info.map(Arc::new);

// Link the individual files in parallel
let link_target_prefix = target_prefix.clone();
let package_dir = package_dir.to_path_buf();
let link_target_dir = target_dir.to_path_buf();
let link_entries_iter = final_paths
.into_par_iter()
.map(move |(entry, computed_path)| {
let clobber_rename = clobber_paths.get(&entry.relative_path).cloned();
let link_result = link_file(
&entry,
computed_path.clone(),
&package_dir,
&link_target_dir,
&link_target_prefix,
allow_symbolic_links && !entry.no_link,
allow_hard_links && !entry.no_link,
allow_ref_links && !entry.no_link,
platform,
options.apple_codesign_behavior,
);
// Start linking all package files in parallel
let mut pending_futures = FuturesUnordered::new();
let mut number_of_paths_entries = 0;
for (entry, computed_path) in final_paths {
let package_dir = package_dir.to_owned();
let target_dir = target_dir.to_owned();
let target_prefix = target_prefix.clone();

let result = match link_result {
Ok(linked_file) => linked_file,
Err(e) => return Err(InstallError::FailedToLink(entry.relative_path.clone(), e)),
let clobber_rename = clobber_paths.get(&entry.relative_path).cloned();
let install_future = async move {
let _permit = driver.acquire_io_permit().await;

// Spawn a blocking task to link the specific file. We use a blocking task here
// because filesystem access is blocking anyway so its more
// efficient to group them together in a single blocking call.
let cloned_entry = entry.clone();
let result = match tokio::task::spawn_blocking(move || {
link_file(
&cloned_entry,
computed_path,
&package_dir,
&target_dir,
&target_prefix,
allow_symbolic_links && !cloned_entry.no_link,
allow_hard_links && !cloned_entry.no_link,
allow_ref_links && !cloned_entry.no_link,
platform,
options.apple_codesign_behavior,
)
})
.await
.map_err(JoinError::try_into_panic)
{
Ok(Ok(linked_file)) => linked_file,
Ok(Err(e)) => {
return Err(InstallError::FailedToLink(entry.relative_path.clone(), e))
}
Err(Ok(payload)) => std::panic::resume_unwind(payload),
Err(Err(_err)) => return Err(InstallError::Cancelled),
};

// Construct a `PathsEntry` from the result of the linking operation
Ok(PathsEntry {
let paths_entry = PathsEntry {
relative_path: result.relative_path,
original_path: if clobber_rename.is_some() {
Some(entry.relative_path.clone())
Some(entry.relative_path)
} else {
None
},
Expand All @@ -407,15 +428,21 @@ pub async fn link_package(
.prefix_placeholder
.as_ref()
.map(|p| p.placeholder.clone()),
})
});
};

Ok(vec![(number_of_paths_entries, paths_entry)])
};

pending_futures.push(install_future.boxed());
number_of_paths_entries += 1;
}

// If this package is a noarch python package we also have to create entry
// points.
//
// Be careful with the fact that this code is currently running in parallel with
// the linking of individual files.
let entry_points_iter = if let Some(link_json) = link_json {
if let Some(link_json) = link_json {
// Parse the `link.json` file and extract entry points from it.
let entry_points = match link_json.noarch {
NoArchLinks::Python(entry_points) => entry_points.entry_points,
Expand All @@ -429,58 +456,96 @@ pub async fn link_package(
.clone()
.expect("should be safe because its checked above that this contains a value");

let target_prefix = target_prefix.clone();
let target_dir = target_dir.to_path_buf();

// Create entry points for each listed item. This is different between Windows
// and unix because on Windows, two PathEntry's are created whereas on
// Linux only one is created.
let entry_point_iter = if platform.is_windows() {
rayon::iter::Either::Left(entry_points.into_par_iter().flat_map(move |entry_point| {
match create_windows_python_entry_point(
&target_dir,
&target_prefix,
&entry_point,
&python_info,
&platform,
) {
Ok([a, b]) => rayon::iter::Either::Left([Ok(a), Ok(b)].into_par_iter()),
Err(e) => rayon::iter::Either::Right(rayon::iter::once(Err(
InstallError::FailedToCreatePythonEntryPoint(e),
))),
}
}))
} else {
rayon::iter::Either::Right(entry_points.into_par_iter().map(move |entry_point| {
match create_unix_python_entry_point(
&target_dir,
&target_prefix,
&entry_point,
&python_info,
) {
Ok(a) => Ok(a),
Err(e) => Err(InstallError::FailedToCreatePythonEntryPoint(e)),
}
}))
};
for entry_point in entry_points {
let python_info = python_info.clone();
let target_dir = target_dir.to_owned();
let target_prefix = target_prefix.clone();

rayon::iter::Either::Left(entry_point_iter)
} else {
rayon::iter::Either::Right(rayon::iter::empty())
};
let entry_point_fut = async move {
// Acquire an IO permit
let _permit = driver.acquire_io_permit().await;

// Collect all path entries
let (tx, rx) = tokio::sync::oneshot::channel();
rayon::spawn(move || {
let _ = tx.send(
link_entries_iter
// .by_uniform_blocks(10000)
.chain(entry_points_iter)
.collect::<Result<Vec<_>, _>>(),
);
});
let entries = if platform.is_windows() {
match create_windows_python_entry_point(
&target_dir,
&target_prefix,
&entry_point,
&python_info,
&platform,
) {
Ok([a, b]) => vec![
(number_of_paths_entries, a),
(number_of_paths_entries + 1, b),
],
Err(e) => return Err(InstallError::FailedToCreatePythonEntryPoint(e)),
}
} else {
match create_unix_python_entry_point(
&target_dir,
&target_prefix,
&entry_point,
&python_info,
) {
Ok(a) => vec![(number_of_paths_entries, a)],
Err(e) => return Err(InstallError::FailedToCreatePythonEntryPoint(e)),
}
};

Ok(entries)
};

pending_futures.push(entry_point_fut.boxed());
number_of_paths_entries += if platform.is_windows() { 2 } else { 1 };
}
}

rx.await.unwrap_or_else(|_| Err(InstallError::Cancelled))
// Await the result of all the background tasks. The background tasks are
// scheduled in order, however, they can complete in any order. This means
// we have to reorder them back into their original order. This is achieved
// by waiting to add finished results to the result Vec, if the result
// before it has not yet finished. To that end we use a `BinaryHeap` as a
// priority queue which will buffer up finished results that finished before
// their predecessor.
//
// What makes this loop special is that it also aborts if any of the returned
// results indicate a failure.
let mut paths = Vec::with_capacity(number_of_paths_entries);
let mut out_of_order_queue = BinaryHeap::<OrderWrapper<PathsEntry>>::with_capacity(100);
while let Some(link_result) = pending_futures.next().await {
for (index, data) in link_result? {
if index == paths.len() {
// If this is the next element expected in the sorted list, add it immediately.
// This basically means the future finished in order.
paths.push(data);

// By adding a finished future we have to check if there might also be another
// future that finished earlier and should also now be added to
// the result Vec.
while let Some(next_output) = out_of_order_queue.peek_mut() {
if next_output.index == paths.len() {
paths.push(PeekMut::pop(next_output).data);
} else {
break;
}
}
} else {
// Otherwise add it to the out-of-order queue. This means that we still have to
// wait for another element before we can add the result to the
// ordered list.
out_of_order_queue.push(OrderWrapper { index, data });
}
}
}
debug_assert_eq!(
paths.len(),
paths.capacity(),
"some futures where not added to the result"
);

Ok(paths)
}

/// Given an extracted package archive (`package_dir`), installs its files to
Expand All @@ -506,17 +571,24 @@ pub fn link_package_sync(
.to_owned();

// Ensure target directory exists
fs_err::create_dir_all(&target_dir).map_err(InstallError::FailedToCreateTargetDirectory)?;
fs_err::create_dir_all(target_dir).map_err(InstallError::FailedToCreateTargetDirectory)?;

// Reuse or read the `paths.json` and `index.json` files from the package
// directory
let paths_json = options.paths_json.map(Ok).unwrap_or_else(|| {
PathsJson::from_package_directory_with_deprecated_fallback(&package_dir)
.map_err(InstallError::FailedToReadPathsJson)
})?;
let index_json = options.index_json.map(Ok).unwrap_or_else(|| {
IndexJson::from_package_directory(package_dir).map_err(InstallError::FailedToReadIndexJson)
})?;
let paths_json = options.paths_json.map_or_else(
|| {
PathsJson::from_package_directory_with_deprecated_fallback(package_dir)
.map_err(InstallError::FailedToReadPathsJson)
},
Ok,
)?;
let index_json = options.index_json.map_or_else(
|| {
IndexJson::from_package_directory(package_dir)
.map_err(InstallError::FailedToReadIndexJson)
},
Ok,
)?;

// Error out if this is a noarch python package but the python information is
// missing.
Expand Down Expand Up @@ -849,6 +921,34 @@ fn can_create_symlinks_sync(target_dir: &Path) -> bool {
}
}

/// A helper struct for a `BinaryHeap` that provides an ordering for items
/// that are otherwise unordered.
///
/// Used as a priority queue to re-sequence results that may complete out of
/// order: entries are compared solely by `index`, and the `Ord`
/// implementation is reversed so that the `BinaryHeap` (a max-heap) pops the
/// *smallest* index first.
struct OrderWrapper<T> {
    // Position of `data` in the original, ordered sequence.
    index: usize,
    // The wrapped value; never consulted for ordering purposes.
    data: T,
}

impl<T> PartialEq for OrderWrapper<T> {
    fn eq(&self, other: &Self) -> bool {
        // Equality is defined purely by position, consistent with `Ord`.
        self.index == other.index
    }
}

impl<T> Eq for OrderWrapper<T> {}

impl<T> PartialOrd for OrderWrapper<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to `Ord` so the two orderings can never disagree.
        Some(self.cmp(other))
    }
}

impl<T> Ord for OrderWrapper<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // `BinaryHeap` is a max heap, so compare backwards here to obtain
        // min-heap behavior (the smallest `index` is popped first).
        other.index.cmp(&self.index)
    }
}

/// Returns true if it is possible to create symlinks in the target directory.
async fn can_create_symlinks(target_dir: &Path) -> bool {
let uuid = uuid::Uuid::new_v4();
Expand Down
8 changes: 5 additions & 3 deletions crates/rattler_cache/src/package_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use simple_spawn_blocking::Cancelled;
use tracing::instrument;
use url::Url;

use crate::validation::validate_package_directory;
use crate::validation::{validate_package_directory, ValidationMode};

mod cache_key;
mod cache_lock;
Expand Down Expand Up @@ -364,8 +364,10 @@ where
}

// Validate the package directory.
let validation_result =
tokio::task::spawn_blocking(move || validate_package_directory(&path_inner)).await;
let validation_result = tokio::task::spawn_blocking(move || {
validate_package_directory(&path_inner, ValidationMode::Full)
})
.await;

if let Some((reporter, index)) = reporter {
reporter.on_validate_complete(index);
Expand Down
Loading

0 comments on commit 9a5c439

Please sign in to comment.