-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
156 additions
and
128 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,115 +1,136 @@ | ||
from pyformlang.finite_automaton import ( | ||
DeterministicFiniteAutomaton, | ||
NondeterministicFiniteAutomaton, | ||
State, | ||
) | ||
from scipy.sparse import dok_matrix, kron | ||
from networkx import MultiDiGraph | ||
import scipy as sp | ||
from typing import Iterable | ||
from pyformlang.finite_automaton import * | ||
from project.task2 import graph_to_nfa, regex_to_dfa | ||
from typing import Dict | ||
from pyformlang.finite_automaton import Symbol, NondeterministicFiniteAutomaton | ||
|
||
|
||
def as_set(obj): | ||
if not isinstance(obj, set): | ||
return {obj} | ||
return obj | ||
|
||
|
||
class FiniteAutomaton: | ||
def __init__( | ||
self, | ||
fa: NondeterministicFiniteAutomaton = None, | ||
*, | ||
matrix=None, | ||
start_states=None, | ||
final_states=None, | ||
state_to_int=None | ||
) -> None: | ||
self.matrix = matrix | ||
self.start_states = start_states | ||
self.final_states = final_states | ||
self.state_to_int = state_to_int | ||
if fa is not None: | ||
self.state_to_int = {v: i for i, v in enumerate(fa.states)} | ||
self.matrix = to_matrix(fa, self.state_to_int) | ||
self.start_states = fa.start_states | ||
self.final_states = fa.final_states | ||
|
||
def accepts(self, word: Iterable[Symbol]) -> bool: | ||
return self.to_nfa().accepts(word) | ||
|
||
def is_empty(self) -> bool: | ||
return self.to_nfa().is_empty() | ||
|
||
def to_nfa(self) -> NondeterministicFiniteAutomaton: | ||
nfa = NondeterministicFiniteAutomaton() | ||
|
||
for symbol, matrix in self.matrix.items(): | ||
for u, v in zip(*matrix.nonzero()): | ||
nfa.add_transition( | ||
State(self.state_to_int[State(u)]), | ||
symbol, | ||
State(self.state_to_int[State(v)]), | ||
) | ||
|
||
for state in self.start_states: | ||
nfa.add_start_state(State(self.state_to_int[State(state)])) | ||
for state in self.final_states: | ||
nfa.add_final_state(State(self.state_to_int[State(state)])) | ||
|
||
return nfa | ||
|
||
|
||
def to_matrix( | ||
fa: NondeterministicFiniteAutomaton, state_to_int: Dict[int, int] | ||
) -> Dict[Symbol, sp.sparse.dok_matrix]: | ||
result = {} | ||
|
||
for symbol in fa.symbols: | ||
result[symbol] = sp.sparse.dok_matrix( | ||
(len(fa.states), len(fa.states)), dtype=bool | ||
) | ||
for v, edges in fa.to_dict().items(): | ||
if symbol in edges: | ||
u = edges[symbol] | ||
result[symbol][state_to_int[v], state_to_int[u]] = True | ||
|
||
return result | ||
m = None | ||
start = None | ||
final = None | ||
mapping = None | ||
|
||
def __init__(self, obj, start=set(), final=set(), mapping=dict()): | ||
if isinstance( | ||
obj, (DeterministicFiniteAutomaton, NondeterministicFiniteAutomaton) | ||
): | ||
matrix = nfa_to_matrix(obj) | ||
self.m = matrix.m | ||
self.start = matrix.start | ||
self.final = matrix.final | ||
self.mapping = matrix.mapping | ||
else: | ||
self.m = obj | ||
self.start = start | ||
self.final = final | ||
self.mapping = mapping | ||
|
||
def map_for(self, u): | ||
return self.mapping[State(u)] | ||
|
||
def size(self): | ||
return len(self.mapping) | ||
|
||
def start_idx(self): | ||
return [self.map_for(i) for i in self.start] | ||
|
||
def final_idx(self): | ||
return [self.map_for(i) for i in self.final] | ||
|
||
def labels(self): | ||
return self.m.keys() | ||
|
||
def accepts(self, word): | ||
nfa = matrix_to_nfa(self) | ||
real_word = "".join(list(word)) | ||
return nfa.accepts(real_word) | ||
|
||
def is_empty(self): | ||
return len(self.m) == 0 or len(next(self.m.values())) == 0 | ||
|
||
|
||
def nfa_to_matrix(automaton: NondeterministicFiniteAutomaton) -> FiniteAutomaton: | ||
states = automaton.to_dict() | ||
states_num = len(automaton.states) | ||
mapping = {v: i for i, v in enumerate(automaton.states)} | ||
m = dict() | ||
for label in automaton.symbols: | ||
m[label] = dok_matrix((states_num, states_num), dtype=bool) | ||
for u, edges in states.items(): | ||
if label in edges: | ||
for v in as_set(edges[label]): | ||
m[label][mapping[u], mapping[v]] = True | ||
return FiniteAutomaton(m, automaton.start_states, automaton.final_states, mapping) | ||
|
||
|
||
def matrix_to_nfa(automaton: FiniteAutomaton) -> NondeterministicFiniteAutomaton: | ||
nfa = NondeterministicFiniteAutomaton() | ||
for label in automaton.m.keys(): | ||
size = automaton.m[label].shape[0] | ||
for u in range(size): | ||
for v in range(size): | ||
if automaton.m[label][u, v]: | ||
nfa.add_transition( | ||
automaton.map_for(u), label, automaton.map_for(v) | ||
) | ||
for s in automaton.start: | ||
nfa.add_start_state(automaton.map_for(s)) | ||
for s in automaton.final: | ||
nfa.add_final_state(automaton.map_for(s)) | ||
return nfa | ||
|
||
|
||
def transitive_closure(automaton: FiniteAutomaton): | ||
if len(automaton.m.values()) == 0: | ||
return dok_matrix((0, 0), dtype=bool) | ||
adj = sum(automaton.m.values()) | ||
for i in range(adj.shape[0]): | ||
adj += adj @ adj | ||
return adj | ||
|
||
|
||
def intersect_automata( | ||
atomaton1: FiniteAutomaton, atomaton2: FiniteAutomaton | ||
automaton1: FiniteAutomaton, automaton2: FiniteAutomaton | ||
) -> FiniteAutomaton: | ||
matrix = {} | ||
start_states = set() | ||
final_states = set() | ||
state_to_int = {} | ||
symbols = set(atomaton1.matrix.keys()) & set(atomaton2.matrix.keys()) | ||
|
||
for symbol in symbols: | ||
matrix[symbol] = sp.sparse.kron( | ||
atomaton1.matrix[symbol], atomaton2.matrix[symbol], "csr" | ||
) | ||
|
||
for u in atomaton1.state_to_int: | ||
for v in atomaton2.state_to_int: | ||
k = ( | ||
atomaton1.state_to_int[u] * len(atomaton2.state_to_int) | ||
+ atomaton2.state_to_int[v] | ||
) | ||
state_to_int[k] = k | ||
|
||
if u in atomaton1.start_states and v in atomaton2.start_states: | ||
start_states.add(State(k)) | ||
|
||
if u in atomaton1.final_states and v in atomaton2.final_states: | ||
final_states.add(State(k)) | ||
|
||
return FiniteAutomaton( | ||
matrix=matrix, | ||
start_states=start_states, | ||
final_states=final_states, | ||
state_to_int=state_to_int, | ||
) | ||
labels = automaton1.m.keys() & automaton2.m.keys() | ||
m = dict() | ||
start = set() | ||
final = set() | ||
mapping = dict() | ||
for label in labels: | ||
m[label] = kron(automaton1.m[label], automaton2.m[label], "csr") | ||
for u, i in automaton1.mapping.items(): | ||
for v, j in automaton2.mapping.items(): | ||
k = len(automaton2.mapping) * i + j | ||
mapping[k] = k | ||
if u in automaton1.start and v in automaton2.start: | ||
start.add(State(k)) | ||
if u in automaton1.final and v in automaton2.final: | ||
final.add(State(k)) | ||
return FiniteAutomaton(m, start, final, mapping) | ||
|
||
|
||
def paths_ends( | ||
graph: MultiDiGraph, start_nodes: set[int], final_nodes: set[int], regex: str | ||
) -> list: | ||
nfa = graph_to_nfa(graph) | ||
dfa = regex_to_dfa(regex) | ||
automaton = intersect_automata(nfa, dfa) | ||
automaton_paths = automaton.to_nfa().get_paths(start_nodes, final_nodes) | ||
return automaton_paths | ||
) -> list[tuple[object, object]]: | ||
nfa = nfa_to_matrix(graph_to_nfa(graph, start_nodes, final_nodes)) | ||
dfa = nfa_to_matrix(regex_to_dfa(regex)) | ||
intersec = intersect_automata(nfa, dfa) | ||
clos = transitive_closure(intersec) | ||
mapping = {v: i for i, v in nfa.mapping.items()} | ||
size = len(dfa.mapping) | ||
res = list() | ||
for u, v in zip(*clos.nonzero()): | ||
if u in intersec.start and v in intersec.final: | ||
res.append((mapping[u // size], mapping[v // size])) | ||
return res |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,39 @@ | ||
import numpy as np | ||
from scipy.sparse import csr_matrix | ||
from scipy.sparse.csgraph import breadth_first_order | ||
from scipy.sparse import * | ||
from project.task3 import FiniteAutomaton | ||
|
||
|
||
def reachability_with_constraints( | ||
fa: FiniteAutomaton, constraints_fa: FiniteAutomaton | ||
) -> dict[int, set[int]]: | ||
num_states = fa.num_states | ||
transitions = fa.transitions | ||
adjacency_matrix = np.zeros((num_states, num_states), dtype=bool) | ||
for state, next_state in transitions: | ||
adjacency_matrix[state, next_state] = True | ||
|
||
csr_adjacency_matrix = csr_matrix(adjacency_matrix) | ||
|
||
reachable_states = {} | ||
|
||
for start_state in range(num_states): | ||
_, predecessors = breadth_first_order( | ||
csr_adjacency_matrix, i_start=start_state, return_predecessors=True | ||
) | ||
|
||
reachable_set = set([start_state]) | ||
|
||
for pred_state, _ in enumerate(predecessors): | ||
if pred_state != start_state and predecessors[pred_state] != -9999: | ||
reachable_set.add(pred_state) | ||
reachable_states[start_state] = reachable_set | ||
|
||
return reachable_states | ||
m = constraints_fa.size() | ||
n = fa.size() | ||
|
||
def diagonalise(mat): | ||
res = dok_matrix(mat.shape, dtype=bool) | ||
for i in range(mat.shape[0]): | ||
for j in range(mat.shape[0]): | ||
if mat[j, i]: | ||
res[i] += mat[j] | ||
return res | ||
|
||
labels = fa.labels() & constraints_fa.labels() | ||
res = {s: set() for s in fa.start} | ||
adj = { | ||
label: block_diag((constraints_fa.m[label], fa.m[label])) for label in labels | ||
} | ||
for v in fa.start_idx(): | ||
front = dok_matrix((m, m + n), dtype=bool) | ||
for i in constraints_fa.start_idx(): | ||
front[i, i] = True | ||
for i in range(m): | ||
front[i, v + m] = True | ||
for _ in range(m * n): | ||
front = sum( | ||
[dok_matrix((m, m + n), dtype=bool)] | ||
+ [diagonalise(front @ adj[label]) for label in labels] | ||
) | ||
for i in constraints_fa.final_idx(): | ||
for j in fa.final_idx(): | ||
if front[i, j + m]: | ||
res[v].add(j) | ||
return res |