diff --git a/CADETPythonSimulator/__init__.py b/CADETPythonSimulator/__init__.py index 400bea1..d0bf1da 100644 --- a/CADETPythonSimulator/__init__.py +++ b/CADETPythonSimulator/__init__.py @@ -1,3 +1,3 @@ -# Version information +"""Version information.""" name = "CADET-Python-Simulator" __version__ = "0.0.1" diff --git a/CADETPythonSimulator/cake_compressibility.py b/CADETPythonSimulator/cake_compressibility.py index 0ae5e84..6a6c493 100644 --- a/CADETPythonSimulator/cake_compressibility.py +++ b/CADETPythonSimulator/cake_compressibility.py @@ -21,6 +21,7 @@ def specific_resistance(self, delta_p: float) -> float: ------- float Specific pressure difference. + """ return @@ -33,6 +34,7 @@ class NoCakeCompressibility(CakeCompressibilityBase): ---------- cake_resistance: float Constant value of cake resistance factor. + """ cake_resistance = UnsignedFloat() @@ -50,6 +52,7 @@ def specific_resistance(self, delta_p: float) -> float: ------- float Specific pressure difference. + """ return self.cake_resistance @@ -64,6 +67,7 @@ class LinearCakeCompressibility(CakeCompressibilityBase): Base value of cake resistance factor. cake_resistance_linear: float Slope of cake resistance factor. + """ cake_resistance_base = UnsignedFloat() @@ -82,5 +86,6 @@ def specific_resistance(self, delta_p: float) -> float: ------- float Specific pressure difference. + """ return self.cake_resistance_base + self.cake_resistance_linear * delta_p diff --git a/CADETPythonSimulator/componentsystem.py b/CADETPythonSimulator/componentsystem.py index 383121e..1aba402 100644 --- a/CADETPythonSimulator/componentsystem.py +++ b/CADETPythonSimulator/componentsystem.py @@ -5,6 +5,7 @@ from CADETPythonSimulator.exception import CADETPythonSimError from functools import wraps + class CPSSpecies(Structure): """ Species class. @@ -34,6 +35,7 @@ class CPSSpecies(Structure): density = UnsignedFloat() molecular_volume = UnsignedFloat() + class CPSComponent(Component): """ Information about single component. @@ -100,6 +102,7 @@ def __init__(self, raise CADETPythonSimError("Could not determine number of species") def add_species(self, species, *args, **kwargs): + """Add a species to the component System.""" if not isinstance(species, CPSSpecies): species = CPSSpecies(species, *args, **kwargs) self._species.append(species) @@ -119,6 +122,7 @@ def molecular_weight(self): """List of float or None: The molecular weights of the subspecies.""" return [spec.molecular_weight for spec in self.species] + class CPSComponentSystem(ComponentSystem): """ Component System Class. diff --git a/CADETPythonSimulator/coupling_interface.py b/CADETPythonSimulator/coupling_interface.py index f8039ec..3961db0 100644 --- a/CADETPythonSimulator/coupling_interface.py +++ b/CADETPythonSimulator/coupling_interface.py @@ -15,7 +15,7 @@ def get_coupled_state( """Calculate new state for destination_unit.""" -class AverageCoupling(CouplingInterface): +class WeightedAverageCoupling(CouplingInterface): """Implements the Coupling Interface for average Coupling.""" def get_coupled_state(self, @@ -26,7 +26,7 @@ def get_coupled_state(self, ret = np.zeros(origin_list[0][0][state].shape) rate_tot = 0 for list_state, rate in origin_list: - ret += list_state[state]*rate + ret += list_state[state] * rate rate_tot += rate ret /= rate_tot diff --git a/CADETPythonSimulator/residual.py b/CADETPythonSimulator/residual.py index 7dac7e5..1adcfab 100644 --- a/CADETPythonSimulator/residual.py +++ b/CADETPythonSimulator/residual.py @@ -36,7 +36,6 @@ def calculate_residual_volume_cstr( return V_dot - Q_in + Q_out - def calculate_residual_concentration_cstr( c: np.ndarray, c_dot: np.ndarray, @@ -72,14 +71,12 @@ def calculate_residual_concentration_cstr( return c_dot * V + V_dot * c - Q_in * c_in + Q_out * c - def calculate_residual_visc_cstr(): """Calculate the residual of the Viscosity equation of the CSTR.""" warnings.warn("Viscosity of CSTR not yet implemented") return 0 - def calculate_residual_cake_vol_def( V_dot_f: float, rejection: np.ndarray, @@ -106,7 +103,6 @@ def calculate_residual_cake_vol_def( """ return -V_dot_C + np.sum(rejection * molar_volume * c_in * V_dot_f) - def calculate_residual_press_easy_def( V_dot_Perm: float, V_C: float, @@ -144,8 +140,6 @@ def calculate_residual_press_easy_def( return -V_dot_Perm + deltap * A *hyd_resistance - - def calculate_residual_visc_def(): """Calculate the residual of the Viscosity equation of the CSTR.""" warnings.warn("Viscosity of def not yet implemented") diff --git a/CADETPythonSimulator/solver.py b/CADETPythonSimulator/solver.py index b3e4209..1d86156 100644 --- a/CADETPythonSimulator/solver.py +++ b/CADETPythonSimulator/solver.py @@ -20,10 +20,9 @@ def __init__(self, system: SystemBase, sections: list[dict]): self._system: Optional[SystemBase] = system self._setup_sections(sections) - def _setup_sections(self, sections): """ - Check Connections. + Set up sections. Converts dict sections into addict.Dicts. Checks if start and end times are continusly @@ -41,7 +40,6 @@ def _setup_sections(self, sections): previous_end = section.end self.sections = sections - def initialize_solver(self, solver: str = 'ida') -> NoReturn: """ Initialize solver. @@ -63,7 +61,7 @@ def system(self) -> SystemBase: return self._system @system.setter - def system(self, system:SystemBase) -> NoReturn: + def system(self, system: SystemBase) -> NoReturn: """Setter for System.""" self._system = system @@ -146,8 +144,8 @@ def write_solution( for sol_tuple in state_dict.values(): itp = it + sol_tuple["values"].shape[1] - y = y_history[:,it:itp] - ydot = y_dot_history[:,it:itp] + y = y_history[:, it:itp] + ydot = y_dot_history[:, it:itp] sol_tuple["values"] = np.concatenate((sol_tuple["values"], y)) sol_tuple["derivatives"] = np.concatenate(( @@ -221,18 +219,16 @@ def get_section_solution_times(self, section: Dict) -> np.ndarray: Parameters ---------- section : Dict - Section Dict that describes the Parameters + Section dict that describes the Parameters """ - # TODO: How to get section_solution_times from section.start, section.end, if + # TODO: How to get section_solution_times from section.start, section.end, if # user_solution times are provided? time_resolution = section.get("time_resolution", 1.0) start = section.start end = section.end return np.arange(start, end, time_resolution) - - def _update_unit_operation_parameters( self, start: float, @@ -240,8 +236,8 @@ def _update_unit_operation_parameters( unit_operation_parameters: dict[ UnitOperationBase | str, dict[str, npt.ArrayLike] - ] - ) -> np.ndarray: + ] + ) -> np.ndarray: """ Update time dependent unit operation parameters. @@ -266,27 +262,3 @@ def _update_unit_operation_parameters( raise CADETPythonSimError(f"Unit {unit} is not Part of the System.") unit.update_parameters(start, end, parameters) - - # @property - # def port_mapping(self) -> dict[int, str]: - # """dict: Mapping of port indices to corresponding state entries.""" - # # TODO: Let this be handled by the SystemSolver? - # port_mapping = defaultdict(dict) - - # counter = 0 - # for mapped_state, n_ports in self.inlet_ports.items(): - # for port in range(n_ports): - # port_mapping['inlet'][counter] = {} - # port_mapping['inlet'][counter]['mapped_state'] = mapped_state - # port_mapping['inlet'][counter]['port_index'] = port - # counter += 1 - - # counter = 0 - # for mapped_state, n_ports in self.outlet_ports.items(): - # for port in range(n_ports): - # port_mapping['outlet'][counter] = {} - # port_mapping['outlet'][counter]['mapped_state'] = mapped_state - # port_mapping['outlet'][counter]['port_index'] = port - # counter += 1 - - # return dict(port_mapping) diff --git a/CADETPythonSimulator/state.py b/CADETPythonSimulator/state.py index 9710714..caaca33 100644 --- a/CADETPythonSimulator/state.py +++ b/CADETPythonSimulator/state.py @@ -41,6 +41,7 @@ def __init__( n_inlet_ports: int = 0, n_outlet_ports: int = 0, ) -> NoReturn: + """Init a state.""" self.name = name self.dimensions = dimensions self.entries = entries diff --git a/CADETPythonSimulator/system.py b/CADETPythonSimulator/system.py index ed92f1f..4860517 100644 --- a/CADETPythonSimulator/system.py +++ b/CADETPythonSimulator/system.py @@ -7,7 +7,10 @@ from scikits.odes.dae import dae from CADETPythonSimulator.state import State, state_factory -from CADETPythonSimulator.coupling_interface import CouplingInterface, AverageCoupling +from CADETPythonSimulator.coupling_interface import ( + CouplingInterface, + WeightedAverageCoupling +) from CADETProcess.dataStructure import Structure from CADETPythonSimulator.exception import NotInitializedError, CADETPythonSimError @@ -30,7 +33,6 @@ def __init__(self, unit_operations: list[UnitOperationBase]): = None self._setup_unit_operations(unit_operations) - @property def unit_operations(self) -> dict[str, UnitOperationBase]: """dict: Unit operations, indexed by name.""" @@ -254,7 +256,18 @@ def compute_residual( def couple_unit_operations( self, ) -> NoReturn: - """Couple unit operations for set parameters.""" + """ + Couple unit operations for set parameters. + + Iterates first over all rows of the connectivity-matrix which is the + destination. If the total flow is 0, the row is ignored. + + Then it iterates over every origin flow by iterating over the row itself and + saves the state and the rate into a touple that gets added to a list. + + With this list, the new state for destination_unit is created with the + coupled_state_func. And afterward is set. + """ for destination_port_index, Q_destinations in enumerate(self.connectivity): Q_destination_total = sum(Q_destinations) if Q_destination_total == 0: @@ -264,8 +277,6 @@ def couple_unit_operations( self.destination_index_unit_operations[destination_port_index] destination_unit = self.unit_operations[destination_info['name']] destination_port = destination_info['port'] - - unit_Q_list = [] for origin_port_index, Q_destination in enumerate(Q_destinations): if Q_destination == 0: @@ -274,13 +285,11 @@ def couple_unit_operations( origin_info = self.origin_index_unit_operations[origin_port_index] origin_unit = self.unit_operations[origin_info['name']] origin_port = origin_info['port'] - unit_Q_list.append( (origin_unit.get_outlet_state_flat(origin_port), Q_destination) - ) + ) s_new = self.coupled_state_func(unit_Q_list) - destination_unit.set_inlet_state_flat( s_new, destination_port ) @@ -304,7 +313,6 @@ def coupled_state_func(self, unit_Q_list: list[dict, float]) -> dict: ret[state] = calc_method.get_coupled_state(unit_Q_list, state) return ret - def _compute_connectivity_matrix(self, connections: list) -> np.ndarray: # TODO: This could be the setter for `connectivity` # Note, maybe we already adapt the interface s.t. we compute this matrix, @@ -363,8 +371,6 @@ def _compute_connectivity_matrix(self, connections: list) -> np.ndarray: self._connectivity = connections_matrix - - class FlowSystem(SystemBase): """ SystemBase Class. @@ -376,8 +382,8 @@ class FlowSystem(SystemBase): def __init__(self, unit_operations: list[UnitOperationBase]): """Construct FlowSystem Object.""" self.coupling_state_structure={ - 'c': AverageCoupling(), - 'viscosity': AverageCoupling() + 'c': WeightedAverageCoupling(), + 'viscosity': WeightedAverageCoupling() } super().__init__(unit_operations) @@ -394,9 +400,9 @@ def set_rates(self) -> NoReturn: for dest_i, Q_n in enumerate(self.connectivity): unit_operation = self.destination_index_unit_operations[dest_i]['name'] unit_port = self.destination_index_unit_operations[dest_i]['port'] - self.unit_operations[unit_operation].set_Q_in(unit_port, np.sum(Q_n)) + self.unit_operations[unit_operation].set_Q_in_port(unit_port, np.sum(Q_n)) for origin_i, Q_n in enumerate(self.connectivity.T): unit_operation = self.origin_index_unit_operations[origin_i]['name'] unit_port = self.origin_index_unit_operations[origin_i]['port'] - self.unit_operations[unit_operation].set_Q_out(unit_port, np.sum(Q_n)) + self.unit_operations[unit_operation].set_Q_out_port(unit_port, np.sum(Q_n)) diff --git a/CADETPythonSimulator/system_solver.py b/CADETPythonSimulator/system_solver.py deleted file mode 100644 index 964f2d0..0000000 --- a/CADETPythonSimulator/system_solver.py +++ /dev/null @@ -1,258 +0,0 @@ -from typing import NoReturn - -from addict import Dict -import numpy as np -from scikits.odes.dae import dae - -from CADETProcess.dataStructure import Structure -from CADETPythonSimulator.exception import NotInitializedError -from CADETPythonSimulator.unit_operation import UnitOperationBase - - -class SystemSolver(Structure): - def __init__(self, unit_operations: list[UnitOperationBase], sections: list[dict]): - self.initialize_solver() - - self._setup_unit_operations(unit_operations) - self._setup_sections(sections) - - def _setup_unit_operations(self, unit_operations): - self._unit_operations: list[UnitOperationBase] = unit_operations - - self.unit_slices: dict[UnitOperationBase, slice] = {} - start_index = 0 - for unit in self.unit_operations: - end_index = start_index + unit.n_dof - self.unit_slices[unit] = slice(start_index, end_index) - start_index = end_index - - # dict with [origin_index]{unit, port} - origin_index_unit_operations = Dict() - # Nested dict with [unit_operations][ports]: origin_index in connectivity matrix - origin_unit_ports = Dict() - origin_counter = 0 - for i_unit, unit in enumerate(self.unit_operations): - for port in range(unit.n_outlet_ports): - origin_unit_ports[i_unit][port] = origin_counter - origin_index_unit_operations[origin_counter] = { - 'unit': i_unit, 'port': port - } - origin_counter += 1 - self.origin_unit_ports = origin_unit_ports - self.origin_index_unit_operations = origin_index_unit_operations - self.n_origin_ports = origin_counter - - # dict with [origin_index]{unit, port} - destination_index_unit_operations = Dict() - # Nested dict with [unit_operations][ports]: dest*_index in connectivity matrix - destination_unit_ports = Dict() - destination_counter = 0 - for i_unit, unit in enumerate(self.unit_operations): - for port in range(unit.n_inlet_ports): - destination_unit_ports[i_unit][port] = destination_counter - destination_index_unit_operations[destination_counter] = { - 'unit': i_unit, 'port': port - } - destination_counter += 1 - self.destination_unit_ports = destination_unit_ports - self.destination_index_unit_operations = destination_index_unit_operations - self.n_destination_ports = destination_counter - - def _setup_sections(self, sections): - # TODO: Check section continuity. - - self.sections = sections - - - def initialize_solver(self, solver: str = 'ida') -> NoReturn: - """ - Initialize solver. - - Parameters - ---------- - solver : str, optional - Solver to use for integration. The default is `ida`. - - """ - if solver not in ['ida']: - raise ValueError(f"{solver} is not a supported solver.") - - self.solver = dae(solver, self.compute_residual) - - def initialize_solution_recorder(self) -> NoReturn: - """ - Initialize the solution recorder for all unit_operations. - - Iterates over each unit in the system and initializes an empty numpy array for - each state variable within the unit. The structure and size of the array for - each state variable are determined by the unit's state structure. - """ - self.unit_solutions: dict[UnitOperationBase, dict] = {} - - for unit in self.unit_operations: - self.unit_solutions[unit]: dict[str, np.ndarray] = {} - for state_name, state in unit.states_dict.items(): - self.unit_solutions[unit][state_name] = np.empty((0, *state.shape)) - self.unit_solutions[unit][f"{state_name}_dot"] = np.empty((0, *state.shape)) - - def write_solution(self, y: np.ndarray, y_dot: np.ndarray) -> NoReturn: - """ - Update the solution recorder for each unit with the current state. - - Iterates over each unit, to extract the relevant portions of `y` and `y_dot`. - The current state of each unit is determined by splitting `y` according to each - unit's requirements. - - Parameters - ---------- - y : np.ndarray - The current complete state of the system as a NumPy array. - y_dot : np.ndarray - The current complete derivative of the system's state as a NumPy array. - - """ - for unit, unit_slice in self.unit_slices.items(): - current_state = unit.y_split - - for state, value in current_state.items(): - previous_states = self.unit_solutions[unit][state] - self.unit_solutions[unit][state] = np.vstack(( - previous_states, - value.s.reshape((1, previous_states.shape[-1])) - )) - - current_state_dot = unit.y_dot_split - for state, value in current_state_dot.items(): - previous_states_dot = self.unit_solutions[unit][f"{state}_dot"] - self.unit_solutions[unit][f"{state}_dot"] = np.vstack(( - previous_states_dot, - value.s.reshape((1, previous_states_dot.shape[-1])) - )) - - def solve(self) -> NoReturn: - """Simulate the system.""" - self.initialize_solution_recorder() - - y_initial = self.y - y_initial_dot = self.y_dot - self.write_solution(y_initial, y_initial_dot) - - previous_end = self.sections[0].start - for section in self.sections: - if section.start <= section.end: - raise ValueError("Section end must be larger than section start.") - if section.start != previous_end: - raise ValueError("Sections times must be without gaps.") - - self.update_section_states( - section.start, - section.end, - section.section_states, - ) - self.couple_unit_operations( - section.connections, - y_initial, - y_initial_dot - ) - - section_solution_times = self.get_section_solution_times(section) - y, y_dot = self.solve_section( - section_solution_times, y_initial, y_initial_dot - ) - - self.write_solution(y, y_dot) - - y_initial = y - y_initial_dot = y_dot - - def solve_section( - self, - section_solution_times: np.ndarray, - y_initial: np.ndarray, - y_initial_dot: np.ndarray - ) -> tuple[np.ndarray, np.ndarray]: - """ - Solve a time section of the differential-algebraic equation system. - - This method uses the solver initialized by the system to compute the states and - their derivatives over a specified section. - - Parameters - ---------- - section_solution_times : np.ndarray - The time points at which the solution is sought. - y_initial : np.ndarray - Initial values of the state variables at the start of the section. - y_initial_dot : np.ndarray - Initial derivatives of the state variables at the start of the section. - - Returns - ------- - tuple[np.ndarray, np.ndarray] - A tuple containing two NumPy arrays: the first array contains the computed - values of the state variables (y) at each time point in - `section_solution_times`, and the second array contains the derivatives of - these state variables (y_dot) at each time point. - - """ - output = self.solver.solve(section_solution_times, y_initial, y_initial_dot) - y = output.values.y - y_dot = output.values.ydot - - return y, y_dot - - - def get_section_solution_times(self, section: dict) -> np.ndarray: - # TODO: How to get section_solution_times from section.start, section.end, if user_solution times are provided? - raise NotImplementedError() - - def _update_section_states( - self, - start: float, - end: float, - section_states: dict[UnitOperationBase, dict] - ) -> np.ndarray: - """ - Update time dependent unit operation parameters. - - Parameters - ---------- - start: float - Start time of the section. - end: float - End time of the section. - section_states : dict[UnitOperation, dict] - Unit operation parameters for the next section. - - """ - for unit, parameters in section_states.items(): - # TODO: Check if unit is part of SystemSolver - if isinstance(unit, str): - unit = self.unit_operations_dict[unit] - - unit.update_section_dependent_parameters(start, end, parameters) - - # @property - # def port_mapping(self) -> dict[int, str]: - # """dict: Mapping of port indices to corresponding state entries.""" - # # TODO: Let this be handled by the SystemSolver? - # port_mapping = defaultdict(dict) - - # counter = 0 - # for mapped_state, n_ports in self.inlet_ports.items(): - # for port in range(n_ports): - # port_mapping['inlet'][counter] = {} - # port_mapping['inlet'][counter]['mapped_state'] = mapped_state - # port_mapping['inlet'][counter]['port_index'] = port - # counter += 1 - - # counter = 0 - # for mapped_state, n_ports in self.outlet_ports.items(): - # for port in range(n_ports): - # port_mapping['outlet'][counter] = {} - # port_mapping['outlet'][counter]['mapped_state'] = mapped_state - # port_mapping['outlet'][counter]['port_index'] = port - # counter += 1 - - # return dict(port_mapping) - diff --git a/CADETPythonSimulator/unit_operation.py b/CADETPythonSimulator/unit_operation.py index 5980b3c..20a0b49 100644 --- a/CADETPythonSimulator/unit_operation.py +++ b/CADETPythonSimulator/unit_operation.py @@ -200,16 +200,36 @@ def Q_out(self, Q_out: npt.ArrayLike): ) self._Q_out = np.array(Q_out) - def set_Q_in(self, port: int, Q_in: float): - """Set a portspecific Q_in.""" - if not port < self.n_inlet_ports and port >=0: + def set_Q_in_port(self, port: int, Q_in: float): + """ + Set a portspecific Q_in. + + Parameters + ---------- + port : int + port to set Q + Q_in : float + rate to set + + """ + if not port < self.n_inlet_ports and port >= 0: raise f"""Port {port} is not inbetween 0 and {self.n_inlet_ports}""" self._Q_in[port] = Q_in - def set_Q_out(self, port: int, Q_out: float): - """Set s portspecific Q_out.""" - if not port < self.n_outlet_ports and port >=0: - raise f"""Port {port} is not inbetween 0 and {self.n_outlet_ports}""" + def set_Q_out_port(self, port: int, Q_out: float): + """ + Set a portspecific Q_out. + + Parameters + ---------- + port : int + port to set Q + Q_out : float + rate to set + + """ + if not port < self.n_outlet_ports and port >= 0: + raise f"""Port {port} is not inbetween 0 and {self.n_outlet_ports}""" self._Q_out[port] = Q_out @property @@ -508,15 +528,13 @@ def compute_residual( ---------- t : float Time at which to evaluate the residual. - residual : np.ndarray - Residual of the unit operation. """ - # Inlet DOFs are simply copied to the residual. t_poly = np.array([1, t, t**2, t**3]) self.residuals['outlet']['c'] = self.c_poly @ t_poly self.residuals['outlet']['viscosity'] = self.states['outlet']['viscosity'] + class Outlet(UnitOperationBase): """System outlet.""" @@ -573,9 +591,9 @@ def compute_residual( """ c_in = self.states['inlet']['c'] - c_in_dot = self.state_derivatives['inlet']['c'] + # c_in_dot = self.state_derivatives['inlet']['c'] - viscosity_in = self.states['inlet']['viscosity'] + # viscosity_in = self.states['inlet']['viscosity'] c = self.states['bulk']['c'] c_dot = self.state_derivatives['bulk']['c'] @@ -600,6 +618,7 @@ def compute_residual( self.residuals['inlet']['viscosity'] = calculate_residual_visc_cstr() + class DeadEndFiltration(UnitOperationBase): """ Dead end filtration model. @@ -649,18 +668,18 @@ class DeadEndFiltration(UnitOperationBase): def compute_residual( self, t: float, - ) -> NoReturn: + ) -> NoReturn: """Calculate the Residuum for DEF.""" Q_in = self.Q_in[0] Q_out = self.Q_out[0] c_in = self.states['cake']['c'] - c_in_dot = self.state_derivatives['cake']['c'] + # c_in_dot = self.state_derivatives['cake']['c'] V_C = self.states['cake']['cakevolume'] V_dot_C = self.state_derivatives['cake']['cakevolume'] - V_p = self.states['cake']['permeate'] + # V_p = self.states['cake']['permeate'] Q_p = self.state_derivatives['cake']['cakevolume'] viscosity_in = self.states['cake']['viscosity'] @@ -673,7 +692,7 @@ def compute_residual( deltap = self.states['cake']['pressure'] - #parameters + # parameters molecular_weights = self.component_system.molecular_weights molar_volume = self.component_system.molecular_volumes membrane_area = self.parameters['membrane_area'] @@ -681,7 +700,8 @@ def compute_residual( specific_cake_resistance = self.parameters['specific_cake_resistance'] rejection = np.array( - [self.rejection.get_rejection(mw) for mw in molecular_weights]) + [self.rejection.get_rejection(mw) for mw in molecular_weights] + ) # Handle inlet DOFs, which are simply copied to the residual self.residuals['cake']['c'] = c_in @@ -691,7 +711,7 @@ def compute_residual( molar_volume, c_in, V_dot_C - ) + ) self.residuals['cake']['pressure'] = calculate_residual_press_easy_def( Q_p, @@ -701,18 +721,18 @@ def compute_residual( viscosity_in, membrane_resistance, specific_cake_resistance - ) + ) self.residuals['cake']['permeate'] = calculate_residual_volume_cstr( V_C, V_dot_C, Q_in, Q_p - ) + ) self.residuals['cake']['viscosity'] = calculate_residual_visc_def() - new_c_in = (1-rejection)*c_in + new_c_in = (1 - rejection) * c_in self.residuals['permeate']['c'] = calculate_residual_concentration_cstr( c, @@ -722,14 +742,14 @@ def compute_residual( Q_p, Q_out, new_c_in - ) + ) self.residuals['permeate']['Volume'] = calculate_residual_volume_cstr( V, V_dot, Q_p, Q_out - ) + ) self.residuals['permeate']['viscosity'] = calculate_residual_visc_cstr() diff --git a/pyproject.toml b/pyproject.toml index 63033be..e88a7f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,5 +71,6 @@ line-length = 88 indent-width = 4 [tool.ruff.lint] +# preview = true select = ["E", "F", "W", "D"] ignore = ["F401", "D212", "D100"] diff --git a/tests/test_rejection.py b/tests/test_rejection.py index 6ec773d..929d696 100644 --- a/tests/test_rejection.py +++ b/tests/test_rejection.py @@ -21,12 +21,11 @@ TestCaseCutof ] ) - class TestRejection(): + """Test Class to test all rejection Models.""" + def test_get_rejection(self, parameters): - """ - Test to check wheter the get_rejection function works as intended - """ + """Test to check wheter the get_rejection function works as intended.""" model = parameters["model"](**parameters["model_param"]) solution = [model.get_rejection(weight) for weight in parameters["weights"]] diff --git a/tests/test_residual.py b/tests/test_residual.py index 6781de2..53f3bca 100644 --- a/tests/test_residual.py +++ b/tests/test_residual.py @@ -92,8 +92,10 @@ ] ) class TestResidualConcCSTR(): - def test_calculation_concentration_cstr(self, parameters): + """Class to test the Residual concentration function for cstr.""" + def test_calculation_concentration_cstr(self, parameters): + """Test implementation.""" param_vec_conc = parameters["values"].values() np.testing.assert_array_almost_equal( @@ -169,7 +171,10 @@ def test_calculation_concentration_cstr(self, parameters): ] ) class TestResidualVolCSTR(): + """Test class vor the residual volume function.""" + def test_calculation_cstr(self, parameters): + """Test the implementation.""" param_vec_volume = parameters["values"].values() residual = calculate_residual_volume_cstr(*param_vec_volume) np.testing.assert_equal(residual, parameters["expected"]) @@ -235,7 +240,10 @@ def test_calculation_cstr(self, parameters): ] ) class TestResidualCakeVolDEF(): + """Class for testing the CAKE volume residual fucntion.""" + def test_calculation_def(self, parameters): + """Test the residual cake function.""" param_vec_cake_vol = parameters["values"].values() np.testing.assert_equal( calculate_residual_cake_vol_def(*param_vec_cake_vol), @@ -280,7 +288,10 @@ def test_calculation_def(self, parameters): ] ) class TestResidualPressureDropDEF(): + """Test class for the residual pressure prop of dead end filtration model.""" + def test_calculation_def(self, parameters): + """Test the calculation method.""" param_vec_pressure = parameters["values"].values() residual = calculate_residual_press_easy_def(*param_vec_pressure) np.testing.assert_equal(residual, parameters["expected"]) @@ -307,14 +318,17 @@ def test_calculation_def(self, parameters): ] ) class TestResidualError(): + """Test class for error handling.""" def test_calculation_vol_cstr_error(self, parameters): + """Test error of volume function of cstr.""" param_vec_volume = parameters["values"].values() with pytest.raises(CADETPythonSimError): calculate_residual_volume_cstr(*list(param_vec_volume)[2:6]) def test_calculation_concentration_cstr_error(self, parameters): + """Test error of concentration function of cstr.""" param_vec_volume = parameters["values"].values() with pytest.raises(CADETPythonSimError): diff --git a/tests/test_solver.py b/tests/test_solver.py index 6629cb4..e8f26a2 100644 --- a/tests/test_solver.py +++ b/tests/test_solver.py @@ -23,9 +23,13 @@ def values(self): """Values Property to imitate.""" return self._test_dict + # %% System Fixtures class SolverFixture(Solver): + """Solver Fixture Class for testing Solver.""" + def __init__(self, system=None, sections=None, *args, **kwargs): + """Init of the Solver Fixture with pre set sections.""" if sections is None: sections = [ { @@ -63,13 +67,17 @@ def __init__(self, system=None, sections=None, *args, **kwargs): ] if system is None: - system=SystemFixture() + system = SystemFixture() system.initialize() super().__init__(system, sections, *args, **kwargs) + class SystemFixture(FlowSystem): + """System Fixture Class for Testing purposes.""" + def __init__(self, unit_operations=None, *args, **kwargs): + """Init System Fixture with set untitoperations when not given.""" if unit_operations is None: unit_operations = [ InletFixture(), @@ -84,6 +92,8 @@ def __init__(self, unit_operations=None, *args, **kwargs): # %% State Structure solver_fixture_obj = SolverFixture() rej_obj = solver_fixture_obj.system.unit_operations['dead_end_filtration'].rejection + + @pytest.mark.parametrize( "solver, expected", [ @@ -93,13 +103,13 @@ def __init__(self, unit_operations=None, *args, **kwargs): 'n_dof': 16, 'unit_solution': { 'inlet': { - 'outlet':{ + 'outlet': { 'c': { 'values': np.array([[0., 1.], [0., 2.]]), - 'derivatives': np.array([[ 0., 2.], [0., 4.]]) + 'derivatives': np.array([[0., 2.], [0., 4.]]) }, 'viscosity': { - 'values': np.array([[ 2.], [4.]]), + 'values': np.array([[2.], [4.]]), 'derivatives': np.array([[4.], [8.]]), } } @@ -107,19 +117,19 @@ def __init__(self, unit_operations=None, *args, **kwargs): 'dead_end_filtration': { 'cake': { 'c': { - 'values': np.array([[ 3., 4.], [6., 8.]]), - 'derivatives': np.array([[ 6., 8.], [12., 16.]]), + 'values': np.array([[3., 4.], [6., 8.]]), + 'derivatives': np.array([[6., 8.], [12., 16.]]), }, 'viscosity': { - 'values': np.array([[ 5.], [10.]]), + 'values': np.array([[5.], [10.]]), 'derivatives': np.array([[10.], [20.]]), }, 'pressure': { - 'values': np.array([[ 6.], [12.]]), + 'values': np.array([[6.], [12.]]), 'derivatives': np.array([[12.], [24.]]), }, - 'cakevolume':{ + 'cakevolume': { 'values': np.array([[7.], [14.]]), 'derivatives': np.array([[14.], [28.]]), }, @@ -227,9 +237,8 @@ def test_solution_recorder(self, solver: Solver, expected): solution = np.arange(solver._system.n_dof).reshape((-1, solver._system.n_dof)) - y = solution - ydot = 2* solution + ydot = 2 * solution t = [0] solver.write_solution(t, y, ydot) @@ -251,6 +260,7 @@ def test_solution_recorder(self, solver: Solver, expected): ) def test_parameters(self, solver: Solver, expected): + """Test updating of parameters.""" for i_sec, section in enumerate(solver.sections): solver._update_unit_operation_parameters( section['start'], section['end'], section['section_states'] diff --git a/tests/test_state.py b/tests/test_state.py index 967db91..1179729 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -333,7 +333,10 @@ ] ) class TestState(): + """Test state class.""" + def test_state_dimensions(self, state_config, expected): + """Test if state parameters are set correctly.""" state = State(**state_config) assert state.dimension_shape == expected['dimension_shape'] assert state.n_dimensions == expected['n_dimensions'] @@ -347,6 +350,7 @@ def test_state_dimensions(self, state_config, expected): assert state.outlet_port_mapping == expected['outlet_port_mapping'] def test_s_split(self, state_config, expected): + """Tests if state spltitting works as intended.""" state = State(**state_config) new_state = np.arange(state.n_dof, dtype=float).reshape(state.shape) diff --git a/tests/test_system.py b/tests/test_system.py index ee9e977..1dc4652 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -86,8 +86,10 @@ ] ) class TestSystem(): + """Test Class for system.""" def test_system_structure(self, system, expected): + """Test system structure.""" assert system.n_dof == expected['n_dof'] @@ -111,6 +113,8 @@ def test_system_structure(self, system, expected): # TODO: Add tests with multiple ports. ) class TestSystemConnectivity(): + """Test system connectivity class.""" + def test_connections_matrix( self, system: SystemBase, @@ -118,6 +122,7 @@ def test_connections_matrix( expected_matrix, expected_state ): + """Test for computing the connectivity.""" system._compute_connectivity_matrix(connections) np.testing.assert_almost_equal(system._connectivity, expected_matrix) @@ -128,7 +133,7 @@ def test_coupling( expected_matrix, expected_state ): - + """Test for updating and coupling the system connectivity.""" y = np.arange(system.n_dof) y_dot = 2 * y diff --git a/tests/test_unit_operation.py b/tests/test_unit_operation.py index 21811d9..887f477 100644 --- a/tests/test_unit_operation.py +++ b/tests/test_unit_operation.py @@ -18,7 +18,10 @@ # %% Unit Operation Fixtures class TwoComponentFixture(CPSComponentSystem): + """Component Fixture class wiht two pre set components.""" + def __init__(self, *args, **kwargs): + """Initialize the component with parameters.""" super().__init__(*args, **kwargs) self.add_component('A', molecular_weight=1e3, density=1e3, molecular_volume=1) @@ -26,20 +29,27 @@ def __init__(self, *args, **kwargs): class UnitOperationFixture(UnitOperationBase): + """Unit Operation Fixture Class for testing purpose.""" + class_cps = TwoComponentFixture() def __init__(self, component_system, name, *args, **kwargs): + """Initialize the unit operation.""" if component_system is None: component_system = UnitOperationFixture.class_cps super().__init__(component_system, name, *args, **kwargs) def add_section(self, *args, **kwargs): + """Add section depending on unit operation.""" pass def initialize(self) -> NoReturn: + """Initialize unit operation dependend for testing purpose.""" super().initialize() self.add_section() class InletFixture(UnitOperationFixture, Inlet): + """Inlet fixture class for testing purpose, inherits from UnitOperationFixture.""" + def __init__( self, component_system=None, @@ -48,7 +58,8 @@ def __init__( viscosity=1e-3, *args, **kwargs - ): + ): + """Initialize class for inlet fixture.""" super().__init__(component_system, name, *args, **kwargs) if c_poly is None: @@ -58,22 +69,31 @@ def __init__( self.viscosity = viscosity def add_section(self, c_poly=None, start=0, end=1): + """For testing purpose, c_poly is set.""" if c_poly is None: c_poly = self.c_poly self.update_parameters(start, end, {'c_poly': c_poly}) class OutletFixture(UnitOperationFixture, Outlet): + """Oulet fixture class for testing purpose, inherits from UnitOperationFixture.""" + def __init__(self, component_system=None, name='outlet', *args, **kwargs): + """Initialize the outlet fixture.""" super().__init__(component_system, name, *args, **kwargs) class CstrFixture(UnitOperationFixture, Cstr): + """Cstr fixture class for testing purpose, inherits from UnitOperationFixture.""" + def __init__(self, component_system=None, name='cstr', *args, **kwargs): + """Initialize the cstr fixture.""" super().__init__(component_system, name, *args, **kwargs) class DeadEndFiltrationFixture(UnitOperationFixture, DeadEndFiltration): + """DEF fixture class for testing purpose, inherits from UnitOperationFixture.""" + def __init__(self, component_system=None, name='dead_end_filtration', @@ -83,7 +103,8 @@ def __init__(self, rejection=StepCutOff(cutoff_weight=0), *args, **kwargs - ): + ): + """Initialize DEF fixture with default parameter and default rejection.""" super().__init__(component_system, name, *args, **kwargs) self.membrane_area = membrane_area @@ -93,6 +114,8 @@ def __init__(self, class CrossFlowFiltrationFixture(UnitOperationFixture, CrossFlowFiltration): + """CFF fixture class for testing purpose, inherits from UnitOperationFixture.""" + def __init__(self, component_system=None, name='cross_flow_filtration', @@ -100,7 +123,8 @@ def __init__(self, membrane_resistance=1e-9, *args, **kwargs - ): + ): + """Initialize CFF fixture with default parameter and defaultrejection.""" super().__init__(component_system, name, *args, **kwargs) self.membrane_area = membrane_area @@ -296,17 +320,22 @@ def __init__(self, ] ) class TestUnitStateStructure: + """Test class for unit state structure.""" + def test_initialize(self, unit_operation: UnitOperationBase, expected: dict): + """Check exception raising while not initialized.""" with pytest.raises(Exception): unit_operation.states def test_state_structure(self, unit_operation: UnitOperationBase, expected: dict): + """Initialize the unit operation and test structure.""" unit_operation.initialize() assert unit_operation.n_inlet_ports == expected['n_inlet_ports'] assert unit_operation.n_outlet_ports == expected['n_outlet_ports'] assert unit_operation.n_dof == expected['n_dof'] def test_states(self, unit_operation: UnitOperationBase, expected: dict): + """Test state and directly state setting.""" y_new = np.arange(unit_operation.n_dof, dtype=float) unit_operation.y = y_new @@ -314,6 +343,7 @@ def test_states(self, unit_operation: UnitOperationBase, expected: dict): np.testing.assert_equal(state.s, expected['states'][name]) def test_set_inlet_state(self, unit_operation: UnitOperationBase, expected: dict): + """Test set_inlet_state_flat function.""" s_in = { 'c': [.1, .2], 'viscosity': [.3], @@ -330,6 +360,7 @@ def test_set_inlet_state(self, unit_operation: UnitOperationBase, expected: dict np.testing.assert_array_equal(state_slice, slice_information['value']) def test_get_outlet_state(self, unit_operation: UnitOperationBase, expected: dict): + """Test getter for outlet state.""" if unit_operation.n_outlet_ports == 0: with pytest.raises(Exception): unit_operation.get_outlet_port_state(0) @@ -477,6 +508,8 @@ def test_get_outlet_state(self, unit_operation: UnitOperationBase, expected: dic ] ) class TestUnitResidual(): + """Test class for resdiual related functions of unit operations.""" + def test_unit_residual( self, monkeypatch,