From f141e1a7e152fd0f15f36c7e67571bdd5e5c2746 Mon Sep 17 00:00:00 2001 From: jinrui Date: Mon, 6 May 2024 14:51:49 +0800 Subject: [PATCH] perf: optimize execute module (#6415) * feat: task_loop support listen task finish event * perf: optimize execute module * fix: clippy * fix: clean event before run finish module task * fix: self module FinishDeps bug * fix: hmr panic --- .../rspack_binding_values/src/compilation.rs | 13 - .../rspack_core/src/compiler/compilation.rs | 18 +- crates/rspack_core/src/compiler/hmr.rs | 10 +- crates/rspack_core/src/compiler/make/mod.rs | 37 ++- .../src/compiler/make/repair/factorize.rs | 2 +- .../src/compiler/make/repair/mod.rs | 64 +++- .../src/compiler/module_executor/ctrl.rs | 287 ++++++++++++++++++ .../src/compiler/module_executor/entry.rs | 71 +++++ .../execute.rs} | 139 ++++----- .../src/compiler/module_executor/mod.rs | 178 +++++++++++ .../src/compiler/module_executor/overwrite.rs | 92 ++++++ crates/rspack_core/src/utils/task_loop.rs | 34 ++- 12 files changed, 808 insertions(+), 137 deletions(-) create mode 100644 crates/rspack_core/src/compiler/module_executor/ctrl.rs create mode 100644 crates/rspack_core/src/compiler/module_executor/entry.rs rename crates/rspack_core/src/compiler/{module_executor.rs => module_executor/execute.rs} (69%) create mode 100644 crates/rspack_core/src/compiler/module_executor/mod.rs create mode 100644 crates/rspack_core/src/compiler/module_executor/overwrite.rs diff --git a/crates/rspack_binding_values/src/compilation.rs b/crates/rspack_binding_values/src/compilation.rs index 94b7503db83..f8ae66a72fb 100644 --- a/crates/rspack_binding_values/src/compilation.rs +++ b/crates/rspack_binding_values/src/compilation.rs @@ -463,13 +463,6 @@ impl JsCompilation { original_module_context: Option, callback: JsFunction, ) -> Result<()> { - let options = self.0.options.clone(); - let plugin_driver = self.0.plugin_driver.clone(); - let resolver_factory = self.0.resolver_factory.clone(); - let loader_resolver_factory = self.0.loader_resolver_factory.clone(); - let cache = self.0.cache.clone(); - let dependency_factories = self.0.dependency_factories.clone(); - callbackify(env, callback, async { let module_executor = self .0 @@ -478,12 +471,6 @@ impl JsCompilation { .expect("should have module executor"); let result = module_executor .import_module( - options, - plugin_driver, - resolver_factory, - loader_resolver_factory, - cache, - dependency_factories, request, public_path, base_uri, diff --git a/crates/rspack_core/src/compiler/compilation.rs b/crates/rspack_core/src/compiler/compilation.rs index 29366ac2605..2cc3c6d47b5 100644 --- a/crates/rspack_core/src/compiler/compilation.rs +++ b/crates/rspack_core/src/compiler/compilation.rs @@ -609,6 +609,13 @@ impl Compilation { #[instrument(name = "compilation:make", skip_all)] pub async fn make(&mut self, mut params: Vec) -> Result<()> { + // run module_executor + if let Some(module_executor) = &mut self.module_executor { + let mut module_executor = std::mem::take(module_executor); + module_executor.hook_before_make(self, ¶ms).await; + self.module_executor = Some(module_executor); + } + let make_failed_module = MakeParam::ForceBuildModules(std::mem::take(&mut self.make_failed_module)); let make_failed_dependencies = @@ -1042,13 +1049,10 @@ impl Compilation { logger.time_end(start); // sync assets to compilation from module_executor - let assets = self - .module_executor - .as_mut() - .map(|module_executor| std::mem::take(&mut module_executor.assets)) - .unwrap_or_default(); - for (filename, asset) in assets { - self.emit_asset(filename, asset) + if let Some(module_executor) = &mut self.module_executor { + let mut module_executor = std::mem::take(module_executor); + module_executor.hook_before_process_assets(self).await; + self.module_executor = Some(module_executor); } let start = logger.time("process assets"); diff --git a/crates/rspack_core/src/compiler/hmr.rs b/crates/rspack_core/src/compiler/hmr.rs index d6eca817681..ad3868e00ec 100644 --- a/crates/rspack_core/src/compiler/hmr.rs +++ b/crates/rspack_core/src/compiler/hmr.rs @@ -9,7 +9,9 @@ use rspack_sources::Source; use rustc_hash::FxHashSet as HashSet; use super::MakeParam; -use crate::{fast_set, get_chunk_from_ukey, ChunkKind, Compilation, Compiler, RuntimeSpec}; +use crate::{ + fast_set, get_chunk_from_ukey, ChunkKind, Compilation, Compiler, ModuleExecutor, RuntimeSpec, +}; impl Compiler where @@ -73,8 +75,7 @@ where self.loader_resolver_factory.clone(), Some(records), self.cache.clone(), - // reuse module executor - std::mem::take(&mut self.compilation.module_executor), + Some(ModuleExecutor::default()), ); if let Some(state) = self.options.get_incremental_rebuild_make_state() { @@ -118,6 +119,9 @@ where new_compilation.code_splitting_cache = std::mem::take(&mut self.compilation.code_splitting_cache); + // reuse module executor + new_compilation.module_executor = std::mem::take(&mut self.compilation.module_executor); + new_compilation.has_module_import_export_change = false; } diff --git a/crates/rspack_core/src/compiler/make/mod.rs b/crates/rspack_core/src/compiler/make/mod.rs index fa0cc36356f..e59a90ecad9 100644 --- a/crates/rspack_core/src/compiler/make/mod.rs +++ b/crates/rspack_core/src/compiler/make/mod.rs @@ -1,5 +1,5 @@ mod cutout; -mod repair; +pub mod repair; use std::{hash::BuildHasherDefault, path::PathBuf}; @@ -19,16 +19,16 @@ use crate::{ #[derive(Debug, Default)] pub struct MakeArtifact { module_graph_partial: ModuleGraphPartial, - make_failed_dependencies: HashSet, - make_failed_module: HashSet, - diagnostics: Vec, + pub make_failed_dependencies: HashSet, + pub make_failed_module: HashSet, + pub diagnostics: Vec, entry_module_identifiers: IdentifierSet, optimize_analyze_result_map: IdentifierMap, - file_dependencies: IndexSet>, - context_dependencies: IndexSet>, - missing_dependencies: IndexSet>, - build_dependencies: IndexSet>, + pub file_dependencies: IndexSet>, + pub context_dependencies: IndexSet>, + pub missing_dependencies: IndexSet>, + pub build_dependencies: IndexSet>, has_module_graph_change: bool, } @@ -76,7 +76,7 @@ impl MakeArtifact { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum MakeParam { ModifiedFiles(HashSet), DeletedFiles(HashSet), @@ -99,10 +99,8 @@ pub async fn update_module_graph( let mut artifact = MakeArtifact::default(); compilation.swap_make_artifact(&mut artifact); artifact.move_data_from_compilation(compilation); - let mut cutout = Cutout::default(); - let build_dependencies = cutout.cutout_artifact(&mut artifact, params); - artifact = repair(compilation, artifact, build_dependencies)?; - cutout.fix_artifact(&mut artifact); + + artifact = update_module_graph_with_artifact(compilation, artifact, params).await?; // Avoid to introduce too much overhead, // until we find a better way to align with webpack hmr behavior @@ -149,3 +147,16 @@ pub async fn update_module_graph( compilation.swap_make_artifact(&mut artifact); Ok(()) } + +pub async fn update_module_graph_with_artifact( + compilation: &Compilation, + mut artifact: MakeArtifact, + params: Vec, +) -> Result { + let mut cutout = Cutout::default(); + let build_dependencies = cutout.cutout_artifact(&mut artifact, params); + artifact = repair(compilation, artifact, build_dependencies)?; + cutout.fix_artifact(&mut artifact); + + Ok(artifact) +} diff --git a/crates/rspack_core/src/compiler/make/repair/factorize.rs b/crates/rspack_core/src/compiler/make/repair/factorize.rs index fcbe67a6739..497fe759742 100644 --- a/crates/rspack_core/src/compiler/make/repair/factorize.rs +++ b/crates/rspack_core/src/compiler/make/repair/factorize.rs @@ -147,7 +147,7 @@ pub struct ExportsInfoRelated { } #[derive(Debug)] -struct FactorizeResultTask { +pub struct FactorizeResultTask { // pub dependency: DependencyId, pub original_module_identifier: Option, /// Result will be available if [crate::ModuleFactory::create] returns `Ok`. diff --git a/crates/rspack_core/src/compiler/make/repair/mod.rs b/crates/rspack_core/src/compiler/make/repair/mod.rs index e34aa16038f..369f60a3801 100644 --- a/crates/rspack_core/src/compiler/make/repair/mod.rs +++ b/crates/rspack_core/src/compiler/make/repair/mod.rs @@ -1,7 +1,7 @@ -mod add; -mod build; -mod factorize; -mod process_dependencies; +pub mod add; +pub mod build; +pub mod factorize; +pub mod process_dependencies; use std::{hash::BuildHasherDefault, path::PathBuf, sync::Arc}; @@ -21,14 +21,14 @@ use crate::{ NormalModuleSource, ResolverFactory, SharedPluginDriver, }; -struct MakeTaskContext { +pub struct MakeTaskContext { // compilation info - plugin_driver: SharedPluginDriver, - compiler_options: Arc, - resolver_factory: Arc, - loader_resolver_factory: Arc, - cache: Arc, - dependency_factories: HashMap>, + pub plugin_driver: SharedPluginDriver, + pub compiler_options: Arc, + pub resolver_factory: Arc, + pub loader_resolver_factory: Arc, + pub cache: Arc, + pub dependency_factories: HashMap>, // TODO move outof context logger: CompilationLogger, @@ -41,7 +41,7 @@ struct MakeTaskContext { /// Collecting all module that need to skip in tree-shaking ast modification phase // bailout_module_identifiers: IdentifierMap, // TODO change to artifact - module_graph_partial: ModuleGraphPartial, + pub module_graph_partial: ModuleGraphPartial, make_failed_dependencies: HashSet, make_failed_module: HashSet, @@ -56,7 +56,7 @@ struct MakeTaskContext { } impl MakeTaskContext { - fn new(compilation: &Compilation, artifact: MakeArtifact) -> Self { + pub fn new(compilation: &Compilation, artifact: MakeArtifact) -> Self { let logger = compilation.get_logger("rspack.Compilation"); let mut build_cache_counter = None; let mut factorize_cache_counter = None; @@ -97,7 +97,7 @@ impl MakeTaskContext { } } - fn transform_to_make_artifact(self) -> MakeArtifact { + pub fn transform_to_make_artifact(self) -> MakeArtifact { let Self { module_graph_partial, make_failed_dependencies, @@ -137,9 +137,43 @@ impl MakeTaskContext { } // TODO use module graph with make artifact - fn get_module_graph_mut(partial: &mut ModuleGraphPartial) -> ModuleGraph { + pub fn get_module_graph_mut(partial: &mut ModuleGraphPartial) -> ModuleGraph { ModuleGraph::new(vec![], Some(partial)) } + + // TODO remove it after incremental rebuild cover all stage + pub fn transform_to_temp_compilation(&mut self) -> Compilation { + let mut compilation = Compilation::new( + self.compiler_options.clone(), + self.plugin_driver.clone(), + self.resolver_factory.clone(), + self.loader_resolver_factory.clone(), + None, + self.cache.clone(), + None, + ); + compilation.dependency_factories = self.dependency_factories.clone(); + let mut make_artifact = MakeArtifact { + module_graph_partial: std::mem::take(&mut self.module_graph_partial), + file_dependencies: std::mem::take(&mut self.file_dependencies), + context_dependencies: std::mem::take(&mut self.context_dependencies), + missing_dependencies: std::mem::take(&mut self.missing_dependencies), + build_dependencies: std::mem::take(&mut self.build_dependencies), + ..Default::default() + }; + compilation.swap_make_artifact(&mut make_artifact); + compilation + } + + pub fn recovery_from_temp_compilation(&mut self, mut compilation: Compilation) { + let mut make_artifact = Default::default(); + compilation.swap_make_artifact(&mut make_artifact); + self.module_graph_partial = make_artifact.module_graph_partial; + self.file_dependencies = make_artifact.file_dependencies; + self.context_dependencies = make_artifact.context_dependencies; + self.missing_dependencies = make_artifact.missing_dependencies; + self.build_dependencies = make_artifact.build_dependencies; + } } pub fn repair( diff --git a/crates/rspack_core/src/compiler/module_executor/ctrl.rs b/crates/rspack_core/src/compiler/module_executor/ctrl.rs new file mode 100644 index 00000000000..550980dd14b --- /dev/null +++ b/crates/rspack_core/src/compiler/module_executor/ctrl.rs @@ -0,0 +1,287 @@ +use std::collections::VecDeque; + +use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet}; +use tokio::sync::mpsc::{error::TryRecvError, UnboundedReceiver}; + +use super::{ + entry::{EntryParam, EntryTask}, + execute::ExecuteTask, +}; +use crate::{ + compiler::make::repair::MakeTaskContext, + utils::task_loop::{Task, TaskResult, TaskType}, + Dependency, DependencyId, ModuleIdentifier, +}; + +#[derive(Debug)] +struct UnfinishCounter { + is_building: bool, + unfinished_child_module_count: usize, +} + +impl UnfinishCounter { + fn new() -> Self { + UnfinishCounter { + is_building: true, + unfinished_child_module_count: 0, + } + } + + fn set_unfinished_child_module_count(&mut self, count: usize) { + self.is_building = false; + self.unfinished_child_module_count = count; + } + + fn minus_one(&mut self) { + if self.is_building || self.unfinished_child_module_count == 0 { + panic!("UnfinishDepCount Error") + } + self.unfinished_child_module_count -= 1; + } + + fn is_finished(&self) -> bool { + !self.is_building && self.unfinished_child_module_count == 0 + } +} + +// send event can only use in sync task +pub enum Event { + StartBuild(ModuleIdentifier), + // origin_module_identifier and current dependency id and target_module_identifier + FinishDeps( + Option, + DependencyId, + Option, + ), + // current_module_identifier and sub dependency count + FinishModule(ModuleIdentifier, usize), + ExecuteModule(EntryParam, ExecuteTask), + Stop(), +} + +#[derive(Debug)] +pub struct CtrlTask { + pub event_receiver: UnboundedReceiver, + execute_task_map: HashMap, + running_module_map: HashMap, +} + +impl CtrlTask { + pub fn new(event_receiver: UnboundedReceiver) -> Self { + Self { + event_receiver, + execute_task_map: Default::default(), + running_module_map: Default::default(), + } + } +} + +#[async_trait::async_trait] +impl Task for CtrlTask { + fn get_task_type(&self) -> TaskType { + TaskType::Async + } + + async fn async_run(mut self: Box) -> TaskResult { + while let Some(event) = self.event_receiver.recv().await { + match event { + Event::StartBuild(module_identifier) => { + self + .running_module_map + .insert(module_identifier, UnfinishCounter::new()); + } + Event::FinishDeps(origin_module_identifier, dep_id, target_module_graph) => { + if let Some(target_module_graph) = target_module_graph { + if self.running_module_map.contains_key(&target_module_graph) + && Some(target_module_graph) != origin_module_identifier + { + continue; + } + } + + // target module finished + let Some(origin_module_identifier) = origin_module_identifier else { + // origin_module_identifier is none means entry dep + let execute_task = self + .execute_task_map + .remove(&dep_id) + .expect("should have execute task"); + return Ok(vec![Box::new(execute_task), self]); + }; + + let value = self + .running_module_map + .get_mut(&origin_module_identifier) + .expect("should have counter"); + value.minus_one(); + if value.is_finished() { + return Ok(vec![Box::new(FinishModuleTask { + ctrl_task: self, + module_identifier: origin_module_identifier, + })]); + } + } + Event::FinishModule(mid, size) => { + let value = self + .running_module_map + .get_mut(&mid) + .expect("should have counter"); + value.set_unfinished_child_module_count(size); + if value.is_finished() { + return Ok(vec![Box::new(FinishModuleTask { + ctrl_task: self, + module_identifier: mid, + })]); + } + } + Event::ExecuteModule(param, execute_task) => { + let dep_id = match ¶m { + EntryParam::DependencyId(id, _) => *id, + EntryParam::EntryDependency(dep) => *dep.id(), + }; + self.execute_task_map.insert(dep_id, execute_task); + return Ok(vec![Box::new(EntryTask { param }), self]); + } + Event::Stop() => { + return Ok(vec![]); + } + } + } + // if channel has been closed, finish this task + Ok(vec![]) + } +} + +#[derive(Debug)] +struct FinishModuleTask { + ctrl_task: Box, + module_identifier: ModuleIdentifier, +} + +impl Task for FinishModuleTask { + fn get_task_type(&self) -> TaskType { + TaskType::Sync + } + + fn sync_run(self: Box, context: &mut MakeTaskContext) -> TaskResult { + let Self { + mut ctrl_task, + module_identifier, + } = *self; + let mut res: Vec>> = vec![]; + let module_graph = MakeTaskContext::get_module_graph_mut(&mut context.module_graph_partial); + let mut queue = VecDeque::new(); + queue.push_back(module_identifier); + + // clean ctrl task events + loop { + let event = ctrl_task.event_receiver.try_recv(); + let Ok(event) = event else { + if matches!(event, Err(TryRecvError::Empty)) { + break; + } else { + panic!("clean ctrl_task event failed"); + } + }; + + match event { + Event::StartBuild(module_identifier) => { + ctrl_task + .running_module_map + .insert(module_identifier, UnfinishCounter::new()); + } + Event::FinishDeps(origin_module_identifier, dep_id, target_module_graph) => { + if let Some(target_module_graph) = target_module_graph { + if ctrl_task + .running_module_map + .contains_key(&target_module_graph) + && Some(target_module_graph) != origin_module_identifier + { + continue; + } + } + + // target module finished + let Some(origin_module_identifier) = origin_module_identifier else { + // origin_module_identifier is none means entry dep + let execute_task = ctrl_task + .execute_task_map + .remove(&dep_id) + .expect("should have execute task"); + res.push(Box::new(execute_task)); + continue; + }; + + let value = ctrl_task + .running_module_map + .get_mut(&origin_module_identifier) + .expect("should have counter"); + value.minus_one(); + if value.is_finished() { + queue.push_back(origin_module_identifier); + } + } + Event::FinishModule(mid, size) => { + let value = ctrl_task + .running_module_map + .get_mut(&mid) + .expect("should have counter"); + value.set_unfinished_child_module_count(size); + if value.is_finished() { + queue.push_back(mid); + } + } + Event::ExecuteModule(param, execute_task) => { + let dep_id = match ¶m { + EntryParam::DependencyId(id, _) => *id, + EntryParam::EntryDependency(dep) => *dep.id(), + }; + ctrl_task.execute_task_map.insert(dep_id, execute_task); + res.push(Box::new(EntryTask { param })); + } + Event::Stop() => { + return Ok(vec![]); + } + } + } + + while let Some(module_identifier) = queue.pop_front() { + ctrl_task.running_module_map.remove(&module_identifier); + + let mgm = module_graph + .module_graph_module_by_identifier(&module_identifier) + .expect("should have mgm"); + + let mut original_module_identifiers = HashSet::default(); + for connection_id in mgm.incoming_connections() { + let connection = module_graph + .connection_by_connection_id(connection_id) + .expect("should have connection"); + if let Some(original_module_identifier) = &connection.original_module_identifier { + original_module_identifiers.insert(*original_module_identifier); + } else { + // entry + let execute_task = ctrl_task + .execute_task_map + .remove(&connection.dependency_id) + .expect("should have execute task"); + res.push(Box::new(execute_task)); + } + } + + for id in original_module_identifiers { + let value = ctrl_task + .running_module_map + .get_mut(&id) + .expect("should have counter"); + value.minus_one(); + if value.is_finished() { + queue.push_back(id); + } + } + } + + res.push(ctrl_task); + Ok(res) + } +} diff --git a/crates/rspack_core/src/compiler/module_executor/entry.rs b/crates/rspack_core/src/compiler/module_executor/entry.rs new file mode 100644 index 00000000000..b3bc357d358 --- /dev/null +++ b/crates/rspack_core/src/compiler/module_executor/entry.rs @@ -0,0 +1,71 @@ +use tokio::sync::mpsc::UnboundedSender; + +use super::ctrl::Event; +use crate::{ + compiler::make::repair::{factorize::FactorizeTask, MakeTaskContext}, + utils::task_loop::{Task, TaskResult, TaskType}, + Dependency, DependencyId, EntryDependency, ModuleProfile, +}; + +#[derive(Debug)] +pub enum EntryParam { + DependencyId(DependencyId, UnboundedSender), + EntryDependency(Box), +} + +#[derive(Debug)] +pub struct EntryTask { + pub param: EntryParam, +} + +impl Task for EntryTask { + fn get_task_type(&self) -> TaskType { + TaskType::Sync + } + + fn sync_run(self: Box, context: &mut MakeTaskContext) -> TaskResult { + let Self { param } = *self; + let mut module_graph = MakeTaskContext::get_module_graph_mut(&mut context.module_graph_partial); + + match param { + EntryParam::DependencyId(dep_id, sender) => { + if let Some(module_identifier) = module_graph.module_identifier_by_dependency_id(&dep_id) { + sender + .send(Event::FinishDeps(None, dep_id, Some(*module_identifier))) + .expect("should success"); + } else { + // no module_identifier means the factorize task not run, do nothing + } + Ok(vec![]) + } + EntryParam::EntryDependency(dep) => { + let dep_id = *dep.id(); + module_graph.add_dependency(dep.clone()); + Ok(vec![Box::new(FactorizeTask { + module_factory: context + .dependency_factories + .get(dep.dependency_type()) + .expect("should have dependency_factories") + .clone(), + original_module_identifier: None, + original_module_source: None, + issuer: None, + original_module_context: None, + dependency: dep, + dependencies: vec![dep_id], + is_entry: true, + resolve_options: None, + resolver_factory: context.resolver_factory.clone(), + loader_resolver_factory: context.loader_resolver_factory.clone(), + options: context.compiler_options.clone(), + plugin_driver: context.plugin_driver.clone(), + cache: context.cache.clone(), + current_profile: context + .compiler_options + .profile + .then(Box::::default), + })]) + } + } + } +} diff --git a/crates/rspack_core/src/compiler/module_executor.rs b/crates/rspack_core/src/compiler/module_executor/execute.rs similarity index 69% rename from crates/rspack_core/src/compiler/module_executor.rs rename to crates/rspack_core/src/compiler/module_executor/execute.rs index 0990a54e098..7c46627f183 100644 --- a/crates/rspack_core/src/compiler/module_executor.rs +++ b/crates/rspack_core/src/compiler/module_executor/execute.rs @@ -1,20 +1,19 @@ -use std::sync::{atomic::AtomicU32, Arc}; -use std::{hash::BuildHasherDefault, iter::once}; +use std::{iter::once, sync::atomic::AtomicU32}; -use dashmap::DashMap; use rayon::prelude::*; use rspack_error::Result; -use rspack_identifier::{Identifiable, IdentifierSet}; -use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet, FxHasher}; +use rspack_identifier::IdentifierSet; +use rustc_hash::FxHashSet as HashSet; +use tokio::{runtime::Handle, sync::oneshot::Sender}; -use crate::cache::Cache; +// use tokio::sync::Sender use crate::{ + compiler::make::repair::MakeTaskContext, + utils::task_loop::{Task, TaskResult, TaskType}, Chunk, ChunkGraph, ChunkKind, CodeGenerationDataAssetInfo, CodeGenerationDataFilename, - CodeGenerationResult, Dependency, DependencyType, EntryDependency, EntryOptions, Entrypoint, - ModuleFactory, RuntimeSpec, SourceType, + CodeGenerationResult, CompilationAsset, CompilationAssets, DependencyId, EntryOptions, + Entrypoint, RuntimeSpec, SourceType, }; -use crate::{Compilation, CompilationAsset, MakeParam}; -use crate::{CompilerOptions, Context, ResolverFactory, SharedPluginDriver}; static EXECUTE_MODULE_ID: AtomicU32 = AtomicU32::new(0); pub type ExecuteModuleId = u32; @@ -29,65 +28,38 @@ pub struct ExecuteModuleResult { pub id: ExecuteModuleId, } -#[derive(Debug, Default)] -pub struct ModuleExecutor { - pub assets: DashMap, +#[derive(Debug)] +pub struct ExecuteTask { + pub entry_dep_id: DependencyId, + pub public_path: Option, + pub base_uri: Option, + pub result_sender: Sender<(Result, CompilationAssets)>, } -impl ModuleExecutor { - #[allow(clippy::too_many_arguments)] - pub async fn import_module( - &self, - options: Arc, - plugin_driver: SharedPluginDriver, - resolver_factory: Arc, - loader_resolver_factory: Arc, - cache: Arc, - dependency_factories: HashMap>, - - request: String, - public_path: Option, - base_uri: Option, - original_module_context: Option, - ) -> Result { - let mut compilation = Compilation::new( - options, - plugin_driver, - resolver_factory, - loader_resolver_factory, - None, - cache, - None, - ); - compilation.dependency_factories = dependency_factories; +impl Task for ExecuteTask { + fn get_task_type(&self) -> TaskType { + TaskType::Sync + } - let mut mg = compilation.get_module_graph_mut(); - let dep_id = { - let dep = EntryDependency::new( - request, - original_module_context.unwrap_or(Context::from("")), - ); - let dep_id = *dep.id(); - mg.add_dependency(Box::new(dep)); - dep_id - }; + fn sync_run(self: Box, context: &mut MakeTaskContext) -> TaskResult { + let Self { + entry_dep_id, + public_path, + base_uri, + result_sender, + } = *self; - compilation - .make(vec![MakeParam::new_force_build_dep_param(dep_id, None)]) - .await?; + let mut compilation = context.transform_to_temp_compilation(); let id = EXECUTE_MODULE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let mg = compilation.get_module_graph_mut(); - let module = mg - .get_module_by_dependency_id(&dep_id) + let entry_module_identifier = mg + .get_module_by_dependency_id(&entry_dep_id) .expect("should have module") .identifier(); - let mut queue = vec![module]; - let mut modules: std::collections::HashSet< - rspack_identifier::Identifier, - BuildHasherDefault, - > = HashSet::default(); + let mut queue = vec![entry_module_identifier]; + let mut modules = HashSet::default(); while let Some(m) = queue.pop() { modules.insert(m); @@ -137,7 +109,11 @@ impl ModuleExecutor { let chunk = compilation.chunk_by_ukey.add(chunk); let chunk_ukey = chunk.ukey; - chunk_graph.connect_chunk_and_entry_module(chunk.ukey, module, entrypoint.ukey); + chunk_graph.connect_chunk_and_entry_module( + chunk.ukey, + entry_module_identifier, + entrypoint.ukey, + ); entrypoint.connect_chunk(chunk); entrypoint.set_runtime_chunk(chunk.ukey); entrypoint.set_entry_point_chunk(chunk.ukey); @@ -168,14 +144,16 @@ impl ModuleExecutor { compilation.code_generation_modules(&mut None, false, modules.par_iter().copied())?; - compilation - .process_runtime_requirements( - modules.clone(), - once(chunk_ukey), - once(chunk_ukey), - compilation.plugin_driver.clone(), - ) - .await?; + Handle::current().block_on(async { + compilation + .process_runtime_requirements( + modules.clone(), + once(chunk_ukey), + once(chunk_ukey), + compilation.plugin_driver.clone(), + ) + .await + })?; let runtime_modules = compilation .chunk_graph @@ -215,7 +193,12 @@ impl ModuleExecutor { .plugin_driver .compilation_hooks .execute_module - .call(&module, &runtime_modules, &codegen_results, &id); + .call( + &entry_module_identifier, + &runtime_modules, + &codegen_results, + &id, + ); let module_graph = compilation.get_module_graph(); let mut execute_result = match exports { @@ -265,18 +248,14 @@ impl ModuleExecutor { Err(e) => Err(e), }; + let assets = std::mem::take(compilation.assets_mut()); if let Ok(ref mut result) = execute_result { - let assets = std::mem::take(compilation.assets_mut()); - for (key, value) in assets { - result.assets.insert(key.clone()); - self.assets.insert(key, value); - } + result.assets = assets.keys().cloned().collect::>(); } - - for error in compilation.get_errors() { - error.render_report(true)?; - } - - execute_result + context.recovery_from_temp_compilation(compilation); + result_sender + .send((execute_result, assets)) + .expect("should send result success"); + Ok(vec![]) } } diff --git a/crates/rspack_core/src/compiler/module_executor/mod.rs b/crates/rspack_core/src/compiler/module_executor/mod.rs new file mode 100644 index 00000000000..28344df5cc2 --- /dev/null +++ b/crates/rspack_core/src/compiler/module_executor/mod.rs @@ -0,0 +1,178 @@ +mod ctrl; +mod entry; +mod execute; +mod overwrite; + +use dashmap::mapref::entry::Entry; +use dashmap::DashMap; +pub use execute::ExecuteModuleId; +use rspack_error::Result; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedSender}, + oneshot, +}; + +use self::{ + ctrl::{CtrlTask, Event}, + entry::EntryParam, + execute::{ExecuteModuleResult, ExecuteTask}, + overwrite::OverwriteTask, +}; +use super::make::{repair::MakeTaskContext, update_module_graph_with_artifact, MakeArtifact}; +use crate::{ + task_loop::run_task_loop_with_event, Compilation, CompilationAsset, Context, Dependency, + DependencyId, EntryDependency, MakeParam, +}; + +#[derive(Debug, Default)] +pub struct ModuleExecutor { + request_dep_map: DashMap, + make_artifact: MakeArtifact, + + event_sender: Option>, + stop_receiver: Option>, + assets: DashMap, +} + +impl ModuleExecutor { + pub async fn hook_before_make( + &mut self, + compilation: &Compilation, + global_params: &Vec, + ) { + let mut make_artifact = std::mem::take(&mut self.make_artifact); + let mut params = vec![]; + for param in global_params { + if matches!(param, MakeParam::DeletedFiles(_)) { + params.push(param.clone()); + } + if matches!(param, MakeParam::ModifiedFiles(_)) { + params.push(param.clone()); + } + } + if !make_artifact.make_failed_dependencies.is_empty() { + let deps = std::mem::take(&mut make_artifact.make_failed_dependencies); + params.push(MakeParam::ForceBuildDeps(deps)); + } + if !make_artifact.make_failed_module.is_empty() { + let modules = std::mem::take(&mut make_artifact.make_failed_module); + params.push(MakeParam::ForceBuildModules(modules)); + } + + make_artifact = if let Ok(artifact) = + update_module_graph_with_artifact(compilation, make_artifact, params).await + { + artifact + } else { + MakeArtifact::default() + }; + + let mut ctx = MakeTaskContext::new(compilation, make_artifact); + let (event_sender, event_receiver) = unbounded_channel(); + let (stop_sender, stop_receiver) = oneshot::channel(); + self.event_sender = Some(event_sender.clone()); + self.stop_receiver = Some(stop_receiver); + + tokio::spawn(async move { + let _ = run_task_loop_with_event( + &mut ctx, + vec![Box::new(CtrlTask::new(event_receiver))], + |_, task| { + Box::new(OverwriteTask { + origin_task: task, + event_sender: event_sender.clone(), + }) + }, + ); + + stop_sender + .send(ctx.transform_to_make_artifact()) + .expect("should success"); + }); + } + + pub async fn hook_before_process_assets(&mut self, compilation: &mut Compilation) { + let sender = std::mem::take(&mut self.event_sender); + sender + .expect("should have sender") + .send(Event::Stop()) + .expect("should success"); + + let stop_receiver = std::mem::take(&mut self.stop_receiver); + if let Ok(make_artifact) = stop_receiver.expect("should have receiver").await { + self.make_artifact = make_artifact; + } else { + panic!("receive make artifact failed"); + } + + let assets = std::mem::take(&mut self.assets); + for (filename, asset) in assets { + compilation.emit_asset(filename, asset); + } + + let diagnostics = std::mem::take(&mut self.make_artifact.diagnostics); + compilation.push_batch_diagnostic(diagnostics); + + compilation + .file_dependencies + .extend(self.make_artifact.file_dependencies.iter().cloned()); + compilation + .context_dependencies + .extend(self.make_artifact.context_dependencies.iter().cloned()); + compilation + .missing_dependencies + .extend(self.make_artifact.missing_dependencies.iter().cloned()); + compilation + .build_dependencies + .extend(self.make_artifact.build_dependencies.iter().cloned()); + } + + #[allow(clippy::too_many_arguments)] + pub async fn import_module( + &self, + request: String, + public_path: Option, + base_uri: Option, + original_module_context: Option, + ) -> Result { + let sender = self + .event_sender + .as_ref() + .expect("should have event sender"); + let (param, dep_id) = match self.request_dep_map.entry(request.clone()) { + Entry::Vacant(v) => { + let dep = EntryDependency::new( + request.clone(), + original_module_context.unwrap_or(Context::from("")), + ); + let dep_id = *dep.id(); + v.insert(dep_id); + (EntryParam::EntryDependency(Box::new(dep)), dep_id) + } + Entry::Occupied(v) => { + let dep_id = *v.get(); + (EntryParam::DependencyId(dep_id, sender.clone()), dep_id) + } + }; + + let (tx, rx) = oneshot::channel(); + sender + .send(Event::ExecuteModule( + param, + ExecuteTask { + entry_dep_id: dep_id, + public_path, + base_uri, + result_sender: tx, + }, + )) + .expect("should success"); + let (execute_result, assets) = rx.await.expect("should receiver success"); + + for (key, value) in assets { + self.assets.insert(key, value); + } + + execute_result + } +} diff --git a/crates/rspack_core/src/compiler/module_executor/overwrite.rs b/crates/rspack_core/src/compiler/module_executor/overwrite.rs new file mode 100644 index 00000000000..14bb424ac45 --- /dev/null +++ b/crates/rspack_core/src/compiler/module_executor/overwrite.rs @@ -0,0 +1,92 @@ +use tokio::sync::mpsc::UnboundedSender; + +use super::ctrl::Event; +use crate::{ + compiler::make::repair::{ + add::AddTask, factorize::FactorizeResultTask, process_dependencies::ProcessDependenciesTask, + MakeTaskContext, + }, + utils::task_loop::{Task, TaskResult, TaskType}, +}; + +pub struct OverwriteTask { + pub origin_task: Box>, + pub event_sender: UnboundedSender, +} + +#[async_trait::async_trait] +impl Task for OverwriteTask { + fn get_task_type(&self) -> TaskType { + self.origin_task.get_task_type() + } + + fn sync_run(self: Box, context: &mut MakeTaskContext) -> TaskResult { + let Self { + origin_task, + event_sender, + } = *self; + // process dependencies + if let Some(process_dependencies_task) = origin_task + .as_any() + .downcast_ref::() + { + let original_module_identifier = process_dependencies_task.original_module_identifier; + let res = origin_task.sync_run(context)?; + event_sender + .send(Event::FinishModule(original_module_identifier, res.len())) + .expect("should success"); + return Ok(res); + } + + // factorize result task + if let Some(factorize_result_task) = origin_task.as_any().downcast_ref::() + { + let dep_id = factorize_result_task + .dependencies + .first() + .cloned() + .expect("should have dep_id"); + let original_module_identifier = factorize_result_task.original_module_identifier; + let res = origin_task.sync_run(context)?; + if res.is_empty() { + event_sender + .send(Event::FinishDeps(original_module_identifier, dep_id, None)) + .expect("should success"); + } + return Ok(res); + } + // add task + if let Some(add_task) = origin_task.as_any().downcast_ref::() { + let dep_id = add_task + .dependencies + .first() + .cloned() + .expect("should have dep_id"); + let original_module_identifier = add_task.original_module_identifier; + let target_module_identifier = add_task.module.identifier(); + + let res = origin_task.sync_run(context)?; + if res.is_empty() { + event_sender + .send(Event::FinishDeps( + original_module_identifier, + dep_id, + Some(target_module_identifier), + )) + .expect("should success"); + } else { + event_sender + .send(Event::StartBuild(target_module_identifier)) + .expect("should success"); + } + return Ok(res); + } + + // other task + origin_task.sync_run(context) + } + + async fn async_run(self: Box) -> TaskResult { + self.origin_task.async_run().await + } +} diff --git a/crates/rspack_core/src/utils/task_loop.rs b/crates/rspack_core/src/utils/task_loop.rs index d285ef637dd..17357deeb9d 100644 --- a/crates/rspack_core/src/utils/task_loop.rs +++ b/crates/rspack_core/src/utils/task_loop.rs @@ -1,4 +1,5 @@ use std::{ + any::Any, collections::VecDeque, sync::{ atomic::{AtomicBool, Ordering}, @@ -7,7 +8,11 @@ use std::{ }; use rspack_error::Result; -use tokio::sync::mpsc::{self, error::TryRecvError}; +use rspack_util::ext::AsAny; +use tokio::{ + runtime::Handle, + sync::mpsc::{self, error::TryRecvError}, +}; /// Result returned by task /// @@ -26,11 +31,11 @@ pub enum TaskType { /// /// See test for more example #[async_trait::async_trait] -pub trait Task: Send { +pub trait Task: Send + Any + AsAny { /// Return the task type /// - /// return `TaskType::Sync` will run `self::sync_run` - /// return `TaskType::Async` will run `self::async_run` + /// Return `TaskType::Sync` will run `self::sync_run` + /// Return `TaskType::Async` will run `self::async_run` fn get_task_type(&self) -> TaskType; /// Sync task process @@ -50,6 +55,15 @@ pub trait Task: Send { pub fn run_task_loop( ctx: &mut Ctx, init_tasks: Vec>>, +) -> Result<()> { + run_task_loop_with_event(ctx, init_tasks, |_, task| task) +} + +/// Run task loop with event +pub fn run_task_loop_with_event( + ctx: &mut Ctx, + init_tasks: Vec>>, + before_task_run: impl Fn(&mut Ctx, Box>) -> Box>, ) -> Result<()> { // create channel to receive async task result let (tx, mut rx) = mpsc::unbounded_channel::>(); @@ -65,6 +79,7 @@ pub fn run_task_loop( } if let Some(task) = task { + let task = before_task_run(ctx, task); match task.get_task_type() { TaskType::Async => { let tx = tx.clone(); @@ -90,7 +105,16 @@ pub fn run_task_loop( } } - match rx.try_recv() { + let data = if queue.is_empty() && active_task_count != 0 { + Handle::current().block_on(async { + let res = rx.recv().await.expect("should recv success"); + Ok(res) + }) + } else { + rx.try_recv() + }; + + match data { Ok(r) => { active_task_count -= 1; // merge async task result