diff --git a/paynt/cli.py b/paynt/cli.py index 63a215944..f5a96a337 100644 --- a/paynt/cli.py +++ b/paynt/cli.py @@ -17,7 +17,6 @@ from .quotient.storm_pomdp_control import StormPOMDPControl - import click import sys import os @@ -114,9 +113,6 @@ def setup_logger(log_path = None): @click.option("--export-fsc-paynt", type=click.Path(), default=None, help="path to output file for SAYNT inductive FSC") -#@click.option("--storm-parallel", is_flag=True, default=False, -# help="run storm analysis in parallel (can only be used together with --storm-pomdp-analysis flag)") - @click.option( "--ce-generator", default="storm", diff --git a/paynt/quotient/models.py b/paynt/quotient/models.py index 2b96458af..e767dd916 100644 --- a/paynt/quotient/models.py +++ b/paynt/quotient/models.py @@ -270,5 +270,5 @@ def check_specification(self, specification, constraint_indices = None, short_ev constraints_result, optimality_result = super().check_specification(specification,constraint_indices,short_evaluation) if optimality_result is not None and optimality_result.improving_assignment is not None and double_check: optimality_result.improving_assignment, optimality_result.improving_value = self.quotient_container.double_check_assignment(optimality_result.improving_assignment) - print(optimality_result.improving_assignment, optimality_result.improving_value) + # print(optimality_result.improving_assignment, optimality_result.improving_value) return MdpSpecificationResult(constraints_result, optimality_result) diff --git a/paynt/quotient/quotient_pomdp.py b/paynt/quotient/quotient_pomdp.py index e8392e45d..49304fced 100644 --- a/paynt/quotient/quotient_pomdp.py +++ b/paynt/quotient/quotient_pomdp.py @@ -88,7 +88,7 @@ def __init__(self, pomdp, specification, decpomdp_manager=None): agent_obs = decpomdp_manager.joint_observations[obs][0] agent_obs_label = decpomdp_manager.agent_observation_labels[0][agent_obs] self.observation_labels.append(agent_obs_label) - logger.debug(f"Observation labels: {self.observation_labels}") + # logger.debug(f"Observation labels: {self.observation_labels}") # compute actions available at each observation self.actions_at_observation = [0] * self.observations @@ -712,25 +712,21 @@ def policy_size(self, assignment): return size_gamma + size_delta - # constructs pomdp from the quotient MDP + # constructs pomdp from the quotient MDP, used for computing POMDP abstraction bounds def get_family_pomdp(self, mdp): no_obs = self.pomdp.nr_observations - #print(mdp.model) tm = mdp.model.transition_matrix components = stormpy.storage.SparseModelComponents(tm, mdp.model.labeling, mdp.model.reward_models) full_observ_list = [] - #full_choice_labels = [] for state in range(self.pomdp.nr_states): obs = self.pomdp.get_observation(state) for mem in range(self.observation_memory_size[obs]): full_observ_list.append(obs + mem * no_obs) - #full_choice_labels.append(list(range(self.pomdp.get_nr_available_actions(state)))) - - #print(full_choice_labels) choice_labeling = stormpy.storage.ChoiceLabeling(mdp.choices) + # assign observations to states observ_list = [] choice_labels = [] for state in range(mdp.model.nr_states): @@ -739,12 +735,11 @@ def get_family_pomdp(self, mdp): actions = [action for action in range(mdp.model.get_nr_available_actions(state))] choice_labels.append(actions) - # LABELING + # construct labeling labels_list = [item for sublists in choice_labels for item in sublists] labels = list(set(labels_list)) for label in labels: choice_labeling.add_label(str(label)) - for choice in 
range(mdp.choices): choice_labeling.add_label_to_choice(str(labels_list[choice]), choice) @@ -754,7 +749,4 @@ def get_family_pomdp(self, mdp): pomdp = stormpy.storage.SparsePomdp(components) pomdp = stormpy.pomdp.make_canonic(pomdp) - #stormpy.export_to_drn(pomdp, "pomdp-test.out") - #print(pomdp) - return pomdp diff --git a/paynt/quotient/storm_pomdp_control.py b/paynt/quotient/storm_pomdp_control.py index 4251e7627..ce250bd7b 100644 --- a/paynt/quotient/storm_pomdp_control.py +++ b/paynt/quotient/storm_pomdp_control.py @@ -7,11 +7,9 @@ from os import makedirs - from threading import Thread from time import sleep - import logging logger = logging.getLogger(__name__) @@ -68,15 +66,13 @@ def get_storm_result(self): self.parse_results(self.quotient) self.update_data() - #print(self.result_dict) - #print(self.storm_bounds) - if self.s_queue is not None: self.s_queue.put((self.result_dict, self.storm_bounds)) # run Storm POMDP analysis for given model and specification # TODO: discuss Storm options def run_storm_analysis(self): + # TODO rework options if self.storm_options == "cutoff": options = self.get_cutoff_options(100000) elif self.storm_options == "clip2": @@ -107,9 +103,6 @@ def run_storm_analysis(self): belmc = stormpy.pomdp.BeliefExplorationModelCheckerDouble(self.pomdp, options) - #self.paynt_export = [] - #print(self.paynt_export) - logger.info("starting Storm POMDP analysis") storm_timer = Timer() storm_timer.start() @@ -121,30 +114,19 @@ def run_storm_analysis(self): size = self.get_belief_controller_size(result, self.paynt_fsc_size) if self.get_result is not None: - #print(result.induced_mc_from_scheduler) - #print(result.lower_bound) - #print(result.upper_bound) # TODO not important for the paper but it would be nice to have correct FSC here as well - #print(self.get_belief_controller_size(result, self.paynt_fsc_size)) if self.storm_options == "overapp": print(f'-----------Storm----------- \ \nValue = {value} | Time elapsed = {round(storm_timer.read(),1)}s | FSC size = {size}\n', flush=True) - print(".....") - print(result.upper_bound) - print(result.lower_bound) + #print(".....") + #print(result.upper_bound) + #print(result.lower_bound) else: print(f'-----------Storm----------- \ \nValue = {value} | Time elapsed = {round(storm_timer.read(),1)}s | FSC size = {size}\nFSC (dot) = {result.induced_mc_from_scheduler.to_dot()}\n', flush=True) exit() - # debug - #print(result.induced_mc_from_scheduler) - #print(result.lower_bound) - #print(result.upper_bound) - #print(result.cutoff_schedulers[1]) - #for sc in result.cutoff_schedulers: - # print(sc) print(f'-----------Storm----------- \ \nValue = {value} | Time elapsed = {round(storm_timer.read(),1)}s | FSC size = {size}\nFSC (dot) = {result.induced_mc_from_scheduler.to_dot()}\n', flush=True) @@ -154,13 +136,13 @@ def run_storm_analysis(self): else: self.storm_bounds = self.latest_storm_result.lower_bound - + # setup interactive Storm belief model checker def interactive_storm_setup(self): - global belmc - #belmc = stormpy.pomdp.pomdp.create_interactive_mc(MarkovChain.environment, self.pomdp, False) + global belmc # needs to be global for threading to work correctly options = self.get_interactive_options() belmc = stormpy.pomdp.BeliefExplorationModelCheckerDouble(self.pomdp, options) + # start interactive belief model checker, this function is called only once to start the storm thread. 
To resume Storm computation 'interactive_storm_resume' is used def interactive_storm_start(self, storm_timeout): self.storm_thread = Thread(target=self.interactive_run, args=(belmc,)) control_thread = Thread(target=self.interactive_control, args=(belmc, True, storm_timeout,)) @@ -173,7 +155,7 @@ def interactive_storm_start(self, storm_timeout): self.belief_explorer = belmc.get_interactive_belief_explorer() - + # resume interactive belief model checker, should be called only after belief model checker was previously started def interactive_storm_resume(self, storm_timeout): control_thread = Thread(target=self.interactive_control, args=(belmc, False, storm_timeout,)) @@ -182,26 +164,22 @@ def interactive_storm_resume(self, storm_timeout): control_thread.join() + # terminate interactive belief model checker def interactive_storm_terminate(self): belmc.terminate_unfolding() self.storm_thread.join() + # this function represents the storm thread in SAYNT def interactive_run(self, belmc): logger.info("starting Storm POMDP analysis") result = belmc.check(self.spec_formulas[0], self.paynt_export) # calls Storm + # to get here Storm exploration has to end either by constructing finite belief MDP or by outside termination self.storm_terminated = True if result.induced_mc_from_scheduler is not None: value = result.upper_bound if self.quotient.specification.optimality.minimizing else result.lower_bound size = self.get_belief_controller_size(result, self.paynt_fsc_size) - # debug - #print(result.induced_mc_from_scheduler) - #print(result.lower_bound) - #print(result.upper_bound) - #print(result.cutoff_schedulers[1]) - #for sc in result.cutoff_schedulers: - # print(sc) print(f'-----------Storm----------- \ \nValue = {value} | Time elapsed = {round(self.saynt_timer.read(),1)}s | FSC size = {size}\n', flush=True) @@ -222,17 +200,19 @@ def interactive_run(self, belmc): logger.info("Storm POMDP analysis completed") + # ensures correct execution of one loop of Storm exploration def interactive_control(self, belmc, start, storm_timeout): if belmc.has_converged(): logger.info("Storm already terminated.") return + # Update cut-off FSC values provided by PAYNT if not start: logger.info("Updating FSC values in Storm") - #explorer = belmc.get_interactive_belief_explorer() self.belief_explorer.set_fsc_values(self.paynt_export) belmc.continue_unfolding() + # wait for Storm to start exploring while not belmc.is_exploring(): if belmc.has_converged(): break @@ -244,20 +224,14 @@ def interactive_control(self, belmc, start, storm_timeout): logger.info("Pausing Storm") belmc.pause_unfolding() + # wait for the result to be constructed from the explored belief MDP while not belmc.is_result_ready(): - sleep(2) + sleep(1) result = belmc.get_interactive_result() value = result.upper_bound if self.quotient.specification.optimality.minimizing else result.lower_bound size = self.get_belief_controller_size(result, self.paynt_fsc_size) - # debug - #print(result.induced_mc_from_scheduler) - #print(result.lower_bound) - #print(result.upper_bound) - #print(result.cutoff_schedulers[1]) - #for sc in result.cutoff_schedulers: - # print(sc) print(f'-----------Storm----------- \ \nValue = {value} | Time elapsed = {round(self.saynt_timer.read(),1)}s | FSC size = {size}\n', flush=True) @@ -276,32 +250,29 @@ def interactive_control(self, belmc, start, storm_timeout): self.parse_results(self.quotient) self.update_data() - #print(self.result_dict) - #print(self.result_dict_no_cutoffs) - #print(self.is_storm_better) - - + ######## + # Different 
options for Storm below (would be nice to make this more succint) def get_cutoff_options(self, belief_states=100000): options = stormpy.pomdp.BeliefExplorationModelCheckerOptionsDouble(False, True) - options.use_explicit_cutoff = True + options.use_state_elimination_cutoff = False options.size_threshold_init = belief_states - options.use_grid_clipping = False + options.use_clipping = False return options def get_overapp_options(self, belief_states=20000000): options = stormpy.pomdp.BeliefExplorationModelCheckerOptionsDouble(True, False) - options.use_explicit_cutoff = True + options.use_state_elimination_cutoff = False options.size_threshold_init = belief_states - options.use_grid_clipping = False + options.use_clipping = False return options def get_refine_options(self, step_limit=0): options = stormpy.pomdp.BeliefExplorationModelCheckerOptionsDouble(False, True) - options.use_explicit_cutoff = True + options.use_state_elimination_cutoff = False options.size_threshold_init = 0 options.size_threshold_factor = 2 - options.use_grid_clipping = False + options.use_clipping = False options.gap_threshold_init = 0 options.refine_precision = 0 if step_limit > 0: @@ -311,39 +282,25 @@ def get_refine_options(self, step_limit=0): def get_clip2_options(self): options = stormpy.pomdp.BeliefExplorationModelCheckerOptionsDouble(False, True) - options.use_explicit_cutoff = True + options.use_state_elimination_cutoff = False options.size_threshold_init = 0 - #options.size_threshold_factor = 2 - options.use_grid_clipping = True - #options.exploration_time_limit = 1 - #options.clipping_threshold_init = 1 + options.use_clipping = True options.clipping_grid_res = 2 options.gap_threshold_init = 0 - #options.refine_precision = 0 - #options.refine = True - #options.exploration_heuristic = - #options.preproc_minmax_method = stormpy.MinMaxMethod.policy_iteration return options def get_clip4_options(self): options = stormpy.pomdp.BeliefExplorationModelCheckerOptionsDouble(False, True) - options.use_explicit_cutoff = True + options.use_state_elimination_cutoff = False options.size_threshold_init = 0 - #options.size_threshold_factor = 2 - options.use_grid_clipping = True - #options.exploration_time_limit = 1 - #options.clipping_threshold_init = 1 + options.use_clipping = True options.clipping_grid_res = 4 options.gap_threshold_init = 0 - #options.refine_precision = 0 - #options.refine = True - #options.exploration_heuristic = - #options.preproc_minmax_method = stormpy.MinMaxMethod.policy_iteration return options def get_interactive_options(self): options = stormpy.pomdp.BeliefExplorationModelCheckerOptionsDouble(False, True) - options.use_explicit_cutoff = True + options.use_state_elimination_cutoff = False options.size_threshold_init = 0 options.skip_heuristic_schedulers = False options.interactive_unfolding = True @@ -351,45 +308,33 @@ def get_interactive_options(self): options.refine = False options.cut_zero_gap = False if self.storm_options == "clip2": - options.use_grid_clipping = True + options.use_clipping = True options.clipping_grid_res = 2 elif self.storm_options == "clip4": - options.use_grid_clipping = True + options.use_clipping = True options.clipping_grid_res = 4 return options + + # End of options + ######## - # Over-approximation + # computes over-approximation for the given POMDP + # this can be used to compute bounds for POMDP abstraction + # TODO discuss the best options for this use case @staticmethod def storm_pomdp_analysis(model, formulas): options = 
stormpy.pomdp.BeliefExplorationModelCheckerOptionsDouble(True, False) - options.use_explicit_cutoff = True + options.use_state_elimination_cutoff = False options.size_threshold_init = 1000000 - options.use_grid_clipping = False + options.use_clipping = False options.exploration_time_limit = 60 belmc = stormpy.pomdp.BeliefExplorationModelCheckerDouble(model, options) - #logger.info("starting Storm POMDP analysis") result = belmc.check(formulas[0], []) # calls Storm - #logger.info("Storm POMDP analysis completed") - - # debug - #print(result.lower_bound) - #print(result.upper_bound) return result - - # Probably not neccessary with the introduction of paynt result dict - def parse_result(self, quotient): - if self.is_storm_better and self.latest_storm_result is not None: - self.parse_storm_result(quotient) - else: - if self.latest_paynt_result is not None: - self.parse_paynt_result(quotient) - else: - self.result_dict = {} - self.result_dict_paynt = {} - self.result_dict_no_cutoffs = self.result_dict - + + # parse the current Storm and PAYNT results if they are available def parse_results(self, quotient): if self.latest_storm_result is not None: self.parse_storm_result(quotient) @@ -400,21 +345,7 @@ def parse_results(self, quotient): if self.latest_paynt_result is not None: self.parse_paynt_result(quotient) else: - self.result_dict_paynt = {} - - def join_results(self, use_cutoffs=True): - if use_cutoffs: - for obs in range(self.quotient.observations): - if obs in self.result_dict.keys(): - if obs in self.result_dict_paynt.keys(): - for action in self.result_dict_paynt[obs]: - if action not in self.result_dict[obs]: - self.result_dict[obs].append(action) - else: - if obs in self.result_dict_paynt.keys(): - self.result_dict[obs] = self.result_dict_paynt[obs] - - + self.result_dict_paynt = {} # parse Storm results into a dictionary def parse_storm_result(self, quotient): @@ -428,9 +359,6 @@ def parse_storm_result(self, quotient): result_no_cutoffs = {x:[] for x in range(quotient.observations)} for state in self.latest_storm_result.induced_mc_from_scheduler.states: - # debug - #print(state.id, state.labels, get_choice_label(state.id)) - # TODO what if there were no labels in the model? 
if get_choice_label(state.id) == set(): continue @@ -438,6 +366,7 @@ def parse_storm_result(self, quotient): # parse non cut-off states if 'cutoff' not in state.labels and 'clipping' not in state.labels: for label in state.labels: + # observation based on prism observables if '[' in label: simplified_label = self.quotient.simplify_label(label) observation = self.quotient.observation_labels.index(simplified_label) @@ -454,6 +383,7 @@ def parse_storm_result(self, quotient): if index >= 0 and index not in result_no_cutoffs[int(observation)]: result_no_cutoffs[int(observation)].append(index) + # explicit observation index elif 'obs_' in label: _, observation = label.split('_') @@ -484,10 +414,8 @@ def parse_storm_result(self, quotient): else: if len(cutoff_epxloration) == 0: continue - - # debug - #print(cutoff_epxloration) - + + # obtain what cut-off scheduler was used if 'sched_' in list(get_choice_label(state.id))[0]: _, scheduler_index = list(get_choice_label(state.id))[0].split('_') @@ -518,7 +446,6 @@ def parse_storm_result(self, quotient): if len(result_no_cutoffs[obs]) == 0: del result_no_cutoffs[obs] - #logger.info("Result dictionary is based on result from Storm") self.result_dict = result self.result_dict_no_cutoffs = result_no_cutoffs @@ -546,6 +473,7 @@ def parse_choice_string(self, choice_string, probability_bound=0): return result + # parse PAYNT result to a dictionart def parse_paynt_result(self, quotient): result = {x:[] for x in range(quotient.observations)} @@ -568,83 +496,15 @@ def parse_paynt_result(self, quotient): #logger.info("Result dictionary is based on result from PAYNT") self.result_dict_paynt = result - # returns the main family that will be explored first - def get_main_restricted_family(self, family, quotient, use_cutoffs=True): - - if not self.is_storm_better: - result_dict = self.result_dict_paynt - elif use_cutoffs: - result_dict = self.result_dict - else: - result_dict = self.result_dict_no_cutoffs + # main family contains only the actions considered by respective FSC (most usually Storm result) + def get_main_restricted_family(self, family, result_dict): if result_dict == {}: return family - # go through each observation of interest restricted_family = family.copy() - for obs in range(quotient.observations): - - num_actions = quotient.actions_at_observation[obs] - num_updates = quotient.pomdp_manager.max_successor_memory_size[obs] - - act_obs_holes = quotient.observation_action_holes[obs] - mem_obs_holes = quotient.observation_memory_holes[obs] - act_num_holes = len(act_obs_holes) - mem_num_holes = len(mem_obs_holes) - - if act_num_holes == 0: - continue - - all_actions = [action for action in range(num_actions)] - selected_actions = [all_actions.copy() for _ in act_obs_holes] - - #all_updates = [update for update in range(num_updates)] - #selected_updates = [all_updates.copy() for _ in mem_obs_holes] - - # Action restriction - if obs not in result_dict.keys(): - selected_actions = [[0] for _ in act_obs_holes] - else: - selected_actions = [result_dict[obs] for _ in act_obs_holes] - - selected_updates = [[0] for hole in mem_obs_holes] - - # Apply action restrictions - for index in range(act_num_holes): - hole = act_obs_holes[index] - actions = selected_actions[index] - options = [] - for action in actions: - if action not in restricted_family[hole].options: - continue - options.append(action) - if len(options) == 0: - options = [0] - restricted_family[hole].assume_options(options) - - # Apply memory restrictions - #for index in range(mem_num_holes): - # hole 
= mem_obs_holes[index] - # updates = selected_updates[index] - # options = [] - # for update in updates: - # options.append(update) - # restricted_family[hole].assume_options(options) - - #print(restricted_family) - logger.info("Main family based on data from Storm: reduced design space from {} to {}".format(family.size, restricted_family.size)) - - return restricted_family - - def get_main_restricted_family_new(self, family, result_dict): - - if result_dict == {}: - return family - # go through each observation of interest - restricted_family = family.copy() for obs in range(self.quotient.observations): for hole in self.quotient.observation_action_holes[obs]: @@ -658,13 +518,14 @@ def get_main_restricted_family_new(self, family, result_dict): restricted_family[hole].assume_options(selected_actions) - #print(restricted_family) logger.info("Main family based on data from Storm: reduced design space from {} to {}".format(family.size, restricted_family.size)) return restricted_family # returns dictionary containing restrictions for easy creation of subfamilies + # creating this restrictions list saves some memory compared to constructing all of the families + # corresponding families are then created only when needed def get_subfamilies_restrictions(self, family, result_dict): if result_dict == {}: @@ -678,11 +539,6 @@ def get_subfamilies_restrictions(self, family, result_dict): act_obs_holes = self.quotient.observation_action_holes[observ] restricted_holes_list.extend(act_obs_holes) - - #explored_hole_list = [] - - # debug - #subfamilies_size = 0 for hole in restricted_holes_list: @@ -700,80 +556,9 @@ def get_subfamilies_restrictions(self, family, result_dict): subfamilies_restriction.append({"hole": hole, "restriction": restriction}) - # debug - #print(obs, subfamily.size, subfamily) - #subfamilies_size += subfamily.size - - # debug - #print(subfamilies_size) - return subfamilies_restriction - - - # returns dictionary containing restrictions for easy creation of subfamilies - # BROKEN NOW !!!!!!!!!!!!!!!!! 
- # def get_subfamilies_restrictions_symmetry_breaking(self, quotient, use_cutoffs=True): - - # if use_cutoffs or not(self.is_storm_better): - # result_dict = self.result_dict - # else: - # result_dict = self.result_dict_no_cutoffs - - # subfamilies = [] - - # for obs in result_dict.keys(): - - # if len(result_dict[obs]) == quotient.actions_at_observation[obs]: - # continue - - # subfamilies.append({"holes": quotient.observation_action_holes[obs], "restriction": result_dict[obs]}) - - # # debug - # #print(obs, subfamily.size, subfamily) - # #subfamilies_size += subfamily.size - - # # debug - # #print(subfamilies_size) - - # return subfamilies - - - def get_subfamilies_dict(self, restrictions, family): - - if len(restrictions) == 0: - return [] - - subfamilies = [] - - for i in range(len(restrictions)): - subfamily = [] - for j in range(i+1): - if i != j: - subfamily.append(restrictions[j]) - else: - actions = [action for action in family[restrictions[j]["hole"]].options if action not in restrictions[j]["restriction"]] - subfamily.append({"hole": restrictions[j]["hole"], "restriction": actions}) - - subfamilies.append(subfamily) - - for subfamily in subfamilies: - holes = [x["hole"] for x in subfamily] - - for obs in range(self.quotient.observations): - num_actions = self.quotient.actions_at_observation[obs] - act_obs_holes = self.quotient.observation_action_holes[obs] - for index in range(len(act_obs_holes)): - hole = act_obs_holes[index] - restriction = [] - if hole not in holes: - for action in range(num_actions): - if action in family[hole].options: - restriction.append(action) - subfamily.append({"hole": hole, "restriction": restriction}) - - return subfamilies - + # constructs the families given by the restrictions list def get_subfamilies(self, restrictions, family): subfamilies = [] @@ -794,6 +579,8 @@ def get_subfamilies(self, restrictions, family): return subfamilies + # returns True if the current best FSC from Storm requires more memory + # returns False otherwise def is_memory_needed(self): if len(self.memory_vector) == 0: return False @@ -805,7 +592,7 @@ def is_memory_needed(self): break return memory_needed - + # update all of the important data structures according to the current results def update_data(self): if self.paynt_bounds is None and self.storm_bounds is None: @@ -886,17 +673,8 @@ def get_belief_controller_size(self, storm_result, paynt_fsc_size=None): for action in actions: if action not in observation_actions[observation]: observation_actions[observation].append(action) - print(observation_actions) randomized_schedulers_size += sum(list([len(support) for support in observation_actions.values()])) * 3 - #debug - #print(non_frontier_states) - #print(belief_mc.nr_transitions) - #print(fsc_size) - #print(randomized_schedulers_size) - result_size = non_frontier_states + belief_mc.nr_transitions + fsc_size + randomized_schedulers_size return result_size - - diff --git a/paynt/synthesizer/synthesizer_ar_storm.py b/paynt/synthesizer/synthesizer_ar_storm.py index 91c7e1a48..bed775bad 100644 --- a/paynt/synthesizer/synthesizer_ar_storm.py +++ b/paynt/synthesizer/synthesizer_ar_storm.py @@ -8,7 +8,7 @@ import logging logger = logging.getLogger(__name__) - +# Abstraction Refinement + Storm splitting class SynthesizerARStorm(Synthesizer): # family exploration order: True = DFS, False = BFS @@ -31,15 +31,18 @@ class SynthesizerARStorm(Synthesizer): def method_name(self): return "AR" + # performs splitting in family according to Storm result + # main families contain only those 
actions that were considered by best found Storm FSC def storm_split(self, families): subfamilies = [] main_families = [] + # split each family in the current buffer to main family and corresponding subfamilies for family in families: if self.storm_control.use_cutoffs: - main_p = self.storm_control.get_main_restricted_family_new(family, self.storm_control.result_dict) + main_p = self.storm_control.get_main_restricted_family(family, self.storm_control.result_dict) else: - main_p = self.storm_control.get_main_restricted_family_new(family, self.storm_control.result_dict_no_cutoffs) + main_p = self.storm_control.get_main_restricted_family(family, self.storm_control.result_dict_no_cutoffs) if main_p is None: subfamilies.append(family) @@ -56,35 +59,28 @@ def storm_split(self, families): subfamilies.extend(subfamilies_p) logger.info(f"State after Storm splitting: Main families - {len(main_families)}, Subfamilies - {len(subfamilies)}") + + # if there are no main families we don't have to prioritize search if len(main_families) == 0: main_families = subfamilies subfamilies = [] + return main_families, subfamilies - def analyze_family_ar(self, family): - """ - :return (1) family feasibility (True/False/None) - :return (2) new satisfying assignment (or None) - """ - # logger.debug("analyzing family {}".format(family)) + def verify_family(self, family): self.quotient.build(family) self.stat.iteration_mdp(family.mdp.states) - res = family.mdp.check_specification(self.quotient.specification, constraint_indices = family.constraint_indices, short_evaluation = True) - #print(res.optimality_result.primary) family.analysis_result = res - #print(res) - #print(improving_assignment) - #print(improving_value, can_improve) if family.analysis_result.improving_value is not None: if self.saynt_timer is not None: print(f'-----------PAYNT----------- \ - \nValue = {improving_value} | Time elapsed = {round(self.saynt_timer.read(),1)}s | FSC size = {self.quotient.policy_size(family.analysis_result.improving_assignment)}\n', flush=True) + \nValue = {family.analysis_result.improving_value} | Time elapsed = {round(self.saynt_timer.read(),1)}s | FSC size = {self.quotient.policy_size(family.analysis_result.improving_assignment)}\n', flush=True) if self.storm_control.export_fsc_paynt is not None: makedirs(self.storm_control.export_fsc_paynt, exist_ok=True) with open(self.storm_control.export_fsc_paynt + "/paynt.fsc", "w") as text_file: @@ -93,49 +89,29 @@ def analyze_family_ar(self, family): else: self.stat.new_fsc_found(family.analysis_result.improving_value, family.analysis_result.improving_assignment, self.quotient.policy_size(family.analysis_result.improving_assignment)) self.quotient.specification.optimality.update_optimum(family.analysis_result.improving_value) - # print(res, can_improve) - # print(res.optimality_result.primary.result.get_values()) - - #print(res.optimality_result.primary) - #print(res.optimality_result.secondary) - #if res.optimality_result.primary.value > 20: - # can_improve = False - - if self.quotient.specification.optimality.optimum and can_improve and self.storm_pruning: + # storm pruning runs Storm POMDP over-approximation analysis and on the sub-POMDP given by a family + # this serves as a better abstraction for pruning, however is much more computationally intensive + if self.quotient.specification.optimality.optimum and family.analysis_result.can_improve and self.storm_pruning: family_pomdp = self.quotient.get_family_pomdp(family.mdp) - #print(family_pomdp) storm_res = 
StormPOMDPControl.storm_pomdp_analysis(family_pomdp, self.quotient.specification.stormpy_formulae()) - #print(storm_res.lower_bound) - #print(storm_res.upper_bound) - #print(storm_res.induced_mc_from_scheduler) - #print(storm_res.cutoff_schedulers[0]) - + # compare computed bounds to the current optimum to see if the family can be pruned if self.quotient.specification.optimality.minimizing: if self.quotient.specification.optimality.optimum <= storm_res.lower_bound: - can_improve = False - #print(self.quotient.specification.optimality.threshold) + family.analysis_result.can_improve = False logger.info(f"Used Storm result to prune a family with Storm value: {storm_res.lower_bound} compared to current optimum {self.quotient.specification.optimality.optimum}. Quotient MDP value: {res.optimality_result.primary.value}") - #else: - # logger.info(f"Storm result: {storm_res.lower_bound}. Lower bounds: {storm_res.upper_bound}. Quotient MDP value: {res.optimality_result.primary.value}") else: if self.quotient.specification.optimality.optimum >= storm_res.upper_bound: - can_improve = False - #print(self.quotient.specification.optimality.threshold) + family.analysis_result.can_improve = False logger.info(f"Used Storm result to prune a family with Storm value: {storm_res.upper_bound} compared to current optimum {self.quotient.specification.optimality.optimum}. Quotient MDP value: {res.optimality_result.primary.value}") - #else: - # logger.info(f"Storm result: {storm_res.upper_bound}. Lower bounds: {storm_res.lower_bound}. Quotient MDP value: {res.optimality_result.primary.value}") - - return can_improve, improving_assignment def synthesize_assignment(self, family): - #try: self.quotient.discarded = 0 satisfying_assignment = None @@ -143,7 +119,9 @@ def synthesize_assignment(self, family): while families: + # check whether PAYNT should be paused if self.s_queue is not None: + # if the queue is non empty, pause for PAYNT was requested if not self.s_queue.empty(): if satisfying_assignment is not None: self.storm_control.latest_paynt_result = satisfying_assignment @@ -154,18 +132,21 @@ def synthesize_assignment(self, family): logger.info("Pausing synthesis") self.s_queue.get() self.stat.synthesis_time.stop() + # check for the signal that PAYNT can be resumed or terminated while self.s_queue.empty(): sleep(1) status = self.s_queue.get() if status == "resume": logger.info("Resuming synthesis") if self.storm_control.is_storm_better: + # if the result found by Storm is better and needs more memory end the current synthesis and add memory if self.storm_control.is_memory_needed(): logger.info("Additional memory needed") return satisfying_assignment else: logger.info("Applying family split according to Storm results") families, self.subfamilies_buffer = self.storm_split(families) + # if Storm's result is not better continue with the synthesis normally else: logger.info("PAYNT's value is better. 
Prioritizing synthesis results") self.stat.synthesis_time.start() @@ -174,24 +155,21 @@ def synthesize_assignment(self, family): logger.info("Terminating controller synthesis") return satisfying_assignment - #print(len(families)) - if SynthesizerARStorm.exploration_order_dfs: family = families.pop(-1) else: family = families.pop(0) - #print(family) - # simulate sequential family.parent_info = None - can_improve,improving_assignment = self.analyze_family_ar(family) - if improving_assignment is not None: - satisfying_assignment = improving_assignment - #print(satisfying_assignment) - if can_improve == False: + self.verify_family(family) + if family.analysis_result.improving_assignment is not None: + satisfying_assignment = family.analysis_result.improving_assignment + # family can be pruned + if family.analysis_result.can_improve == False: self.explore(family) + # if there are no more families in the main buffer coninue the exploration in the subfamilies if not families and self.subfamilies_buffer: logger.info("Main family synthesis done") logger.info(f"Subfamilies buffer contains: {len(self.subfamilies_buffer)} families") @@ -199,16 +177,9 @@ def synthesize_assignment(self, family): self.subfamilies_buffer = [] continue - #print("split", family) # undecided subfamilies = self.quotient.split(family, Synthesizer.incomplete_search) families = families + subfamilies - #except: - # if satisfying_assignment: - # extracted_result = self.quotient.extract_policy(satisfying_assignment) - # print(satisfying_assignment) - # print(extracted_result) - # exit() return satisfying_assignment diff --git a/paynt/synthesizer/synthesizer_pomdp.py b/paynt/synthesizer/synthesizer_pomdp.py index 94649cacd..540b74835 100644 --- a/paynt/synthesizer/synthesizer_pomdp.py +++ b/paynt/synthesizer/synthesizer_pomdp.py @@ -84,7 +84,7 @@ def __init__(self, quotient, method, storm_control): self.storm_control.pomdp = self.quotient.pomdp self.storm_control.spec_formulas = self.quotient.specification.stormpy_formulae() self.synthesis_terminate = False - self.synthesizer = SynthesizerARStorm + self.synthesizer = SynthesizerARStorm # SAYNT only works with abstraction refinement if self.storm_control.iteration_timeout is not None: self.saynt_timer = Timer() self.synthesizer.saynt_timer = self.saynt_timer @@ -100,13 +100,12 @@ def synthesize(self, family, print_stats = True): assignment = synthesizer.synthesize(family) if print_stats: synthesizer.print_stats() - #print(assignment) self.total_iters += synthesizer.stat.iterations_mdp # Print extract list for every iteration optimum - if assignment: - extracted_result = self.quotient.extract_policy(assignment) - #print(extracted_result) + # if assignment: + # extracted_result = self.quotient.extract_policy(assignment) + # print(extracted_result) return assignment @@ -129,16 +128,19 @@ def strategy_iterative_storm(self, unfold_imperfect_only, unfold_storm=True): if mem_size > 1: obs_memory_dict = {} if self.storm_control.is_storm_better: + # Storm's result is better and it needs memory if self.storm_control.is_memory_needed(): obs_memory_dict = self.storm_control.memory_vector logger.info(f'Added memory nodes for observation based on Storm data') else: + # consider the cut-off schedulers actions when updating memory if self.unfold_cutoff: for obs in range(self.quotient.observations): if obs in self.storm_control.result_dict: obs_memory_dict[obs] = self.quotient.observation_memory_size[obs] + 1 else: obs_memory_dict[obs] = self.quotient.observation_memory_size[obs] + # only consider 
the induced DTMC without cut-off states else: for obs in range(self.quotient.observations): if obs in self.storm_control.result_dict_no_cutoffs: @@ -153,7 +155,6 @@ def strategy_iterative_storm(self, unfold_imperfect_only, unfold_storm=True): else: obs_memory_dict[obs] = 1 logger.info(f'Increase memory in all imperfect observation') - print(obs_memory_dict) self.quotient.set_memory_from_dict(obs_memory_dict) else: logger.info("Synthesizing optimal k={} controller ...".format(mem_size) ) @@ -164,34 +165,29 @@ def strategy_iterative_storm(self, unfold_imperfect_only, unfold_storm=True): family = self.quotient.design_space + # if Storm's result is better, use it to obtain main family that considers only the important actions if self.storm_control.is_storm_better: + # consider the cut-off schedulers actions if self.storm_control.use_cutoffs: - main_family = self.storm_control.get_main_restricted_family_new(family, self.storm_control.result_dict) + main_family = self.storm_control.get_main_restricted_family(family, self.storm_control.result_dict) if self.incomplete_exploration == True: subfamily_restrictions = [] else: subfamily_restrictions = self.storm_control.get_subfamilies_restrictions(family, self.storm_control.result_dict) + # only consider the induced DTMC actions without cut-off states else: - main_family = self.storm_control.get_main_restricted_family_new(family, self.storm_control.result_dict_no_cutoffs) + main_family = self.storm_control.get_main_restricted_family(family, self.storm_control.result_dict_no_cutoffs) if self.incomplete_exploration == True: subfamily_restrictions = [] else: subfamily_restrictions = self.storm_control.get_subfamilies_restrictions(family, self.storm_control.result_dict_no_cutoffs) subfamilies = self.storm_control.get_subfamilies(subfamily_restrictions, family) + # if PAYNT is better continue normally else: main_family = family subfamilies = [] - # debug - print(self.storm_control.result_dict, "\n") - print(self.storm_control.result_dict_no_cutoffs) - #print(main_family) - #print(subfamily_restrictions) - #print(subfamilies) - #print(main_family.size) - #break - self.synthesizer.subfamilies_buffer = subfamilies self.synthesizer.unresticted_family = family @@ -212,6 +208,7 @@ def strategy_iterative_storm(self, unfold_imperfect_only, unfold_storm=True): #break + # main SAYNT loop def iterative_storm_loop(self, timeout, paynt_timeout, storm_timeout, iteration_limit=0): self.interactive_queue = Queue() self.synthesizer.s_queue = self.interactive_queue @@ -246,14 +243,12 @@ def iterative_storm_loop(self, timeout, paynt_timeout, storm_timeout, iteration_ print("\n------------------------------------\n") print("PAYNT results: ") - #print(self.storm_control.latest_paynt_result) print(self.storm_control.paynt_bounds) print("controller size: {}".format(self.storm_control.paynt_fsc_size)) print() print("Storm results: ") - #print(self.storm_control.latest_storm_result.induced_mc_from_scheduler) print(self.storm_control.storm_bounds) print("controller size: {}".format(self.storm_control.belief_controller_size)) print("\n------------------------------------\n") @@ -271,6 +266,7 @@ def iterative_storm_loop(self, timeout, paynt_timeout, storm_timeout, iteration_ self.saynt_timer.stop() + # run PAYNT POMDP synthesis with a given timeout def run_synthesis_timeout(self, timeout): self.interactive_queue = Queue() self.synthesizer.s_queue = self.interactive_queue @@ -290,7 +286,7 @@ def run_synthesis_timeout(self, timeout): paynt_thread.join() - # iterative strategy using 
Storm analysis to enhance the synthesis + # PAYNT POMDP synthesis that uses pre-computed results from Storm as guide def strategy_storm(self, unfold_imperfect_only, unfold_storm=True): ''' @param unfold_imperfect_only if True, only imperfect observations will be unfolded @@ -316,12 +312,14 @@ def strategy_storm(self, unfold_imperfect_only, unfold_storm=True): obs_memory_dict = self.storm_control.memory_vector logger.info(f'Added memory nodes for observation based on Storm data') else: + # consider the cut-off schedulers actions when updating memory if self.unfold_cutoff: for obs in range(self.quotient.observations): if obs in self.storm_control.result_dict: obs_memory_dict[obs] = self.quotient.observation_memory_size[obs] + 1 else: obs_memory_dict[obs] = self.quotient.observation_memory_size[obs] + # only consider the induced DTMC without cut-off states else: for obs in range(self.quotient.observations): if obs in self.storm_control.result_dict_no_cutoffs: @@ -336,7 +334,6 @@ def strategy_storm(self, unfold_imperfect_only, unfold_storm=True): else: obs_memory_dict[obs] = 1 logger.info(f'Increase memory in all imperfect observation') - print(obs_memory_dict) self.quotient.set_memory_from_dict(obs_memory_dict) else: logger.info("Synthesizing optimal k={} controller ...".format(mem_size) ) @@ -347,33 +344,29 @@ def strategy_storm(self, unfold_imperfect_only, unfold_storm=True): family = self.quotient.design_space + # if Storm's result is better, use it to obtain main family that considers only the important actions if self.storm_control.is_storm_better: + # consider the cut-off schedulers actions if self.storm_control.use_cutoffs: - main_family = self.storm_control.get_main_restricted_family_new(family, self.storm_control.result_dict) + main_family = self.storm_control.get_main_restricted_family(family, self.storm_control.result_dict) if self.incomplete_exploration == True: subfamily_restrictions = [] else: subfamily_restrictions = self.storm_control.get_subfamilies_restrictions(family, self.storm_control.result_dict) + # only consider the induced DTMC actions without cut-off states else: - main_family = self.storm_control.get_main_restricted_family_new(family, self.storm_control.result_dict_no_cutoffs) + main_family = self.storm_control.get_main_restricted_family(family, self.storm_control.result_dict_no_cutoffs) if self.incomplete_exploration == True: subfamily_restrictions = [] else: subfamily_restrictions = self.storm_control.get_subfamilies_restrictions(family, self.storm_control.result_dict_no_cutoffs) subfamilies = self.storm_control.get_subfamilies(subfamily_restrictions, family) + # if PAYNT is better continue normally else: main_family = family subfamilies = [] - # debug - print(self.storm_control.result_dict, "\n") - print(self.storm_control.result_dict_no_cutoffs) - #print(main_family) - #print(subfamilies) - #print(main_family.size) - #break - self.synthesizer.subfamilies_buffer = subfamilies self.synthesizer.unresticted_family = family @@ -907,43 +900,42 @@ def strategy_expected_uai(self): - def run(self, parallel=False): + def run(self): # choose the synthesis strategy: - if self.use_storm: - logger.info("Storm pomdp option enabled") + logger.info("Storm POMDP option enabled") logger.info("Storm settings: iterative - {}, get_storm_result - {}, storm_options - {}, prune_storm - {}, unfold_strategy - {}, use_storm_cutoffs - {}".format( (self.storm_control.iteration_timeout, self.storm_control.paynt_timeout, self.storm_control.storm_timeout), self.storm_control.get_result, 
self.storm_control.storm_options, self.incomplete_exploration, (self.unfold_storm, self.unfold_cutoff), self.storm_control.use_cutoffs )) - + # start SAYNT if self.storm_control.iteration_timeout is not None: self.iterative_storm_loop(timeout=self.storm_control.iteration_timeout, paynt_timeout=self.storm_control.paynt_timeout, storm_timeout=self.storm_control.storm_timeout, iteration_limit=0) + # run PAYNT for a time given by 'self.storm_control.get_result' and then run Storm using the best computed FSC at cut-offs elif self.storm_control.get_result is not None: - #TODO probably strategy_storm with unfold storm False if self.storm_control.get_result: self.run_synthesis_timeout(self.storm_control.get_result) self.storm_control.run_storm_analysis() + # run Storm and then use the obtained result to enhance PAYNT synthesis else: self.storm_control.get_storm_result() self.strategy_storm(unfold_imperfect_only=True, unfold_storm=self.unfold_storm) print("\n------------------------------------\n") print("PAYNT results: ") - #print(self.storm_control.latest_paynt_result) print(self.storm_control.paynt_bounds) print("controller size: {}".format(self.storm_control.paynt_fsc_size)) print() print("Storm results: ") - #print(self.storm_control.latest_storm_result.induced_mc_from_scheduler) print(self.storm_control.storm_bounds) print("controller size: {}".format(self.storm_control.belief_controller_size)) print("\n------------------------------------\n") + # Pure PAYNT POMDP synthesis else: # self.strategy_expected_uai() # self.strategy_iterative(unfold_imperfect_only=False) diff --git a/paynt/utils/storm_parallel.py b/paynt/utils/storm_parallel.py deleted file mode 100644 index a734e02b1..000000000 --- a/paynt/utils/storm_parallel.py +++ /dev/null @@ -1,44 +0,0 @@ -from ast import Pass -from unittest import result -import stormpy -import stormpy.synthesis -import stormpy.pomdp - -import time - -from multiprocessing import Process, SimpleQueue - - -import logging -logger = logging.getLogger(__name__) - -# TODO JUST A CONCEPT WE WILL TRY ITERATIVE METHOD FIRST -# class for managing PAYNT and Storm parallel processes -class ParallelControl: - - def __init__(self, synthesizer, storm_control): - self.synthesizer = synthesizer # PAYNT - self.storm_control = storm_control # STORM - - def run(self): - storm_queue = SimpleQueue() - paynt_queue = SimpleQueue() - - self.synthesizer.synthesizer.s_queue = paynt_queue - self.storm_control.s_queue = storm_queue - - storm_process = Process(target=self.storm_control.get_storm_result) - storm_process.start() - - paynt_process = Process(target=self.synthesizer.run, args=(True,)) - paynt_process.start() - - storm_process.join() - - storm_result = storm_queue.get() - paynt_queue.put(storm_result) - - paynt_process.join() - - - #s_queue.close()
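
The queue-based hand-off documented by the new comments in `synthesize_assignment` (pause when the queue is non-empty, then wait for a "resume" or "terminate" status) can be illustrated with the standalone sketch below. It is a simplified toy, not the actual PAYNT/Storm classes: `controller` and `synthesis_loop` are made-up stand-ins, and the real SAYNT loop exchanges richer payloads (results, bounds) over a multiprocessing queue.

    # Toy illustration of the SAYNT pause/resume protocol; names and payloads are placeholders.
    from queue import Queue
    from threading import Thread
    from time import sleep

    def controller(q):
        # stand-in for the SAYNT main loop: request a pause, later resume, finally terminate
        sleep(0.2); q.put("pause")
        sleep(0.2); q.put("resume")
        sleep(0.2); q.put("pause")
        sleep(0.2); q.put("terminate")

    def synthesis_loop(q):
        # stand-in for SynthesizerARStorm.synthesize_assignment
        while True:
            if not q.empty():            # a non-empty queue means a pause was requested
                q.get()                  # consume the pause request
                print("synthesis paused")
                while q.empty():         # wait for the resume/terminate signal
                    sleep(0.05)
                if q.get() == "terminate":
                    print("synthesis terminated")
                    return
                print("synthesis resumed")
            sleep(0.05)                  # one family would be verified here

    q = Queue()
    Thread(target=controller, args=(q,), daemon=True).start()
    synthesis_loop(q)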
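Similarly, the comments on `get_main_restricted_family`, `get_subfamilies_restrictions` and `storm_split` describe a partition of the design space: the main family keeps, for every action hole Storm has data for, only the actions Storm used, and each subfamily flips one restricted hole to the complementary actions. The toy below shows the partition idea only, under the assumption that a family is a plain mapping from hole name to option list; the real code operates on DesignSpace/Hole objects and keeps the restriction list separate from the constructed families.

    # Simplified model of the main-family / subfamily split (hole -> option list).
    def split_family(family, restriction):
        """family: {hole: [options]}, restriction: {hole: [options kept in the main family]}."""
        main = {h: [o for o in opts if o in restriction.get(h, opts)] or opts[:1]
                for h, opts in family.items()}
        subfamilies = []
        for i, h in enumerate(restriction):
            sub = dict(family)
            for h2 in list(restriction)[:i]:
                # holes handled earlier keep only the restricted options ...
                sub[h2] = [o for o in family[h2] if o in restriction[h2]]
            # ... while the current hole takes the complementary options
            sub[h] = [o for o in family[h] if o not in restriction[h]]
            if sub[h]:
                subfamilies.append(sub)
        return main, subfamilies

    family = {"h0": [0, 1, 2], "h1": [0, 1]}
    main, subs = split_family(family, {"h0": [2]})
    print(main)   # {'h0': [2], 'h1': [0, 1]}
    print(subs)   # [{'h0': [0, 1], 'h1': [0, 1]}]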
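Finally, the pruning branch that `verify_family` now documents (running Storm's over-approximation analysis on the sub-POMDP induced by a family) boils down to comparing the current optimum against the bounds returned by `storm_pomdp_analysis`. The check itself is just the comparison below; the helper name is illustrative.

    # Bound check behind the storm_pruning branch in verify_family (illustration only).
    def can_prune(minimizing, current_optimum, lower_bound, upper_bound):
        if current_optimum is None:
            return False                    # nothing to compare against yet
        if minimizing:
            # every policy in the family costs at least lower_bound
            return current_optimum <= lower_bound
        # maximizing: no policy in the family achieves more than upper_bound
        return current_optimum >= upper_bound

    assert can_prune(True, 10.0, 12.0, 20.0)    # found minimum (10) already beats the whole family
    assert not can_prune(False, 0.8, 0.1, 0.9)  # the family might still reach up to 0.9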