Skip to content

Commit

Permalink
разобрался с конфилкатми
Browse files Browse the repository at this point in the history
  • Loading branch information
TreshMom committed Apr 18, 2024
1 parent bcbefae commit 282d451
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 177 deletions.
74 changes: 0 additions & 74 deletions project/task4.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,77 +51,3 @@ def diagonalized(mat):
last_nnz = hash(str(k))

return result


from project.task3 import FiniteAutomaton
from scipy.sparse import dok_matrix, block_diag


def reachability_with_constraints(
finite_automaton: FiniteAutomaton,
constraints_automaton: FiniteAutomaton,
start_to_end_enable: bool = True,
) -> dict[int, set[int]]:
matrices = {}

common_labels = (
finite_automaton.func_to_steps.keys()
& constraints_automaton.func_to_steps.keys()
)
constraints_height, automaton_height = len(
constraints_automaton.map_index_to_state
), len(finite_automaton.map_index_to_state)

for label in common_labels:
constraints_matrix = constraints_automaton.func_to_steps[label]
automaton_matrix = finite_automaton.func_to_steps[label]
matrices[label] = block_diag((constraints_matrix, automaton_matrix))

height = constraints_height
width = constraints_height + automaton_height

reachable_states = {
state.value: set() for state in finite_automaton.map_index_to_state
}

def diagonalize_matrix(matrix):
height = matrix.shape[0]
result = dok_matrix(matrix.shape, dtype=bool)

for i in range(height):
for j in range(height):
if matrix[j, i]:
result[i] += matrix[j]

return result

for start_state in finite_automaton.start_states:
frontier = dok_matrix((height, width), dtype=bool)
for constraints_start_state in constraints_automaton.start_states:
frontier[constraints_start_state, constraints_start_state] = True

for i in range(height):
frontier[i, start_state + constraints_height] = True

for _ in range(constraints_height * automaton_height):
new_frontier = dok_matrix((height, width), dtype=bool)
for label in common_labels:
new_frontier += diagonalize_matrix(frontier @ matrices[label])
frontier = new_frontier
for i in range(height):
if i in constraints_automaton.final_states and frontier[i, i]:
for j in range(automaton_height):
if (
j in finite_automaton.final_states
and frontier[i, j + constraints_height]
):
if (
start_to_end_enable
or finite_automaton.map_index_to_state[start_state]
!= finite_automaton.map_index_to_state[j]
):
reachable_states[
finite_automaton.map_index_to_state[start_state]
].add(finite_automaton.map_index_to_state[j])

return reachable_states
102 changes: 0 additions & 102 deletions project/task6.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,105 +78,3 @@ def cfpq_with_hellings(
}

return final_result


from pyformlang.cfg import CFG
from collections import defaultdict
from collections import deque
from typing import Deque
from typing import Dict
from typing import List
from typing import Set
from typing import Tuple

import pyformlang
import networkx as nx


def cfg_to_weak_normal_form(cfg: CFG) -> CFG:
cfg1 = cfg.eliminate_unit_productions().remove_useless_symbols()
tmp = cfg1._get_productions_with_only_single_terminals()
new_prod = cfg1._decompose_productions(tmp)
return CFG(start_symbol=cfg1.start_symbol, productions=new_prod)


def cfg_from_file(path: str) -> CFG:
with open(path) as f:
return CFG.from_text(f.read())


def _cfpq_with_hellings(
cfg: pyformlang.cfg.CFG, graph: nx.DiGraph
) -> Set[Tuple[int, pyformlang.cfg.Variable, int]]:
cfg = cfg_to_weak_normal_form(cfg)

var_production_body_to_head: Dict[
Tuple[pyformlang.cfg.Variable, pyformlang.cfg.Variable],
Set[pyformlang.cfg.Variable],
] = defaultdict(set)

result: Set[Tuple[int, pyformlang.cfg.Variable, int]] = set()

# dictionaries letting to iterate over `result` elements
# with specified first or last component
start_to_var_and_finish_pairs: Dict[
int, Set[Tuple[pyformlang.cfg.Variable, int]]
] = defaultdict(set)
finish_to_start_and_var_pairs: Dict[
int, Set[Tuple[int, pyformlang.cfg.Variable]]
] = defaultdict(set)

unhandled: Deque[Tuple[int, pyformlang.cfg.Variable, int]] = deque()

def register_reachability(start, var, finish):
triple = (start, var, finish)
if triple not in result:
result.add(triple)
unhandled.append(triple)
start_to_var_and_finish_pairs[start].add((var, finish))
finish_to_start_and_var_pairs[finish].add((start, var))

edges_grouped_by_label: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
for start, finish, attributes in graph.edges.data():
edges_grouped_by_label[attributes["label"]].append((start, finish))

for production in cfg.productions:
match production.body:
case [] | [pyformlang.cfg.Epsilon()]:
for node in graph.nodes:
register_reachability(node, production.head, node)
case [pyformlang.cfg.Terminal() as terminal]:
for start, finish in edges_grouped_by_label[terminal.value]:
register_reachability(start, production.head, finish)
case [pyformlang.cfg.Variable() as var1, pyformlang.cfg.Variable() as var2]:
var_production_body_to_head[(var1, var2)].add(production.head)

while unhandled:
(node1, var1, node2) = unhandled.popleft()
for start, var, finish in [
(node0, var, node2)
for (node0, var0) in finish_to_start_and_var_pairs[node1]
for var in var_production_body_to_head[(var0, var1)]
] + [
(node1, var, node3)
for (var2, node3) in start_to_var_and_finish_pairs[node2]
for var in var_production_body_to_head[(var1, var2)]
]:
register_reachability(start, var, finish)
return result


def cfpq_with_hellings(
cfg: pyformlang.cfg.CFG,
graph: nx.DiGraph,
start_nodes: Set[int] = None,
final_nodes: Set[int] = None,
) -> set[Tuple[int, int]]:
return {
(start, finish)
for (start, var, finish) in _cfpq_with_hellings(cfg, graph)
if var == cfg.start_symbol
and (start_nodes is None or start in start_nodes)
and (final_nodes is None or finish in final_nodes)
and start != finish
}
1 change: 0 additions & 1 deletion tests/autotests/test_task6.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ def test_rpq_cfpq(self, graph, regex_str, cfg_list) -> None:
rpq: dict[int, set[int]] = reachability_with_constraints(
FiniteAutomaton(graph_to_nfa(graph, start_nodes, final_nodes)),
FiniteAutomaton(regex_to_dfa(regex_str)),
start_to_end_enable=False,
)
rpq_set = set()
for node_from, nodes_to in rpq.items():
Expand Down

0 comments on commit 282d451

Please sign in to comment.