Skip to content

Commit

Permalink
fix: rewrite prefix guard into async (#2908)
Browse files Browse the repository at this point in the history
Co-authored-by: Ruben Arts <[email protected]>
  • Loading branch information
nichmor and ruben-arts authored Jan 15, 2025
1 parent 3d9ebfd commit 5f2e932
Show file tree
Hide file tree
Showing 37 changed files with 241 additions and 144 deletions.
14 changes: 2 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions crates/pixi_build_frontend/src/tool/installer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::future::Future;
use std::path::PathBuf;

use pixi_consts::consts::CACHED_BUILD_TOOL_ENVS_DIR;
use pixi_progress::wrap_in_progress;
use pixi_utils::{EnvironmentHash, PrefixGuard};
use pixi_progress::await_in_progress;
use pixi_utils::{AsyncPrefixGuard, EnvironmentHash};
use rattler::{install::Installer, package_cache::PackageCache};
use rattler_conda_types::{Channel, ChannelConfig, GenericVirtualPackage, Platform};
use rattler_repodata_gateway::Gateway;
Expand Down Expand Up @@ -261,16 +261,17 @@ impl ToolInstaller for ToolContext {
.join(CACHED_BUILD_TOOL_ENVS_DIR)
.join(cache.name());

let mut prefix_guard = PrefixGuard::new(&cached_dir).into_diagnostic()?;
let prefix_guard = AsyncPrefixGuard::new(&cached_dir).await.into_diagnostic()?;

let mut write_guard =
wrap_in_progress("acquiring write lock on prefix", || prefix_guard.write())
await_in_progress("acquiring write lock on prefix", |_| prefix_guard.write())
.await
.into_diagnostic()?;

// If the environment already exists, we can return early.
if write_guard.is_ready() {
tracing::info!("reusing existing environment in {}", cached_dir.display());
let _ = write_guard.finish();
write_guard.finish().await.into_diagnostic()?;

// Get the activation scripts
let activator =
Expand All @@ -289,7 +290,7 @@ impl ToolInstaller for ToolContext {
}

// Update the prefix to indicate that we are installing it.
write_guard.begin().into_diagnostic()?;
write_guard.begin().await.into_diagnostic()?;

// Install the environment
Installer::new()
Expand All @@ -311,7 +312,7 @@ impl ToolInstaller for ToolContext {
.run_activation(ActivationVariables::from_env().unwrap_or_default(), None)
.unwrap();

let _ = write_guard.finish();
write_guard.finish().await.into_diagnostic()?;

Ok(IsolatedTool::new(
spec.command.clone(),
Expand Down
10 changes: 5 additions & 5 deletions crates/pixi_git/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::borrow::Cow;
use std::path::PathBuf;
use std::sync::Arc;

use pixi_utils::PrefixGuard;
use pixi_utils::AsyncPrefixGuard;
use tracing::debug;

use dashmap::mapref::one::Ref;
Expand Down Expand Up @@ -74,11 +74,11 @@ impl GitResolver {
let repository_url = RepositoryUrl::new(url.repository());

let write_guard_path = lock_dir.join(cache_digest(&repository_url));
let mut guard = PrefixGuard::new(&write_guard_path)?;
let mut write_guard = guard.write()?;
let guard = AsyncPrefixGuard::new(&write_guard_path).await?;
let mut write_guard = guard.write().await?;

// Update the prefix to indicate that we are installing it.
write_guard.begin()?;
write_guard.begin().await?;

// Fetch the Git repository.
let source = GitSource::new(url.as_ref().clone(), client, cache);
Expand All @@ -92,7 +92,7 @@ impl GitResolver {
if let Some(precise) = fetch.git().precise() {
self.insert(reference, precise);
}
let _ = write_guard.finish();
write_guard.finish().await?;

Ok(fetch)
}
Expand Down
3 changes: 2 additions & 1 deletion crates/pixi_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ rustls-tls = [


[dependencies]
fd-lock = { workspace = true }
async-fd-lock = { workspace = true }
fs-err = { workspace = true }
indicatif = { workspace = true }
itertools = { workspace = true }
Expand All @@ -42,6 +42,7 @@ serde_json = { workspace = true }
serde_yaml = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/pixi_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ pub use executable_utils::{
};

pub use cache::EnvironmentHash;
pub use prefix_guard::{PrefixGuard, WriteGuard};
pub use prefix_guard::{AsyncPrefixGuard, AsyncWriteGuard};
77 changes: 38 additions & 39 deletions crates/pixi_utils/src/prefix_guard.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::{
io,
io::{Read, Seek, Write},
path::Path,
};
use std::{io, path::Path};

use fd_lock::RwLockWriteGuard;
use async_fd_lock::LockWrite;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::{self, io::AsyncSeekExt};

const GUARD_PATH: &str = ".guard";

Expand All @@ -22,15 +20,16 @@ enum GuardState {
Ready,
}

pub struct WriteGuard<'a> {
guard: RwLockWriteGuard<'a, std::fs::File>,
#[derive(Debug)]
pub struct AsyncWriteGuard {
guard: async_fd_lock::RwLockWriteGuard<tokio::fs::File>,
state: GuardState,
}

impl<'a> WriteGuard<'a> {
fn new(mut guard: RwLockWriteGuard<'a, std::fs::File>) -> io::Result<Self> {
impl AsyncWriteGuard {
async fn new(mut guard: async_fd_lock::RwLockWriteGuard<tokio::fs::File>) -> io::Result<Self> {
let mut bytes = Vec::new();
guard.read_to_end(&mut bytes)?;
guard.read_to_end(&mut bytes).await?;
let state = serde_json::from_slice(&bytes).unwrap_or(GuardState::Unknown);
Ok(Self { guard, state })
}
Expand All @@ -41,64 +40,64 @@ impl<'a> WriteGuard<'a> {
}

/// Notify this instance that installation of the prefix has started.
pub fn begin(&mut self) -> io::Result<()> {
pub async fn begin(&mut self) -> io::Result<()> {
if self.state != GuardState::Installing {
self.guard.rewind()?;
self.guard.rewind().await?;
let bytes = serde_json::to_vec(&GuardState::Installing)?;
self.guard.write_all(&bytes)?;
self.guard.set_len(bytes.len() as u64)?;
self.guard.flush()?;
self.guard.write_all(&bytes).await?;
// self.guard.set_len(bytes.len() as u64)?;
self.guard.flush().await?;
self.state = GuardState::Installing;
}
Ok(())
}

/// Finishes writing to the guard and releases the lock.
pub fn finish(self) -> io::Result<()> {
let WriteGuard {
pub async fn finish(self) -> io::Result<()> {
let AsyncWriteGuard {
mut guard,
state: status,
} = self;
if status == GuardState::Installing {
guard.rewind()?;
guard.rewind().await?;
let bytes = serde_json::to_vec(&GuardState::Ready)?;
guard.write_all(&bytes)?;
guard.set_len(bytes.len() as u64)?;
guard.flush()?;
guard.write_all(&bytes).await?;
guard.flush().await?;
}
Ok(())
}
}

pub struct PrefixGuard {
guard: fd_lock::RwLock<std::fs::File>,
pub struct AsyncPrefixGuard {
guard: tokio::fs::File,
}

impl PrefixGuard {
impl AsyncPrefixGuard {
/// Constructs a new guard for the given prefix but does not perform any
/// locking operations yet.
pub fn new(prefix: &Path) -> io::Result<Self> {
pub async fn new(prefix: &Path) -> io::Result<Self> {
let guard_path = prefix.join(GUARD_PATH);

// Ensure that the directory exists
fs_err::create_dir_all(guard_path.parent().unwrap())?;
fs_err::tokio::create_dir_all(guard_path.parent().unwrap()).await?;

let file = tokio::fs::File::options()
.write(true)
.read(true)
.create(true)
.truncate(false)
.open(guard_path)
.await?;

// Open the file
Ok(Self {
guard: fd_lock::RwLock::new(
std::fs::File::options()
.write(true)
.read(true)
.create(true)
.truncate(false)
.open(guard_path)?,
),
})
Ok(Self { guard: file })
}

/// Locks the guard for writing and returns a write guard which can be used
/// to unlock it.
pub fn write(&mut self) -> io::Result<WriteGuard> {
WriteGuard::new(self.guard.write()?)
pub async fn write(self) -> io::Result<AsyncWriteGuard> {
let write_guard = self.guard.lock_write().await?;

AsyncWriteGuard::new(write_guard).await
}
}
13 changes: 8 additions & 5 deletions src/cli/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use clap::{Parser, ValueHint};
use miette::{Context, IntoDiagnostic};
use pixi_config::{self, Config, ConfigCli};
use pixi_progress::{await_in_progress, global_multi_progress, wrap_in_progress};
use pixi_utils::{reqwest::build_reqwest_clients, EnvironmentHash, PrefixGuard};
use pixi_utils::{reqwest::build_reqwest_clients, AsyncPrefixGuard, EnvironmentHash};
use rattler::{
install::{IndicatifReporter, Installer},
package_cache::PackageCache,
Expand Down Expand Up @@ -100,11 +100,13 @@ pub async fn create_exec_prefix(
.join(environment_hash.name()),
);

let mut guard = PrefixGuard::new(prefix.root())
let guard = AsyncPrefixGuard::new(prefix.root())
.await
.into_diagnostic()
.context("failed to create prefix guard")?;

let mut write_guard = wrap_in_progress("acquiring write lock on prefix", || guard.write())
let mut write_guard = await_in_progress("acquiring write lock on prefix", |_| guard.write())
.await
.into_diagnostic()
.context("failed to acquire write lock to prefix guard")?;

Expand All @@ -115,13 +117,14 @@ pub async fn create_exec_prefix(
"reusing existing environment in {}",
prefix.root().display()
);
let _ = write_guard.finish();
write_guard.finish().await.into_diagnostic()?;
return Ok(prefix);
}

// Update the prefix to indicate that we are installing it.
write_guard
.begin()
.await
.into_diagnostic()
.context("failed to write lock status to prefix guard")?;

Expand Down Expand Up @@ -202,7 +205,7 @@ pub async fn create_exec_prefix(
.into_diagnostic()
.context("failed to create environment")?;

let _ = write_guard.finish();
write_guard.finish().await.into_diagnostic()?;
Ok(prefix)
}

Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit 5f2e932

Please sign in to comment.