Skip to content

Commit

Permalink
change specification to the minimizing reward
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman Andriushchenko committed Nov 23, 2023
1 parent 2c14e1a commit 1e7a389
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 64 deletions.
33 changes: 12 additions & 21 deletions paynt/quotient/pomdp_family.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,6 @@ def __init__(self, quotient_mdp, coloring, specification, obs_evaluator):
# POMDP manager used for unfolding the memory model into the quotient POMDP
self.product_pomdp_fsc = None

assert not self.specification.has_optimality, \
"expecting specification without the optimality objective"

self.action_labels,self.choice_to_action,state_to_actions = \
paynt.quotient.mdp_family.MdpFamilyQuotientContainer.extract_choice_labels(self.quotient_mdp)

Expand Down Expand Up @@ -186,18 +183,12 @@ def state_to_observation(self):
def observation_is_trivial(self, obs):
return len(self.observation_to_actions[obs])==1

def extract_target_label(self):
spec = self.specification
assert not spec.has_optimality and spec.num_properties == 1, "expecting a single property"
prop = spec.constraints[0]
label = str(prop.formula.subformula.subformula)
return label


def initialize_fsc_unfolder(self, fsc_is_deterministic=False):
if fsc_is_deterministic:
if fsc_is_deterministic and not isinstance(self.product_pomdp_fsc, stormpy.synthesis.ProductPomdpFsc):
self.product_pomdp_fsc = stormpy.synthesis.ProductPomdpFsc(
self.quotient_mdp, self.state_to_observation, self.num_actions, self.choice_to_action)
else:
if not fsc_is_deterministic and not isinstance(self.product_pomdp_fsc, stormpy.synthesis.ProductPomdpRandomizedFsc):
self.product_pomdp_fsc = stormpy.synthesis.ProductPomdpRandomizedFsc(
self.quotient_mdp, self.state_to_observation, self.num_actions, self.choice_to_action)

Expand All @@ -212,13 +203,14 @@ def build_pomdp(self, family):
return SubPomdp(pomdp,self,state_map,choice_map)


def build_dtmc_sketch(self, fsc, negate_specification=False):

This comment has been minimized.

Copy link
@randriu

randriu Nov 23, 2023

Owner

When building a DTMC sketch, negating the specification now has no meaning

def build_dtmc_sketch(self, fsc):
'''
Construct the family of DTMCs representing the execution of the given FSC in different environments.
:param negate_specification if True, a negated specification will be associated with the sketch
'''

# create the product
fsc.check(self.observation_to_actions)
self.initialize_fsc_unfolder(fsc.is_deterministic)
self.product_pomdp_fsc.apply_fsc(fsc.action_function, fsc.update_function)
product = self.product_pomdp_fsc.product
product_choice_to_choice = self.product_pomdp_fsc.product_choice_to_choice
Expand All @@ -240,9 +232,6 @@ def build_dtmc_sketch(self, fsc, negate_specification=False):

# handle specification
product_specification = self.specification.copy()
if negate_specification:
product_specification = product_specification.negate()

dtmc_sketch = paynt.quotient.quotient.DtmcQuotientContainer(product, product_coloring, product_specification)
return dtmc_sketch

Expand All @@ -254,7 +243,7 @@ def translate_path_to_trace(self, dtmc_sketch, dtmc, path):
product_choice = dtmc.quotient_choice_map[dtmc_state]
choice = self.product_pomdp_fsc.product_choice_to_choice[product_choice]
if choice == invalid_choice:
# randomized FSC: we are in the intermediate states, continue to the next one
# randomized FSC: we are in the intermediate state, move on to the next one
continue

