Skip to content

Commit

Permalink
fix: duplicate move lock files (#8732)
Browse files Browse the repository at this point in the history
  • Loading branch information
LingyuCoder authored Dec 17, 2024
1 parent 36094e2 commit da44eab
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 110 deletions.
47 changes: 8 additions & 39 deletions crates/rspack_storage/src/pack/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,21 +230,9 @@ async fn save_scopes(
) -> Result<ScopeMap> {
scopes.retain(|_, scope| scope.loaded());

for (_, scope) in scopes.iter_mut() {
strategy.before_all(scope)?;
}
strategy.before_all(&mut scopes).await?;

join_all(
scopes
.values()
.map(|scope| async move { strategy.before_write(scope).await })
.collect_vec(),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

let wrote_results = join_all(
let changed = join_all(
scopes
.values_mut()
.map(|scope| async move {
Expand All @@ -261,33 +249,14 @@ async fn save_scopes(
.into_iter()
.collect::<Result<Vec<WriteScopeResult>>>()?
.into_iter()
.collect_vec();
.fold(WriteScopeResult::default(), |mut acc, res| {
acc.extend(res);
acc
});

strategy.write_root_meta(root_meta).await?;

join_all(
scopes
.values()
.zip(wrote_results)
.map(|(scope, scope_wrote_result)| async move {
strategy
.after_write(
scope,
scope_wrote_result.wrote_files,
scope_wrote_result.removed_files,
)
.await
})
.collect_vec(),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

for (_, scope) in scopes.iter_mut() {
strategy.after_all(scope)?;
}

strategy.merge_changed(changed).await?;
strategy.after_all(&mut scopes).await?;
strategy
.clean_unused(root_meta, &scopes, root_options)
.await?;
Expand Down
14 changes: 4 additions & 10 deletions crates/rspack_storage/src/pack/strategy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub trait ScopeValidateStrategy {
async fn validate_packs(&self, scope: &mut PackScope) -> Result<ValidateResult>;
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct WriteScopeResult {
pub wrote_files: HashSet<Utf8PathBuf>,
pub removed_files: HashSet<Utf8PathBuf>,
Expand All @@ -154,15 +154,9 @@ pub type ScopeUpdate = HashMap<StorageItemKey, Option<StorageItemValue>>;
#[async_trait]
pub trait ScopeWriteStrategy {
fn update_scope(&self, scope: &mut PackScope, updates: ScopeUpdate) -> Result<()>;
fn before_all(&self, scope: &mut PackScope) -> Result<()>;
async fn before_write(&self, scope: &PackScope) -> Result<()>;
async fn before_all(&self, scopes: &mut HashMap<String, PackScope>) -> Result<()>;
async fn write_packs(&self, scope: &mut PackScope) -> Result<WriteScopeResult>;
async fn write_meta(&self, scope: &mut PackScope) -> Result<WriteScopeResult>;
async fn after_write(
&self,
scope: &PackScope,
wrote_files: HashSet<Utf8PathBuf>,
removed_files: HashSet<Utf8PathBuf>,
) -> Result<()>;
fn after_all(&self, scope: &mut PackScope) -> Result<()>;
async fn merge_changed(&self, changed: WriteScopeResult) -> Result<()>;
async fn after_all(&self, scopes: &mut HashMap<String, PackScope>) -> Result<()>;
}
129 changes: 119 additions & 10 deletions crates/rspack_storage/src/pack/strategy/split/handle_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,56 @@ use crate::{
PackFS,
};

pub async fn prepare_scope(
scope_path: &Utf8Path,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let temp_path = redirect_to_path(scope_path, root, temp_root)?;
fs.remove_dir(&temp_path).await?;
fs.ensure_dir(&temp_path).await?;
fs.ensure_dir(scope_path).await?;
Ok(())
}

pub async fn prepare_scope_dirs(
scopes: &HashMap<String, PackScope>,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let tasks = scopes.values().map(|scope| {
let fs = fs.clone();
let scope_path = scope.path.clone();
let root_path = root.to_path_buf();
let temp_root_path = temp_root.to_path_buf();
tokio::spawn(async move { prepare_scope(&scope_path, &root_path, &temp_root_path, fs).await })
.map_err(|e| error!("{e}"))
});

let res = join_all(tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

let mut errors = vec![];
for task_result in res {
if let Err(e) = task_result {
errors.push(format!("- {}", e));
}
}

if errors.is_empty() {
Ok(())
} else {
Err(error!(
"prepare scopes directories failed:\n{}",
errors.join("\n")
))
}
}

pub async fn remove_files(files: HashSet<Utf8PathBuf>, fs: Arc<dyn PackFS>) -> Result<()> {
let tasks = files.into_iter().map(|path| {
let fs = fs.clone();
Expand All @@ -35,7 +85,32 @@ pub async fn remove_files(files: HashSet<Utf8PathBuf>, fs: Arc<dyn PackFS>) -> R
}
}

pub async fn move_temp_files(
pub async fn write_lock(
lock_file: &str,
files: &HashSet<Utf8PathBuf>,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let lock_file = root.join(lock_file);
let mut lock_writer = fs.write_file(&lock_file).await?;
let mut contents = vec![temp_root.to_string()];
contents.extend(files.iter().map(|path| path.to_string()));

lock_writer
.write_all(contents.join("\n").as_bytes())
.await?;
lock_writer.flush().await?;
Ok(())
}

pub async fn remove_lock(lock_file: &str, root: &Utf8Path, fs: Arc<dyn PackFS>) -> Result<()> {
let lock_file = root.join(lock_file);
fs.remove_file(&lock_file).await?;
Ok(())
}

pub async fn move_files(
files: HashSet<Utf8PathBuf>,
root: &Utf8Path,
temp_root: &Utf8Path,
Expand Down Expand Up @@ -75,27 +150,29 @@ pub async fn move_temp_files(
}

if errors.is_empty() {
fs.remove_file(&lock_file).await?;

Ok(())
} else {
Err(error!("move temp files failed:\n{}", errors.join("\n")))
}
}

pub async fn recovery_move_lock(
async fn recovery_lock(
lock: &str,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let lock_file = root.join("move.lock");
) -> Result<Vec<String>> {
let lock_file = root.join(lock);
if !fs.exists(&lock_file).await? {
return Ok(());
return Ok(vec![]);
}
let mut lock_reader = fs.read_file(&lock_file).await?;
let lock_file_content = String::from_utf8(lock_reader.read_to_end().await?)
.map_err(|e| error!("parse utf8 failed: {}", e))?;
let files = lock_file_content.split("\n").collect::<Vec<_>>();
let files = lock_file_content
.split("\n")
.map(|i| i.to_owned())
.collect::<Vec<_>>();
fs.remove_file(&lock_file).await?;

if files.is_empty() {
Expand All @@ -106,8 +183,20 @@ pub async fn recovery_move_lock(
"incomplete storage due to `move.lock` from an unexpected directory"
));
}
move_temp_files(
files[1..]
Ok(files[1..].to_vec())
}

pub async fn recovery_move_lock(
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let moving_files = recovery_lock("move.lock", root, temp_root, fs.clone()).await?;
if moving_files.is_empty() {
return Ok(());
}
move_files(
moving_files
.iter()
.map(Utf8PathBuf::from)
.collect::<HashSet<_>>(),
Expand All @@ -119,6 +208,26 @@ pub async fn recovery_move_lock(
Ok(())
}

pub async fn recovery_remove_lock(
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let removing_files = recovery_lock("remove.lock", root, temp_root, fs.clone()).await?;
if removing_files.is_empty() {
return Ok(());
}
remove_files(
removing_files
.iter()
.map(Utf8PathBuf::from)
.collect::<HashSet<_>>(),
fs,
)
.await?;
Ok(())
}

pub async fn walk_dir(root: &Utf8Path, fs: Arc<dyn PackFS>) -> Result<HashSet<Utf8PathBuf>> {
let mut files = HashSet::default();
let mut stack = vec![root.to_owned()];
Expand Down
5 changes: 4 additions & 1 deletion crates/rspack_storage/src/pack/strategy/split/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ mod write_scope;

use std::{hash::Hasher, sync::Arc};

use handle_file::{clean_root, clean_scopes, clean_versions, recovery_move_lock};
use handle_file::{
clean_root, clean_scopes, clean_versions, recovery_move_lock, recovery_remove_lock,
};
use itertools::Itertools;
use rspack_error::{error, Result};
use rspack_paths::{Utf8Path, Utf8PathBuf};
Expand Down Expand Up @@ -58,6 +60,7 @@ impl SplitPackStrategy {
#[async_trait::async_trait]
impl RootStrategy for SplitPackStrategy {
async fn before_load(&self) -> Result<()> {
recovery_remove_lock(&self.root, &self.temp_root, self.fs.clone()).await?;
recovery_move_lock(&self.root, &self.temp_root, self.fs.clone()).await?;
Ok(())
}
Expand Down
39 changes: 29 additions & 10 deletions crates/rspack_storage/src/pack/strategy/split/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ use crate::pack::data::{Pack, PackContents, PackFileMeta, PackKeys, PackScope};
pub type PackIndexList = Vec<(usize, usize)>;
pub type PackInfoList<'a> = Vec<(&'a PackFileMeta, &'a Pack)>;

pub fn flag_scope_wrote(scope: &mut PackScope) {
let scope_meta = scope.meta.expect_value_mut();
for bucket in scope_meta.packs.iter_mut() {
for pack in bucket {
pack.wrote = true;
}
}
}

pub fn get_indexed_packs<'a>(
scope: &'a PackScope,
filter: Option<&dyn Fn(&'a Pack, &'a PackFileMeta) -> bool>,
Expand Down Expand Up @@ -73,11 +82,15 @@ pub mod test_pack_utils {
use rspack_paths::{AssertUtf8, Utf8Path, Utf8PathBuf};
use rustc_hash::FxHashMap as HashMap;

use super::flag_scope_wrote;
use crate::{
pack::{
data::{current_time, PackOptions, PackScope},
fs::PackFS,
strategy::{ScopeUpdate, ScopeWriteStrategy, SplitPackStrategy, WriteScopeResult},
strategy::{
split::handle_file::prepare_scope, ScopeUpdate, ScopeWriteStrategy, SplitPackStrategy,
WriteScopeResult,
},
},
PackBridgeFS,
};
Expand Down Expand Up @@ -226,15 +239,21 @@ pub mod test_pack_utils {
scope: &mut PackScope,
strategy: &SplitPackStrategy,
) -> Result<WriteScopeResult> {
let mut res = WriteScopeResult::default();
strategy.before_write(scope).await?;
res.extend(strategy.write_packs(scope).await?);
res.extend(strategy.write_meta(scope).await?);
strategy
.after_write(scope, res.wrote_files.clone(), res.removed_files.clone())
.await?;
strategy.after_all(scope)?;
Ok(res)
prepare_scope(
&scope.path,
&strategy.root,
&strategy.temp_root,
strategy.fs.clone(),
)
.await?;

let mut changed = WriteScopeResult::default();
changed.extend(strategy.write_packs(scope).await?);
changed.extend(strategy.write_meta(scope).await?);
strategy.merge_changed(changed.clone()).await?;
flag_scope_wrote(scope);

Ok(changed)
}

pub fn get_native_path(p: &str) -> Utf8PathBuf {
Expand Down
Loading

2 comments on commit da44eab

@rspack-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Ran ecosystem CI: Open

suite result
modernjs ❌ failure
_selftest ✅ success
rsdoctor ❌ failure
rspress ✅ success
rslib ✅ success
rsbuild ❌ failure
examples ❌ failure
devserver ✅ success
nuxt ✅ success

@rspack-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Benchmark detail: Open

Name Base (2024-12-17 fb2869d) Current Change
10000_big_production-mode_disable-minimize + exec 37.9 s ± 622 ms 38.2 s ± 795 ms +0.78 %
10000_development-mode + exec 1.87 s ± 24 ms 1.84 s ± 24 ms -1.97 %
10000_development-mode_hmr + exec 687 ms ± 28 ms 676 ms ± 5.1 ms -1.68 %
10000_production-mode + exec 2.47 s ± 48 ms 2.39 s ± 81 ms -3.18 %
arco-pro_development-mode + exec 1.77 s ± 90 ms 1.79 s ± 122 ms +1.43 %
arco-pro_development-mode_hmr + exec 379 ms ± 1.6 ms 378 ms ± 3.6 ms -0.17 %
arco-pro_production-mode + exec 3.31 s ± 75 ms 3.29 s ± 89 ms -0.67 %
arco-pro_production-mode_generate-package-json-webpack-plugin + exec 3.37 s ± 152 ms 3.35 s ± 85 ms -0.79 %
arco-pro_production-mode_traverse-chunk-modules + exec 3.34 s ± 124 ms 3.31 s ± 53 ms -1.12 %
threejs_development-mode_10x + exec 1.61 s ± 20 ms 1.6 s ± 19 ms -0.55 %
threejs_development-mode_10x_hmr + exec 794 ms ± 28 ms 793 ms ± 25 ms -0.08 %
threejs_production-mode_10x + exec 5.42 s ± 141 ms 5.39 s ± 110 ms -0.49 %
10000_big_production-mode_disable-minimize + rss memory 9482 MiB ± 276 MiB 9554 MiB ± 391 MiB +0.76 %
10000_development-mode + rss memory 633 MiB ± 14.8 MiB 685 MiB ± 19.1 MiB +8.21 %
10000_development-mode_hmr + rss memory 1396 MiB ± 296 MiB 1424 MiB ± 389 MiB +2.02 %
10000_production-mode + rss memory 601 MiB ± 31.9 MiB 659 MiB ± 32.8 MiB +9.66 %
arco-pro_development-mode + rss memory 587 MiB ± 40.3 MiB 613 MiB ± 30 MiB +4.40 %
arco-pro_development-mode_hmr + rss memory 639 MiB ± 102 MiB 626 MiB ± 87.6 MiB -2.03 %
arco-pro_production-mode + rss memory 742 MiB ± 50.5 MiB 740 MiB ± 36.1 MiB -0.29 %
arco-pro_production-mode_generate-package-json-webpack-plugin + rss memory 737 MiB ± 45.5 MiB 764 MiB ± 65.7 MiB +3.77 %
arco-pro_production-mode_traverse-chunk-modules + rss memory 720 MiB ± 58.3 MiB 769 MiB ± 58.5 MiB +6.78 %
threejs_development-mode_10x + rss memory 638 MiB ± 22.2 MiB 686 MiB ± 49.9 MiB +7.61 %
threejs_development-mode_10x_hmr + rss memory 1194 MiB ± 248 MiB 1125 MiB ± 235 MiB -5.78 %
threejs_production-mode_10x + rss memory 948 MiB ± 55 MiB 986 MiB ± 74.6 MiB +4.02 %

Please sign in to comment.