Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: use more rayon over tokio #983

Closed
wants to merge 14 commits into from
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ plist = "1"
purl = { version = "0.1.3", features = ["serde"] }
quote = "1.0.37"
rand = "0.8.5"
rayon = "1.10.0"
reflink-copy = "0.1.20"
regex = "1.11.1"
reqwest = { version = "0.12.9", default-features = false }
Expand Down
17 changes: 11 additions & 6 deletions crates/rattler-bin/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use rattler_solve::{
libsolv_c::{self},
resolvo, SolverImpl, SolverTask,
};
use reqwest::Client;
use reqwest::{Client, Url};

use crate::global_multi_progress;

Expand Down Expand Up @@ -165,11 +165,16 @@ pub async fn create(opt: Opt) -> anyhow::Result<()> {
))
.with_client(download_client.clone())
.with_channel_config(rattler_repodata_gateway::ChannelConfig {
default: SourceConfig {
sharded_enabled: false,
..SourceConfig::default()
},
..rattler_repodata_gateway::ChannelConfig::default()
default: SourceConfig::default(),
per_channel: [(
Url::parse("https://prefix.dev")?,
SourceConfig {
sharded_enabled: true,
..SourceConfig::default()
},
)]
.into_iter()
.collect(),
})
.finish();

Expand Down
1 change: 1 addition & 0 deletions crates/rattler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rattler_digest = { path = "../rattler_digest", version = "1.0.3", default-featur
rattler_networking = { path = "../rattler_networking", version = "0.21.8", default-features = false }
rattler_shell = { path = "../rattler_shell", version = "0.22.10", default-features = false }
rattler_package_streaming = { path = "../rattler_package_streaming", version = "0.22.18", default-features = false, features = ["reqwest"] }
rayon = { workspace = true }
reflink-copy = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true, features = ["stream", "json", "gzip"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/rattler/src/install/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::install::link_script::LinkScriptError;
/// system has available.
pub struct InstallDriver {
io_concurrency_semaphore: Option<Arc<Semaphore>>,
clobber_registry: Arc<Mutex<ClobberRegistry>>,
pub(crate) clobber_registry: Arc<Mutex<ClobberRegistry>>,
execute_link_scripts: bool,
}

Expand Down
75 changes: 46 additions & 29 deletions crates/rattler/src/install/installer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,35 +462,44 @@ async fn link_package(
install_options: InstallOptions,
driver: &InstallDriver,
) -> Result<(), InstallerError> {
// Link the contents of the package into the prefix.
let paths =
crate::install::link_package(cached_package_dir, target_prefix, driver, install_options)
.await
let record = record.clone();
let target_prefix = target_prefix.to_path_buf();
let cached_package_dir = cached_package_dir.to_path_buf();
let clobber_registry = driver.clobber_registry.clone();

let (tx, rx) = tokio::sync::oneshot::channel();

rayon::spawn_fifo(move || {
let inner = move || {
// Link the contents of the package into the prefix.
let paths = crate::install::link_package_sync(
&cached_package_dir,
&target_prefix,
clobber_registry,
install_options,
)
.map_err(|e| InstallerError::LinkError(record.file_name.clone(), e))?;

// Construct a PrefixRecord for the package
let prefix_record = PrefixRecord {
repodata_record: record.clone(),
package_tarball_full_path: None,
extracted_package_dir: Some(cached_package_dir.to_path_buf()),
files: paths
.iter()
.map(|entry| entry.relative_path.clone())
.collect(),
paths_data: paths.into(),
// TODO: Retrieve the requested spec for this package from the request
requested_spec: None,

link: Some(Link {
source: cached_package_dir.to_path_buf(),
// TODO: compute the right value here based on the options and `can_hard_link` ...
link_type: Some(LinkType::HardLink),
}),
};
// Construct a PrefixRecord for the package
let prefix_record = PrefixRecord {
repodata_record: record.clone(),
package_tarball_full_path: None,
extracted_package_dir: Some(cached_package_dir.clone()),
files: paths
.iter()
.map(|entry| entry.relative_path.clone())
.collect(),
paths_data: paths.into(),
// TODO: Retrieve the requested spec for this package from the request
requested_spec: None,

link: Some(Link {
source: cached_package_dir,
// TODO: compute the right value here based on the options and `can_hard_link` ...
link_type: Some(LinkType::HardLink),
}),
};

let target_prefix = target_prefix.to_path_buf();
driver
.run_blocking_io_task(move || {
let conda_meta_path = target_prefix.join("conda-meta");
std::fs::create_dir_all(&conda_meta_path).map_err(|e| {
InstallerError::IoError("failed to create conda-meta directory".to_string(), e)
Expand All @@ -508,9 +517,17 @@ async fn link_package(
);
prefix_record
.write_to_path(conda_meta_path.join(&pkg_meta_path), true)
.map_err(|e| InstallerError::IoError(format!("failed to write {pkg_meta_path}"), e))
})
.await
.map_err(|e| {
InstallerError::IoError(format!("failed to write {pkg_meta_path}"), e)
})?;

Ok(())
};

let _ = tx.send(inner());
});

rx.await.unwrap_or(Err(InstallerError::Cancelled))
}

/// Given a repodata record, fetch the package into the cache if its not already
Expand Down
Loading
Loading