Skip to content

Commit

Permalink
Async file locking (#126)
Browse files Browse the repository at this point in the history
Made the file locking code async, since the previous (synchronous) implementation could block the executor thread while waiting for the lock.
  • Loading branch information
tdejager authored Dec 12, 2023
1 parent b950ac8 commit 68f374b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 27 deletions.
83 changes: 67 additions & 16 deletions crates/rattler_installs_packages/src/index/file_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
marker::PhantomData,
path::{Path, PathBuf},
};
use tokio::task;

/// Types that implement this can be used as keys of the [`FileStore`].
pub trait CacheKey {
Expand Down Expand Up @@ -88,11 +89,11 @@ impl FileStore {

/// Gets readable access to the data with the specified key. If no such entry exists the
/// function `f` is called to populate the entry.
pub fn get_or_set<K: CacheKey, F>(&self, key: &K, f: F) -> io::Result<impl Read + Seek>
pub async fn get_or_set<K: CacheKey, F>(&self, key: &K, f: F) -> io::Result<impl Read + Seek>
where
F: FnOnce(&mut dyn Write) -> io::Result<()>,
{
let lock = self.lock(key)?;
let lock = self.lock(key).await?;
if let Some(reader) = lock.reader() {
// We use `detach_unlocked` here because we are sure that if the file exists it also has
// immutable content.
Expand All @@ -106,8 +107,8 @@ impl FileStore {

/// Gets readable access to the data with the specified key. Returns `None` if no such key
/// exists in the store.
pub fn get<K: CacheKey>(&self, key: &K) -> Option<impl Read + Seek> {
if let Some(lock) = self.lock_if_exists(key) {
pub async fn get<K: CacheKey>(&self, key: &K) -> Option<impl Read + Seek> {
if let Some(lock) = self.lock_if_exists(key).await {
if let Some(reader) = lock.reader() {
return Some(reader.detach_unlocked());
}
Expand All @@ -116,9 +117,9 @@ impl FileStore {
}

/// Locks a certain file in the cache for exclusive access.
pub fn lock<K: CacheKey>(&self, key: &K) -> io::Result<FileLock> {
pub async fn lock<K: CacheKey>(&self, key: &K) -> io::Result<FileLock> {
let path = self.base.join(key.key());
let lock = lock(&path, LockMode::Lock)?;
let lock = lock(&path, LockMode::Lock).await?;
Ok(FileLock {
tmp: self.tmp.clone(),
_lock_file: lock,
Expand All @@ -130,13 +131,16 @@ impl FileStore {
///
/// This function exists to ensure that we don't create tons of directories just to check if an
/// entry exists or not.
pub fn lock_if_exists<K: CacheKey>(&self, key: &K) -> Option<FileLock> {
pub async fn lock_if_exists<K: CacheKey>(&self, key: &K) -> Option<FileLock> {
let path = self.base.join(key.key());
lock(&path, LockMode::IfExists).ok().map(|lock| FileLock {
tmp: self.tmp.clone(),
_lock_file: lock,
path,
})
lock(&path, LockMode::IfExists)
.await
.ok()
.map(|lock| FileLock {
tmp: self.tmp.clone(),
_lock_file: lock,
path,
})
}
}

Expand Down Expand Up @@ -252,7 +256,7 @@ enum LockMode {

/// Create a `.lock` file for the file at the specified `path`. Only a single process has access to
/// the lock-file.
fn lock(path: &Path, mode: LockMode) -> io::Result<File> {
async fn lock(path: &Path, mode: LockMode) -> io::Result<File> {
// Determine the path of the lockfile
let lock_path = path.with_extension(".lock");

Expand All @@ -275,17 +279,27 @@ fn lock(path: &Path, mode: LockMode) -> io::Result<File> {

// Lock the file. On unix this is apparently a thin wrapper around flock(2) and it doesn't
// properly handle EINTR so we keep retrying when that happens.
retry_interrupted(|| lock.lock_exclusive())?;

let lock = task::spawn_blocking(move || {
retry_interrupted(|| lock.lock_exclusive()).unwrap();
lock
})
.await
.unwrap();

Ok(lock)
}

#[cfg(test)]
mod test {
use super::*;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::timeout;

#[test]
fn test_file_store() {
#[tokio::test]
async fn test_file_store() {
let dir = tempfile::tempdir().unwrap();
let store = FileStore::new(dir.path()).unwrap();

Expand All @@ -294,9 +308,46 @@ mod test {
let mut read_back = Vec::new();
store
.get_or_set(&hello, |w| w.write_all(hello))
.await
.unwrap()
.read_to_end(&mut read_back)
.unwrap();
assert_eq!(read_back, hello);
}

/// Regression test for a deadlock in the previous (synchronous) locking code.
///
/// Task one acquires the lock and holds it across an `.await` point; task two
/// then contends for the same lock. With the async implementation both tasks
/// must still make progress; the old blocking implementation would deadlock
/// here because the lock acquisition blocked the runtime thread.
#[tokio::test]
async fn test_locking() {
// Two owned copies of the same directory path, one per spawned task
// (each task needs a 'static path it can move into its closure).
let dir = tempfile::tempdir().unwrap();
let path = dir.path().to_path_buf();
let path2 = dir.path().to_path_buf();

// `Notify` lets task two wait until task one has actually acquired the lock,
// guaranteeing the two tasks really contend for the same lock file.
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
let notify3 = notify.clone();

// Task one: acquire the lock, signal task two, then keep holding the lock
// across an await point for a short while.
let one = tokio::spawn(async move {
let _lock = lock(&path, LockMode::Lock).await.unwrap();
notify2.notify_one();
tokio::time::sleep(Duration::from_millis(100)).await;
});

// Task two: wait for the signal, then try to take the same lock. This only
// succeeds once task one finishes and drops its lock.
let two = tokio::spawn(async move {
notify3.notified().await;
let _lock = lock(&path2, LockMode::Lock).await.unwrap();
});

// Both tasks should complete well within 2 seconds; hitting the timeout
// would indicate the lock acquisition stalled the runtime (the old deadlock).
let (a, b) = tokio::join!(
timeout(Duration::from_secs(2), one),
timeout(Duration::from_secs(2), two)
);
a.unwrap().unwrap();
b.unwrap().unwrap();
}
}
2 changes: 1 addition & 1 deletion crates/rattler_installs_packages/src/index/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Http {
Ok(response)
} else {
let key = key_for_request(&url, method, &headers);
let lock = self.http_cache.lock(&key.as_slice())?;
let lock = self.http_cache.lock(&key.as_slice()).await?;

if let Some((old_policy, final_url, old_body)) = lock
.reader()
Expand Down
21 changes: 11 additions & 10 deletions crates/rattler_installs_packages/src/index/package_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,20 @@ impl PackageDb {

/// Reads the metadata for the given artifact from the cache or return `None` if the metadata
/// could not be found in the cache.
fn metadata_from_cache(&self, ai: &ArtifactInfo) -> Option<Vec<u8>> {
let mut data = self.metadata_cache.get(&ai.hashes.as_ref()?)?;
async fn metadata_from_cache(&self, ai: &ArtifactInfo) -> Option<Vec<u8>> {
let mut data = self.metadata_cache.get(&ai.hashes.as_ref()?).await?;
let mut bytes = Vec::new();
data.read_to_end(&mut bytes).ok()?;
Some(bytes)
}

/// Writes the metadata for the given artifact into the cache. If the metadata already exists
/// its not overwritten.
fn put_metadata_in_cache(&self, ai: &ArtifactInfo, blob: &[u8]) -> miette::Result<()> {
async fn put_metadata_in_cache(&self, ai: &ArtifactInfo, blob: &[u8]) -> miette::Result<()> {
if let Some(hash) = &ai.hashes {
self.metadata_cache
.get_or_set(&hash, |w| w.write_all(blob))
.await
.into_diagnostic()?;
}
Ok(())
Expand All @@ -140,7 +141,7 @@ impl PackageDb {
let metadata = artifact.metadata();
match metadata {
Ok((blob, metadata)) => {
self.put_metadata_in_cache(artifact_info, &blob)?;
self.put_metadata_in_cache(artifact_info, &blob).await?;
return Ok(Some((artifact_info, metadata)));
}
Err(err) => {
Expand Down Expand Up @@ -170,7 +171,7 @@ impl PackageDb {
// Save the pep643 metadata in the cache if it is available
let metadata = sdist.pep643_metadata();
if let Some((bytes, _)) = metadata {
self.put_metadata_in_cache(artifact_info, &bytes)?;
self.put_metadata_in_cache(artifact_info, &bytes).await?;
}
}
Err(err) => match err.downcast_ref::<HttpRequestError>() {
Expand Down Expand Up @@ -213,7 +214,7 @@ impl PackageDb {
let metadata = artifact.metadata();
match metadata {
Ok((blob, metadata)) => {
self.put_metadata_in_cache(artifact_info, &blob)?;
self.put_metadata_in_cache(artifact_info, &blob).await?;
return Ok(Some((artifact_info, metadata)));
}
Err(err) => {
Expand Down Expand Up @@ -246,7 +247,7 @@ impl PackageDb {
let metadata = wheel_builder.get_sdist_metadata(&artifact).await;
match metadata {
Ok((blob, metadata)) => {
self.put_metadata_in_cache(artifact_info, &blob)?;
self.put_metadata_in_cache(artifact_info, &blob).await?;
return Ok(Some((artifact_info, metadata)));
}
Err(err) => {
Expand All @@ -272,7 +273,7 @@ impl PackageDb {
// Check if we already have information about any of the artifacts cached.
// Return if we do
for artifact_info in artifacts.iter().copied() {
if let Some(metadata_bytes) = self.metadata_from_cache(artifact_info) {
if let Some(metadata_bytes) = self.metadata_from_cache(artifact_info).await {
return Ok(Some((
artifact_info,
WheelCoreMetadata::try_from(metadata_bytes.as_slice()).into_diagnostic()?,
Expand Down Expand Up @@ -329,7 +330,7 @@ impl PackageDb {
{
match Wheel::read_metadata_bytes(name, &mut reader).await {
Ok((blob, metadata)) => {
self.put_metadata_in_cache(artifact_info, &blob)?;
self.put_metadata_in_cache(artifact_info, &blob).await?;
return Ok(Some(metadata));
}
Err(err) => {
Expand Down Expand Up @@ -366,7 +367,7 @@ impl PackageDb {
.into_diagnostic()?;

let metadata = WheelCoreMetadata::try_from(bytes.as_slice()).into_diagnostic()?;
self.put_metadata_in_cache(artifact_info, &bytes)?;
self.put_metadata_in_cache(artifact_info, &bytes).await?;
Ok((artifact_info, metadata))
}

Expand Down

0 comments on commit 68f374b

Please sign in to comment.