From 056987fc3e7fe87294aa94bbb40708c06b22900b Mon Sep 17 00:00:00 2001 From: GuiMacielPereira Date: Thu, 7 Nov 2024 17:04:21 +0000 Subject: [PATCH] Create runner class to run routine Put two files in one and using a class simplifies logic (+ much easier to read) --- src/mvesuvio/main/__init__.py | 5 +- src/mvesuvio/main/analysis_runner.py | 46 ------- src/mvesuvio/main/run_routine.py | 173 +++++++++++++++++++++++++++ src/mvesuvio/run_routine.py | 131 -------------------- 4 files changed, 175 insertions(+), 180 deletions(-) delete mode 100644 src/mvesuvio/main/analysis_runner.py create mode 100644 src/mvesuvio/main/run_routine.py delete mode 100644 src/mvesuvio/run_routine.py diff --git a/src/mvesuvio/main/__init__.py b/src/mvesuvio/main/__init__.py index 7a53562..1833d16 100644 --- a/src/mvesuvio/main/__init__.py +++ b/src/mvesuvio/main/__init__.py @@ -87,9 +87,8 @@ def __run_analysis(yes_to_all): environ["MANTIDPROPERTIES"] = path.join( handle_config.VESUVIO_CONFIG_PATH, "Mantid.user.properties" ) - from mvesuvio.main import analysis_runner - - analysis_runner.run(yes_to_all) + from mvesuvio.main.run_routine import Runner + Runner().run() if __name__ == "__main__": diff --git a/src/mvesuvio/main/analysis_runner.py b/src/mvesuvio/main/analysis_runner.py deleted file mode 100644 index 71a70d7..0000000 --- a/src/mvesuvio/main/analysis_runner.py +++ /dev/null @@ -1,46 +0,0 @@ -import time -from pathlib import Path -import importlib -import sys -from mvesuvio.run_routine import runRoutine -from mvesuvio.util import handle_config - - -def run(yes_to_all=False): - inputs_path = Path(handle_config.read_config_var("caching.inputs")) - - ai = import_from_path(inputs_path, "analysis_inputs") - - start_time = time.time() - - wsBackIC = ai.LoadVesuvioBackParameters - wsFrontIC = ai.LoadVesuvioFrontParameters - bckwdIC = ai.BackwardInitialConditions - fwdIC = ai.ForwardInitialConditions - yFitIC = ai.YSpaceFitInitialConditions - userCtr = ai.UserScriptControls - - runRoutine( - userCtr, - wsBackIC, - wsFrontIC, - bckwdIC, - fwdIC, - yFitIC, - yes_to_all, - ) - - end_time = time.time() - print("\nRunning time: ", end_time - start_time, " seconds") - - -def import_from_path(path, name): - spec = importlib.util.spec_from_file_location(name, path) - module = importlib.util.module_from_spec(spec) - sys.modules[name] = module - spec.loader.exec_module(module) - return module - - -if __name__ == "__main__": - run() diff --git a/src/mvesuvio/main/run_routine.py b/src/mvesuvio/main/run_routine.py new file mode 100644 index 0000000..d4de5ca --- /dev/null +++ b/src/mvesuvio/main/run_routine.py @@ -0,0 +1,173 @@ +from mvesuvio.util.process_inputs import ( + completeICFromInputs, + completeYFitIC, +) +from mvesuvio.analysis_fitting import fitInYSpaceProcedure +from mvesuvio.analysis_routines import ( + runIndependentIterativeProcedure, + runJointBackAndForwardProcedure, + runPreProcToEstHRatio, +) +from mvesuvio.util import handle_config + +from mantid.api import mtd +import numpy as np + +import time +from pathlib import Path +import importlib +import sys + + +class Runner: + def __init__(self, yes_to_all=False, running_tests=False) -> None: + self.yes_to_all = yes_to_all + self.running_tests = running_tests + self.inputs_path = Path(handle_config.read_config_var("caching.inputs")) + self.setup() + + def setup(self): + + ai = self.import_from_inputs() + + self.wsBackIC = ai.LoadVesuvioBackParameters + self.wsFrontIC = ai.LoadVesuvioFrontParameters + self.bckwdIC = ai.BackwardInitialConditions + self.fwdIC = ai.ForwardInitialConditions + self.yFitIC = ai.YSpaceFitInitialConditions + self.userCtr = ai.UserScriptControls + + # Set extra attributes from user attributes + completeICFromInputs(self.fwdIC, self.wsFrontIC) + completeICFromInputs(self.bckwdIC, self.wsBackIC) + completeYFitIC(self.yFitIC) + checkInputs(self.userCtr) + + # Names of workspaces to check if they exist to do fitting + self.ws_to_fit_y_space = [] + self.classes_to_fit_y_space = [] + for mode, i_cls in zip(["BACKWARD", "FORWARD"], [self.bckwdIC, self.fwdIC]): + if (self.userCtr.fitInYSpace == mode) | (self.userCtr.fitInYSpace == "JOINT"): + self.ws_to_fit_y_space.append(i_cls.name + '_' + str(i_cls.noOfMSIterations)) + self.classes_to_fit_y_space.append(i_cls) + + self.analysis_result = None + self.fitting_result = None + + + def import_from_inputs(self): + name = "analysis_inputs" + spec = importlib.util.spec_from_file_location(name, self.inputs_path) + module = importlib.util.module_from_spec(spec) + sys.modules[name] = module + spec.loader.exec_module(module) + return module + + + def run(self): + start_time = time.time() + + if not self.userCtr.runRoutine: + return + # Default workflow for procedure + fit in y space + + # If any ws for y fit already loaded + wsInMtd = [ws in mtd for ws in self.ws_to_fit_y_space] # Bool list + if (len(wsInMtd) > 0) and all(wsInMtd): + self.runAnalysisFitting() + return + + checkUserClearWS(self.yes_to_all) # Check if user is OK with cleaning all workspaces + + self.runAnalysisRoutine() + self.runAnalysisFitting() + + end_time = time.time() + print("\nRunning time: ", end_time - start_time, " seconds") + + return self.analysis_result, self.fitting_result # Return results used only in tests + + + def runAnalysisFitting(self): + for wsName, i_cls in zip(self.ws_to_fit_y_space, self.classes_to_fit_y_space): + resYFit = fitInYSpaceProcedure(self.yFitIC, i_cls, wsName) + self.fitting_result = resYFit + + + def runAnalysisRoutine(self): + routine_type = self.userCtr.procedure + + if routine_type is None: + return + + if (routine_type == "BACKWARD") | (routine_type== "JOINT"): + + if isHPresent(self.fwdIC.masses) & (self.bckwdIC.HToMassIdxRatio==0): + runPreProcToEstHRatio(self.bckwdIC, self.fwdIC) + return + + assert isHPresent(self.fwdIC.masses) != ( + self.bckwdIC.HToMassIdxRatio==0 + ), "When H is not present, HToMassIdxRatio has to be set to None" + + if routine_type == "BACKWARD": + res = runIndependentIterativeProcedure(self.bckwdIC, running_tests=self.running_tests) + if routine_type == "FORWARD": + res = runIndependentIterativeProcedure(self.fwdIC, running_tests=self.running_tests) + if routine_type == "JOINT": + res = runJointBackAndForwardProcedure(self.bckwdIC, self.fwdIC) + + self.analysis_result = res + return + + +def checkUserClearWS(yes_to_all=False): + """If any workspace is loaded, check if user is sure to start new procedure.""" + + if not yes_to_all and len(mtd) != 0: + userInput = input( + "This action will clean all current workspaces to start anew. Proceed? (y/n): " + ) + if (userInput == "y") | (userInput == "Y"): + pass + else: + raise KeyboardInterrupt("Run of procedure canceled.") + return + + +def checkInputs(crtIC): + try: + if ~crtIC.runRoutine: + return + except AttributeError: + if ~crtIC.runBootstrap: + return + + for flag in [crtIC.procedure, crtIC.fitInYSpace]: + assert ( + (flag == "BACKWARD") + | (flag == "FORWARD") + | (flag == "JOINT") + | (flag is None) + ), "Option not recognized." + + if (crtIC.procedure != "JOINT") & (crtIC.fitInYSpace is not None): + assert crtIC.procedure == crtIC.fitInYSpace + + +def isHPresent(masses) -> bool: + Hmask = np.abs(masses - 1) / 1 < 0.1 # H mass whithin 10% of 1 au + + if np.any(Hmask): # H present + print("\nH mass detected.\n") + assert ( + len(Hmask) > 1 + ), "When H is only mass present, run independent forward procedure, not joint." + assert Hmask[0], "H mass needs to be the first mass in masses and initPars." + assert sum(Hmask) == 1, "More than one mass very close to H were detected." + return True + else: + return False + +if __name__ == "__main__": + Runner().run() diff --git a/src/mvesuvio/run_routine.py b/src/mvesuvio/run_routine.py deleted file mode 100644 index 9b4809b..0000000 --- a/src/mvesuvio/run_routine.py +++ /dev/null @@ -1,131 +0,0 @@ -from mvesuvio.util.process_inputs import ( - buildFinalWSName, - completeICFromInputs, - completeYFitIC, -) -from mvesuvio.analysis_fitting import fitInYSpaceProcedure -from mvesuvio.analysis_routines import ( - runIndependentIterativeProcedure, - runJointBackAndForwardProcedure, - runPreProcToEstHRatio, -) - -from mantid.api import mtd -import numpy as np - -def runRoutine( - userCtr, - wsBackIC, - wsFrontIC, - bckwdIC, - fwdIC, - yFitIC, - yes_to_all=False, - running_tests=False -): - # Set extra attributes from user attributes - completeICFromInputs(fwdIC, wsFrontIC) - completeICFromInputs(bckwdIC, wsBackIC) - completeYFitIC(yFitIC) - checkInputs(userCtr) - - def runProcedure(): - proc = userCtr.procedure # Shorthad to make it easier to read - - if proc is None: - return - - if (proc == "BACKWARD") | (proc == "JOINT"): - - if isHPresent(fwdIC.masses) & (bckwdIC.HToMassIdxRatio==0): - runPreProcToEstHRatio(bckwdIC, fwdIC) - return - - assert isHPresent(fwdIC.masses) != ( - bckwdIC.HToMassIdxRatio==0 - ), "When H is not present, HToMassIdxRatio has to be set to None" - - if proc == "BACKWARD": - res = runIndependentIterativeProcedure(bckwdIC, running_tests=running_tests) - if proc == "FORWARD": - res = runIndependentIterativeProcedure(fwdIC, running_tests=running_tests) - if proc == "JOINT": - res = runJointBackAndForwardProcedure(bckwdIC, fwdIC) - return res - - # Names of workspaces to be fitted in y space - wsNames = [] - ICs = [] - for mode, IC in zip(["BACKWARD", "FORWARD"], [bckwdIC, fwdIC]): - if (userCtr.fitInYSpace == mode) | (userCtr.fitInYSpace == "JOINT"): - wsNames.append(IC.name + '_' + str(IC.noOfMSIterations)) - ICs.append(IC) - - # Default workflow for procedure + fit in y space - if userCtr.runRoutine: - # Check if final ws are loaded: - wsInMtd = [ws in mtd for ws in wsNames] # Bool list - if (len(wsInMtd) > 0) and all( - wsInMtd - ): # When wsName is empty list, loop doesn't run - for wsName, IC in zip(wsNames, ICs): - resYFit = fitInYSpaceProcedure(yFitIC, IC, wsName) - return None, resYFit # To match return below. - - checkUserClearWS(yes_to_all) # Check if user is OK with cleaning all workspaces - res = runProcedure() - - resYFit = None - for wsName, IC in zip(wsNames, ICs): - resYFit = fitInYSpaceProcedure(yFitIC, IC, wsName) - - return res, resYFit # Return results used only in tests - - -def checkUserClearWS(yes_to_all=False): - """If any workspace is loaded, check if user is sure to start new procedure.""" - - if not yes_to_all and len(mtd) != 0: - userInput = input( - "This action will clean all current workspaces to start anew. Proceed? (y/n): " - ) - if (userInput == "y") | (userInput == "Y"): - pass - else: - raise KeyboardInterrupt("Run of procedure canceled.") - return - - -def checkInputs(crtIC): - try: - if ~crtIC.runRoutine: - return - except AttributeError: - if ~crtIC.runBootstrap: - return - - for flag in [crtIC.procedure, crtIC.fitInYSpace]: - assert ( - (flag == "BACKWARD") - | (flag == "FORWARD") - | (flag == "JOINT") - | (flag is None) - ), "Option not recognized." - - if (crtIC.procedure != "JOINT") & (crtIC.fitInYSpace is not None): - assert crtIC.procedure == crtIC.fitInYSpace - - -def isHPresent(masses) -> bool: - Hmask = np.abs(masses - 1) / 1 < 0.1 # H mass whithin 10% of 1 au - - if np.any(Hmask): # H present - print("\nH mass detected.\n") - assert ( - len(Hmask) > 1 - ), "When H is only mass present, run independent forward procedure, not joint." - assert Hmask[0], "H mass needs to be the first mass in masses and initPars." - assert sum(Hmask) == 1, "More than one mass very close to H were detected." - return True - else: - return False