diff --git a/paynt/synthesizer/statistic.py b/paynt/synthesizer/statistic.py index 11b391aed..716c310fa 100644 --- a/paynt/synthesizer/statistic.py +++ b/paynt/synthesizer/statistic.py @@ -203,7 +203,7 @@ def get_summary_synthesis(self): return f"feasible: {feasible}" def get_summary_evaluation(self): - if not type(self.family_to_evaluation[0] == paynt.synthesizer.synthesizer.FamilyEvaluation): + if not type(self.family_to_evaluation[0]) == paynt.synthesizer.synthesizer.FamilyEvaluation: return "" members_sat = sum( [family.size for family,evaluation in self.family_to_evaluation if evaluation.sat ]) members_total = self.quotient.design_space.size diff --git a/paynt/synthesizer/synthesizer.py b/paynt/synthesizer/synthesizer.py index 61e60bc39..dded1fafd 100644 --- a/paynt/synthesizer/synthesizer.py +++ b/paynt/synthesizer/synthesizer.py @@ -102,7 +102,14 @@ def synthesize_one(self, family): ''' to be overridden ''' pass - def synthesize(self, family=None, optimum_threshold=None, print_stats=True): + def synthesize(self, family=None, optimum_threshold=None, return_all=False, print_stats=True): + ''' + :param family family of assignment to search in + :param optimum_threshold known value of the optimum value + :param return_all if True and the synthesis returns a family, all assignments will be returned instead of an + arbitrary one + :param print_stats if True, synthesis stats will be printed upon completion + ''' if family is None: family = self.quotient.design_space family.constraint_indices = self.quotient.specification.all_constraint_indices() @@ -114,14 +121,13 @@ def synthesize(self, family=None, optimum_threshold=None, print_stats=True): logger.info("synthesis initiated, design space: {}".format(family.size)) self.stat.start(family) assignment = self.synthesize_one(family) - if assignment is not None and assignment.size > 1: + if assignment is not None and assignment.size > 1 and not return_all: assignment = assignment.pick_any() self.stat.finished_synthesis(assignment) - logger.info("synthesis finished") + logger.info("synthesis finished, printing synthesized assignment below:") + logger.info(assignment) - if assignment is not None: - logger.info("printing synthesized assignment below:") - logger.info(assignment) + if assignment is not None and assignment.size == 1: model = self.quotient.build_assignment(assignment) mc_result = model.check_specification(self.quotient.specification) logger.info(f"double-checking specification satisfiability: {mc_result}") diff --git a/paynt/verification/property.py b/paynt/verification/property.py index 3d1f3dfc0..0eaedb42f 100644 --- a/paynt/verification/property.py +++ b/paynt/verification/property.py @@ -137,7 +137,6 @@ def can_be_improved(self): def negate(self): negated_formula = self.property.raw_formula.clone() - prop_negated = self.copy() negated_formula.comparison_type = { stormpy.ComparisonType.LESS: stormpy.ComparisonType.GEQ, stormpy.ComparisonType.LEQ: stormpy.ComparisonType.GREATER, @@ -241,7 +240,15 @@ def can_be_improved(self): return not( not self.reward and self.minimizing and self.threshold == 0 ) def negate(self): - raise TypeError("negation of optimality properties is not supported") + negated_formula = self.property.raw_formula.clone() + negate_optimality_type = { + stormpy.OptimizationDirection.Minimize: stormpy.OptimizationDirection.Maximize, + stormpy.OptimizationDirection.Maximize: stormpy.OptimizationDirection.Minimize + }[negated_formula.optimality_type] + negated_formula.set_optimality_type(negate_optimality_type) + stormpy_property_negated = stormpy.core.Property(self.property.name, negated_formula) + property_negated = OptimalityProperty(stormpy_property_negated,self.discount_factor,self.epsilon) + return property_negated class DoubleOptimalityProperty(OptimalityProperty): diff --git a/paynt_pomdp_sketch.py b/paynt_pomdp_sketch.py index 35522428d..08471c3ef 100644 --- a/paynt_pomdp_sketch.py +++ b/paynt_pomdp_sketch.py @@ -52,20 +52,56 @@ def investigate_hole_assignment(pomdp_sketch, hole_assignment): return fsc +def compute_qvalues(pomdp_sketch, dtmc_sketch): + print() + subfamily = dtmc_sketch.design_space + dtmc_sketch.build(subfamily) + print("computing Q-values...") + qvalues = pomdp_sketch.compute_qvalues_for_product_submdp(subfamily.mdp) + print("Q-values computed") + +def evaluate_all(dtmc_sketch): + ''' To each singleton environment, assign a value corresponding to the specification satisfiability. ''' + print() + synthesizer = paynt.synthesizer.synthesizer_onebyone.SynthesizerOneByOne(dtmc_sketch) + family_to_value = synthesizer.evaluate(keep_value_only=True) + + # pick the worst family + import numpy + values = numpy.array([value for family,value in family_to_value]) + if dtmc_sketch.get_property().minimizing: + worst_index = values.argmax() + else: + worst_index = values.argmin() + + worst_family,worst_value = family_to_value[worst_index] + print("the worst family has value {}, printing the family below:".format(worst_value)) + print(worst_family) + +def synthesize_worst(dtmc_sketch): + '''Identify assignment with the worst specification satisfiability.''' + print() + dtmc_sketch.specification = dtmc_sketch.specification.negate() + dtmc_sketch.specification.optimality.epsilon = 0.05 + synthesizer = paynt.synthesizer.synthesizer_ar.SynthesizerAR(dtmc_sketch) + worst_assignment = synthesizer.synthesize(return_all=True) + + profiling = False if profiling: profiler = cProfile.Profile() profiler.enable() -# random.seed(42) + +# random.seed(11) # enable PAYNT logging paynt.cli.setup_logger() # load sketch -# project_path="models/pomdp/sketches/obstacles" -project_path="models/pomdp/sketches/avoid" +project_path="models/pomdp/sketches/obstacles" +# project_path="models/pomdp/sketches/avoid" pomdp_sketch = load_sketch(project_path) print("specification: ", pomdp_sketch.specification) print("design space:\n", pomdp_sketch.design_space) @@ -77,33 +113,13 @@ def investigate_hole_assignment(pomdp_sketch, hole_assignment): # investigate this hole assignment and return an FSC fsc = investigate_hole_assignment(pomdp_sketch, hole_assignment) -dtmc_sketch = pomdp_sketch.build_dtmc_sketch(fsc) - -## Q-values computation -if False: - # apply this FSC to a POMDP sketch, obtaining a DTMC sketch - subfamily = dtmc_sketch.design_space - dtmc_sketch.build(subfamily) - qvalues = pomdp_sketch.compute_qvalues_for_product_submdp(subfamily.mdp) - -## evaluation -if False: - # to each singleton environment, assign a value corresponding to the specification satisfiability - synthesizer = paynt.synthesizer.synthesizer_onebyone.SynthesizerOneByOne(dtmc_sketch) - family_to_value = synthesizer.evaluate(keep_value_only=True, print_stats=False) - - # pick the worst family - import numpy - values = numpy.array([value for family,value in family_to_value]) - if dtmc_sketch.get_property().minimizing: - worst_index = values.argmax() - else: - worst_index = values.argmin() +# apply this FSC to a POMDP sketch, obtaining a DTMC sketch +dtmc_sketch = pomdp_sketch.build_dtmc_sketch(fsc) - worst_family,worst_value = family_to_value[worst_index] - print("the worst family has value {}, printing it below:".format(worst_value)) - print(worst_family) +# compute_qvalues(pomdp_sketch, dtmc_sketch) +# evaluate_all(dtmc_sketch) +# synthesize_worst(dtmc_sketch) if profiling: profiler.disable()