-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
284 additions
and
404 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,130 +1,175 @@ | ||
from pyformlang.finite_automaton import * | ||
from scipy.sparse import * | ||
from networkx import * | ||
from typing import * | ||
import scipy | ||
from networkx import MultiDiGraph | ||
from networkx.classes.reportviews import NodeView | ||
from pyformlang.finite_automaton import ( | ||
DeterministicFiniteAutomaton, | ||
NondeterministicFiniteAutomaton, | ||
State, | ||
) | ||
from scipy.sparse import dok_matrix, kron | ||
|
||
from project.task2 import regex_to_dfa, graph_to_nfa | ||
|
||
from itertools import product | ||
|
||
|
||
class FiniteAutomaton: | ||
def __init__(self, nka=None): | ||
if nka is None: | ||
return | ||
|
||
map_index_to_state = {s: i for i, s in enumerate(nka.states)} | ||
|
||
self.map_index_to_state = list(nka.states) | ||
|
||
self.start_states = {map_index_to_state[st] for st in nka.start_states} | ||
self.final_states = {map_index_to_state[fi] for fi in nka.final_states} | ||
|
||
self.func_to_steps = {} | ||
|
||
states = nka.to_dict() | ||
n = len(nka.states) | ||
|
||
for symbols in nka.symbols: | ||
self.func_to_steps[symbols] = dok_matrix((n, n), dtype=bool) | ||
for key, value in states.items(): | ||
if symbols in value: | ||
for fi in ( | ||
value[symbols] | ||
if isinstance(value[symbols], set) | ||
else {value[symbols]} | ||
): | ||
self.func_to_steps[symbols][ | ||
map_index_to_state[key], map_index_to_state[fi] | ||
] = True | ||
|
||
def accepts(self, word: Iterable[Symbol]) -> bool: | ||
nka = NondeterministicFiniteAutomaton() | ||
|
||
for key, value in self.func_to_steps.items(): | ||
nka.add_transitions( | ||
[ | ||
(start, key, end) | ||
for (start, end) in product(range(value.shape[0]), repeat=2) | ||
if self.func_to_steps[key][start, end] | ||
] | ||
m = None | ||
start = None | ||
final = None | ||
mapping = None | ||
|
||
def __init__(self, automata, start=None, final=None, mapping=None): | ||
if mapping is None: | ||
mapping = dict() | ||
if final is None: | ||
final = set() | ||
if start is None: | ||
start = set() | ||
|
||
# TODO make this check also in types | ||
if isinstance(automata, DeterministicFiniteAutomaton) or isinstance( | ||
automata, NondeterministicFiniteAutomaton | ||
): | ||
mat = nfa_to_matrix(automata) | ||
self.m, self.start, self.final, self.mapping = ( | ||
mat.m, | ||
mat.start, | ||
mat.final, | ||
mat.mapping, | ||
) | ||
else: | ||
self.m, self.start, self.final, self.mapping = ( | ||
automata, | ||
start, | ||
final, | ||
mapping, | ||
) | ||
|
||
for start_state in self.start_states: | ||
nka.add_start_state(start_state) | ||
def accepts(self, word) -> bool: | ||
nfa = matrix_to_nfa(self) | ||
return nfa.accepts("".join(list(word))) | ||
|
||
for final_state in self.final_states: | ||
nka.add_final_state(final_state) | ||
def is_empty(self) -> bool: | ||
return len(self.m.values()) == 0 | ||
|
||
return nka.accepts(word) | ||
def mapping_for(self, u) -> State: | ||
return self.mapping[State(u)] | ||
|
||
def is_empty(self) -> bool: | ||
if len(self.func_to_steps) == 0: | ||
return True | ||
def size(self): | ||
return len(self.mapping) | ||
|
||
def starts(self): | ||
return [self.mapping_for(t) for t in self.start] | ||
|
||
def ends(self): | ||
return [self.mapping_for(t) for t in self.final] | ||
|
||
def labels(self): | ||
return self.mapping.keys() | ||
|
||
|
||
def nfa_to_matrix(automaton: NondeterministicFiniteAutomaton) -> FiniteAutomaton: | ||
states = automaton.to_dict() | ||
len_states = len(automaton.states) | ||
mapping = {v: i for i, v in enumerate(automaton.states)} | ||
m = dict() | ||
|
||
dka = sum(self.func_to_steps.values()) | ||
def as_set(obj): | ||
if not isinstance(obj, set): | ||
return {obj} | ||
return obj | ||
|
||
for _ in range(dka.shape[0]): | ||
dka += dka @ dka | ||
for label in automaton.symbols: | ||
m[label] = dok_matrix((len_states, len_states), dtype=bool) | ||
for u, edges in states.items(): | ||
if label in edges: | ||
for v in as_set(edges[label]): | ||
m[label][mapping[u], mapping[v]] = True | ||
|
||
for st, fi in product(self.start_states, self.final_states): | ||
if dka[st, fi] != 0: | ||
return False | ||
return FiniteAutomaton(m, automaton.start_states, automaton.final_states, mapping) | ||
|
||
return True | ||
|
||
def matrix_to_nfa(automaton: FiniteAutomaton) -> NondeterministicFiniteAutomaton: | ||
nfa = NondeterministicFiniteAutomaton() | ||
|
||
for label in automaton.m.keys(): | ||
m_size = automaton.m[label].shape[0] | ||
for u in range(m_size): | ||
for v in range(m_size): | ||
if automaton.m[label][u, v]: | ||
nfa.add_transition( | ||
automaton.mapping_for(u), label, automaton.mapping_for(v) | ||
) | ||
|
||
for s in automaton.start: | ||
nfa.add_start_state(automaton.mapping_for(s)) | ||
for s in automaton.final: | ||
nfa.add_final_state(automaton.mapping_for(s)) | ||
|
||
return nfa | ||
|
||
|
||
def intersect_automata( | ||
automaton1: FiniteAutomaton, automaton2: FiniteAutomaton | ||
automaton1: FiniteAutomaton, automaton2: FiniteAutomaton, take_from_mapping=False | ||
) -> FiniteAutomaton: | ||
labels = None | ||
if take_from_mapping: | ||
labels = automaton1.mapping.keys() & automaton2.mapping.keys() | ||
else: | ||
labels = automaton1.m.keys() & automaton2.m.keys() | ||
m = dict() | ||
start = set() | ||
final = set() | ||
mapping = dict() | ||
|
||
commaon_keys = automaton1.func_to_steps.keys() & automaton2.func_to_steps.keys() | ||
finatie_automaton = FiniteAutomaton() | ||
finatie_automaton.func_to_steps = {} | ||
for label in labels: | ||
m[label] = kron(automaton1.m[label], automaton2.m[label], "csr") | ||
|
||
for key in commaon_keys: | ||
finatie_automaton.func_to_steps[key] = kron( | ||
automaton1.func_to_steps[key], automaton2.func_to_steps[key], "csr" | ||
) | ||
for u, i in automaton1.mapping.items(): | ||
for v, j in automaton2.mapping.items(): | ||
|
||
finatie_automaton.start_states = set() | ||
finatie_automaton.final_states = set() | ||
k = len(automaton2.mapping) * i + j | ||
mapping[k] = k | ||
|
||
n_states2 = automaton2.func_to_steps.values().__iter__().__next__().shape[0] | ||
assert isinstance(u, State) | ||
if u in automaton1.start and v in automaton2.start: | ||
start.add(State(k)) | ||
|
||
for m, k in product(automaton1.start_states, automaton2.start_states): | ||
finatie_automaton.start_states.add(m * (n_states2) + k) | ||
if u in automaton1.final and v in automaton2.final: | ||
final.add(State(k)) | ||
|
||
for m, k in product(automaton1.final_states, automaton2.final_states): | ||
finatie_automaton.final_states.add(m * (n_states2) + k) | ||
|
||
return finatie_automaton | ||
return FiniteAutomaton(m, start, final, mapping) | ||
|
||
|
||
def paths_ends( | ||
graph: MultiDiGraph, start_nodes: set[int], final_nodes: set[int], regex: str | ||
) -> list[tuple[int, int]]: | ||
fa1 = FiniteAutomaton(graph_to_nfa(graph, start_nodes, final_nodes)) | ||
fa2 = FiniteAutomaton(regex_to_dfa(regex)) | ||
|
||
finite_automaton = intersect_automata(fa1, fa2) | ||
|
||
if len(finite_automaton.func_to_steps) == 0: | ||
return [] | ||
) -> list[tuple[NodeView, NodeView]]: | ||
automaton_regex = nfa_to_matrix(regex_to_dfa(regex)) | ||
automaton_graph = nfa_to_matrix(graph_to_nfa(graph, start_nodes, final_nodes)) | ||
intersection = intersect_automata(automaton_graph, automaton_regex, True) | ||
fa_closure = make_transitive_closure(intersection) | ||
|
||
size = automaton_regex.size() | ||
result = list() | ||
for u, v in zip(*fa_closure.nonzero()): | ||
if u in intersection.start and v in intersection.final: | ||
result.append( | ||
(automaton_graph.mapping[u // size], automaton_graph.mapping[v // size]) | ||
) | ||
|
||
m = sum(finite_automaton.func_to_steps.values()) | ||
return result | ||
|
||
for _ in range(m.shape[0]): | ||
m += m @ m | ||
|
||
n_states2 = fa2.func_to_steps.values().__iter__().__next__().shape[0] | ||
def make_transitive_closure(fa: FiniteAutomaton): | ||
if fa.is_empty(): | ||
return dok_matrix((0, 0), dtype=bool) | ||
|
||
def convert_to_node(i): | ||
return fa1.map_index_to_state[i // n_states2].value | ||
f = None | ||
for m in fa.m.values(): | ||
f = m if f is None else f + m | ||
|
||
res = [] | ||
for st, fi in product(finite_automaton.start_states, finite_automaton.final_states): | ||
if m[st, fi] != 0: | ||
res.append((convert_to_node(st), convert_to_node(fi))) | ||
p = 0 | ||
while f.count_nonzero() != p: | ||
p = f.count_nonzero() | ||
f += f @ f | ||
|
||
return res | ||
return f |
Oops, something went wrong.