product_state = dtmc.quotient_state_map[dtmc_state]
Expand Down Expand Up @@ -291,9 +280,11 @@ def compute_witnessing_traces(self, dtmc_sketch, satisfying_assignment, num_trac

# assuming a single probability reachability property
spec = dtmc_sketch.specification
assert not spec.has_optimality and spec.num_properties == 1 and not spec.constraints[0].reward, \
"expecting a single reachability probability constraint"
prop = dtmc_sketch.specification.constraints[0]
assert spec.num_properties == 1, "expecting a single property"
prop = spec.all_properties()[0]
if prop.is_reward:
logger.warning("WARNING: specification is a reward property, but generated traces \
will be based on transition probabilities")

target_label = self.extract_target_label()
target_states = dtmc.model.labeling.get_states(target_label)
Expand Down
71 changes: 28 additions & 43 deletions paynt_pomdp_sketch.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# using PAYNT for POMDP sketches
# using Paynt for POMDP sketches

import paynt.cli
import paynt.parser.sketch

import paynt.quotient.pomdp_family
import paynt.synthesizer.synthesizer_all

import os
import random
Expand All @@ -14,28 +12,33 @@ def load_sketch(project_path):
sketch_path = os.path.join(project_path, "sketch.templ")
properties_path = os.path.join(project_path, "sketch.props")
pomdp_sketch = paynt.parser.sketch.Sketch.load_sketch(sketch_path, properties_path)

This comment has been minimized.

Copy link
@randriu

randriu Nov 23, 2023

Owner

Upon building the quotient for the family of POMDPs, we check whether all deadlock states are the target ones

target_label = pomdp_sketch.extract_target_label()
target_states = pomdp_sketch.quotient_mdp.labeling.get_states(target_label)
for state in pomdp_sketch.quotient_mdp.labeling.get_states("deadlock"):
assert state in target_states, "have deadlock in a non-target state {}".format(state)

return pomdp_sketch


def investigate_hole_assignment(pomdp_sketch, hole_assignment, fsc_is_deterministic):
def investigate_hole_assignment(pomdp_sketch, hole_assignment):
print("investigating hole assignment: ", hole_assignment)
pomdp = pomdp_sketch.build_pomdp(hole_assignment)
state_is_absorbing = pomdp_sketch.identify_absorbing_states(pomdp.model)
print("trivial observations: ", [obs for obs in range(pomdp_sketch.num_observations) if pomdp_sketch.observation_is_trivial(obs)])

# return a random k-FSC
# create a random k-FSC
fsc_is_deterministic = True
num_nodes = 3
fsc = paynt.quotient.pomdp_family.FSC(num_nodes, pomdp_sketch.num_observations, fsc_is_deterministic)
fsc.fill_trivial_actions(pomdp_sketch.observation_to_actions)
# fsc.fill_trivial_updates(pomdp_sketch.observation_to_actions)
for node in range(num_nodes):
for obs in range(pomdp_sketch.num_observations):
# random deterministic FSC
available_actions = pomdp_sketch.observation_to_actions[obs]
if fsc.is_deterministic:
fsc.action_function[node][obs] = random.choice(pomdp_sketch.observation_to_actions[obs])
# random deterministic FSC
fsc.action_function[node][obs] = random.choice(available_actions)
else:
# random randomized FSC
available_actions = pomdp_sketch.observation_to_actions[obs]
num_samples = min(len(available_actions),node+1)
sampled_actions = random.sample(available_actions,num_samples)
prob = 1/num_samples
Expand All @@ -47,19 +50,15 @@ def investigate_hole_assignment(pomdp_sketch, hole_assignment, fsc_is_determinis
return fsc


# enable PAYNT logging
# random.seed(42)
paynt.cli.setup_logger()

# need to specify beforehand whether FSCs will be deterministic or not
# since we initialize the FSC unfolder beforehand
fsc_is_deterministic = False
# enable PAYNT logging
paynt.cli.setup_logger()

# load sketch
project_path="models/pomdp/sketches/obstacles"
# project_path="models/pomdp/sketches/avoid"
pomdp_sketch = load_sketch(project_path)

This comment has been minimized.

Copy link
@randriu

randriu Nov 23, 2023

Owner

We no longer need to initialize the unfolder beforehand: when applying an FSC, Paynt will detect whether the applied FSC is deterministic or not and, if necessary, switch to the correct unfolder

pomdp_sketch.initialize_fsc_unfolder(fsc_is_deterministic)
print("specification: ", pomdp_sketch.specification)
print("design space:\n", pomdp_sketch.design_space)
print("number of holes: ", pomdp_sketch.design_space.num_holes)
Expand All @@ -70,31 +69,17 @@ def investigate_hole_assignment(pomdp_sketch, hole_assignment, fsc_is_determinis
hole_assignment = pomdp_sketch.design_space.assume_options_copy(hole_options)

# investigate this hole assignment and return an FSC
fsc = investigate_hole_assignment(pomdp_sketch, hole_assignment, fsc_is_deterministic)

# investigate this FSC and obtain a list of families of POMDPs for which this FSC is violating
# apply FSC to a PODMP sketch, obtaining a DTMC sketch
# we are negating the specification since we are looking for violating DTMCs
dtmc_sketch = pomdp_sketch.build_dtmc_sketch(fsc, negate_specification=True)
synthesizer = paynt.synthesizer.synthesizer_all.SynthesizerAll(dtmc_sketch)
violating_families = synthesizer.synthesize()
if not violating_families:
print("all POMDPs were satisfied")
exit()

print("found {} violating families, printing below:".format(len(violating_families)))
print([str(family) for family in violating_families])

# pick 1 assignment and generate violating traces
# pick first
# violating_assignment = violating_families[0].pick_any()
# pick random assignment from random family
violating_assignment = random.choice(violating_families).pick_random()

num_violating_traces = 5
trace_max_length = 11 # length of a trace in a DTMC with unreachable target
violating_traces = pomdp_sketch.compute_witnessing_traces(dtmc_sketch,violating_assignment,num_violating_traces,trace_max_length)
print("constructed {} violating traces, printing below:".format(len(violating_traces)))
for trace in violating_traces:
print(trace)
fsc = investigate_hole_assignment(pomdp_sketch, hole_assignment)

# apply this FSC to a POMDP sketch, obtaining a DTMC sketch
dtmc_sketch = pomdp_sketch.build_dtmc_sketch(fsc)

# to each (sub-family of) environment(s), assign a value corresponding to the minimum specification satisfiability
# TODO

# pick the worst environment
# TODO
worst_family = pomdp_sketch.design_space.pick_random()

print("printing the worst family below:")
print(worst_family)

0 comments on commit 1e7a389

Please sign in to comment.