Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: duplicate move lock files #8732

Merged
merged 4 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 8 additions & 39 deletions crates/rspack_storage/src/pack/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,21 +230,9 @@ async fn save_scopes(
) -> Result<ScopeMap> {
scopes.retain(|_, scope| scope.loaded());

for (_, scope) in scopes.iter_mut() {
strategy.before_all(scope)?;
}
strategy.before_all(&mut scopes).await?;

join_all(
scopes
.values()
.map(|scope| async move { strategy.before_write(scope).await })
.collect_vec(),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

let wrote_results = join_all(
let changed = join_all(
scopes
.values_mut()
.map(|scope| async move {
Expand All @@ -261,33 +249,14 @@ async fn save_scopes(
.into_iter()
.collect::<Result<Vec<WriteScopeResult>>>()?
.into_iter()
.collect_vec();
.fold(WriteScopeResult::default(), |mut acc, res| {
acc.extend(res);
acc
});

strategy.write_root_meta(root_meta).await?;

join_all(
scopes
.values()
.zip(wrote_results)
.map(|(scope, scope_wrote_result)| async move {
strategy
.after_write(
scope,
scope_wrote_result.wrote_files,
scope_wrote_result.removed_files,
)
.await
})
.collect_vec(),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

for (_, scope) in scopes.iter_mut() {
strategy.after_all(scope)?;
}

strategy.merge_changed(changed).await?;
strategy.after_all(&mut scopes).await?;
strategy
.clean_unused(root_meta, &scopes, root_options)
.await?;
Expand Down
14 changes: 4 additions & 10 deletions crates/rspack_storage/src/pack/strategy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub trait ScopeValidateStrategy {
async fn validate_packs(&self, scope: &mut PackScope) -> Result<ValidateResult>;
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct WriteScopeResult {
pub wrote_files: HashSet<Utf8PathBuf>,
pub removed_files: HashSet<Utf8PathBuf>,
Expand All @@ -154,15 +154,9 @@ pub type ScopeUpdate = HashMap<StorageItemKey, Option<StorageItemValue>>;
#[async_trait]
pub trait ScopeWriteStrategy {
fn update_scope(&self, scope: &mut PackScope, updates: ScopeUpdate) -> Result<()>;
fn before_all(&self, scope: &mut PackScope) -> Result<()>;
async fn before_write(&self, scope: &PackScope) -> Result<()>;
async fn before_all(&self, scopes: &mut HashMap<String, PackScope>) -> Result<()>;
async fn write_packs(&self, scope: &mut PackScope) -> Result<WriteScopeResult>;
async fn write_meta(&self, scope: &mut PackScope) -> Result<WriteScopeResult>;
async fn after_write(
&self,
scope: &PackScope,
wrote_files: HashSet<Utf8PathBuf>,
removed_files: HashSet<Utf8PathBuf>,
) -> Result<()>;
fn after_all(&self, scope: &mut PackScope) -> Result<()>;
async fn merge_changed(&self, changed: WriteScopeResult) -> Result<()>;
async fn after_all(&self, scopes: &mut HashMap<String, PackScope>) -> Result<()>;
}
129 changes: 119 additions & 10 deletions crates/rspack_storage/src/pack/strategy/split/handle_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,56 @@ use crate::{
PackFS,
};

pub async fn prepare_scope(
scope_path: &Utf8Path,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let temp_path = redirect_to_path(scope_path, root, temp_root)?;
fs.remove_dir(&temp_path).await?;
fs.ensure_dir(&temp_path).await?;
fs.ensure_dir(scope_path).await?;
Ok(())
}

pub async fn prepare_scope_dirs(
scopes: &HashMap<String, PackScope>,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let tasks = scopes.values().map(|scope| {
let fs = fs.clone();
let scope_path = scope.path.clone();
let root_path = root.to_path_buf();
let temp_root_path = temp_root.to_path_buf();
tokio::spawn(async move { prepare_scope(&scope_path, &root_path, &temp_root_path, fs).await })
.map_err(|e| error!("{e}"))
});

let res = join_all(tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

let mut errors = vec![];
for task_result in res {
if let Err(e) = task_result {
errors.push(format!("- {}", e));
}
}

if errors.is_empty() {
Ok(())
} else {
Err(error!(
"prepare scopes directories failed:\n{}",
errors.join("\n")
))
}
}

pub async fn remove_files(files: HashSet<Utf8PathBuf>, fs: Arc<dyn PackFS>) -> Result<()> {
let tasks = files.into_iter().map(|path| {
let fs = fs.clone();
Expand All @@ -35,7 +85,32 @@ pub async fn remove_files(files: HashSet<Utf8PathBuf>, fs: Arc<dyn PackFS>) -> R
}
}

pub async fn move_temp_files(
pub async fn write_lock(
lock_file: &str,
files: &HashSet<Utf8PathBuf>,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let lock_file = root.join(lock_file);
let mut lock_writer = fs.write_file(&lock_file).await?;
let mut contents = vec![temp_root.to_string()];
contents.extend(files.iter().map(|path| path.to_string()));

lock_writer
.write_all(contents.join("\n").as_bytes())
.await?;
lock_writer.flush().await?;
Ok(())
}

pub async fn remove_lock(lock_file: &str, root: &Utf8Path, fs: Arc<dyn PackFS>) -> Result<()> {
let lock_file = root.join(lock_file);
fs.remove_file(&lock_file).await?;
Ok(())
}

pub async fn move_files(
files: HashSet<Utf8PathBuf>,
root: &Utf8Path,
temp_root: &Utf8Path,
Expand Down Expand Up @@ -75,27 +150,29 @@ pub async fn move_temp_files(
}

if errors.is_empty() {
fs.remove_file(&lock_file).await?;

Ok(())
} else {
Err(error!("move temp files failed:\n{}", errors.join("\n")))
}
}

pub async fn recovery_move_lock(
async fn recovery_lock(
lock: &str,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let lock_file = root.join("move.lock");
) -> Result<Vec<String>> {
let lock_file = root.join(lock);
if !fs.exists(&lock_file).await? {
return Ok(());
return Ok(vec![]);
}
let mut lock_reader = fs.read_file(&lock_file).await?;
let lock_file_content = String::from_utf8(lock_reader.read_to_end().await?)
.map_err(|e| error!("parse utf8 failed: {}", e))?;
let files = lock_file_content.split("\n").collect::<Vec<_>>();
let files = lock_file_content
.split("\n")
.map(|i| i.to_owned())
.collect::<Vec<_>>();
fs.remove_file(&lock_file).await?;

if files.is_empty() {
Expand All @@ -106,8 +183,20 @@ pub async fn recovery_move_lock(
"incomplete storage due to `move.lock` from an unexpected directory"
));
}
move_temp_files(
files[1..]
Ok(files[1..].to_vec())
}

pub async fn recovery_move_lock(
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let moving_files = recovery_lock("move.lock", root, temp_root, fs.clone()).await?;
if moving_files.is_empty() {
return Ok(());
}
move_files(
moving_files
.iter()
.map(Utf8PathBuf::from)
.collect::<HashSet<_>>(),
Expand All @@ -119,6 +208,26 @@ pub async fn recovery_move_lock(
Ok(())
}

pub async fn recovery_remove_lock(
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let removing_files = recovery_lock("remove.lock", root, temp_root, fs.clone()).await?;
if removing_files.is_empty() {
return Ok(());
}
remove_files(
removing_files
.iter()
.map(Utf8PathBuf::from)
.collect::<HashSet<_>>(),
fs,
)
.await?;
Ok(())
}

pub async fn walk_dir(root: &Utf8Path, fs: Arc<dyn PackFS>) -> Result<HashSet<Utf8PathBuf>> {
let mut files = HashSet::default();
let mut stack = vec![root.to_owned()];
Expand Down
5 changes: 4 additions & 1 deletion crates/rspack_storage/src/pack/strategy/split/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ mod write_scope;

use std::{hash::Hasher, sync::Arc};

use handle_file::{clean_root, clean_scopes, clean_versions, recovery_move_lock};
use handle_file::{
clean_root, clean_scopes, clean_versions, recovery_move_lock, recovery_remove_lock,
};
use itertools::Itertools;
use rspack_error::{error, Result};
use rspack_paths::{Utf8Path, Utf8PathBuf};
Expand Down Expand Up @@ -58,6 +60,7 @@ impl SplitPackStrategy {
#[async_trait::async_trait]
impl RootStrategy for SplitPackStrategy {
async fn before_load(&self) -> Result<()> {
recovery_remove_lock(&self.root, &self.temp_root, self.fs.clone()).await?;
recovery_move_lock(&self.root, &self.temp_root, self.fs.clone()).await?;
Ok(())
}
Expand Down
39 changes: 29 additions & 10 deletions crates/rspack_storage/src/pack/strategy/split/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ use crate::pack::data::{Pack, PackContents, PackFileMeta, PackKeys, PackScope};
pub type PackIndexList = Vec<(usize, usize)>;
pub type PackInfoList<'a> = Vec<(&'a PackFileMeta, &'a Pack)>;

pub fn flag_scope_wrote(scope: &mut PackScope) {
let scope_meta = scope.meta.expect_value_mut();
for bucket in scope_meta.packs.iter_mut() {
for pack in bucket {
pack.wrote = true;
}
}
}

pub fn get_indexed_packs<'a>(
scope: &'a PackScope,
filter: Option<&dyn Fn(&'a Pack, &'a PackFileMeta) -> bool>,
Expand Down Expand Up @@ -73,11 +82,15 @@ pub mod test_pack_utils {
use rspack_paths::{AssertUtf8, Utf8Path, Utf8PathBuf};
use rustc_hash::FxHashMap as HashMap;

use super::flag_scope_wrote;
use crate::{
pack::{
data::{current_time, PackOptions, PackScope},
fs::PackFS,
strategy::{ScopeUpdate, ScopeWriteStrategy, SplitPackStrategy, WriteScopeResult},
strategy::{
split::handle_file::prepare_scope, ScopeUpdate, ScopeWriteStrategy, SplitPackStrategy,
WriteScopeResult,
},
},
PackBridgeFS,
};
Expand Down Expand Up @@ -226,15 +239,21 @@ pub mod test_pack_utils {
scope: &mut PackScope,
strategy: &SplitPackStrategy,
) -> Result<WriteScopeResult> {
let mut res = WriteScopeResult::default();
strategy.before_write(scope).await?;
res.extend(strategy.write_packs(scope).await?);
res.extend(strategy.write_meta(scope).await?);
strategy
.after_write(scope, res.wrote_files.clone(), res.removed_files.clone())
.await?;
strategy.after_all(scope)?;
Ok(res)
prepare_scope(
&scope.path,
&strategy.root,
&strategy.temp_root,
strategy.fs.clone(),
)
.await?;

let mut changed = WriteScopeResult::default();
changed.extend(strategy.write_packs(scope).await?);
changed.extend(strategy.write_meta(scope).await?);
strategy.merge_changed(changed.clone()).await?;
flag_scope_wrote(scope);

Ok(changed)
}

pub fn get_native_path(p: &str) -> Utf8PathBuf {
Expand Down
Loading
Loading