Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task 4 #7

Merged
merged 3 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions project/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
from .task3 import intersect_automata, FiniteAutomaton
from .task2 import regex_to_dfa, graph_to_nfa
from .task_1 import graph_info, create_labeled_two_cycle_graph
from .task3 import intersect_automata, FiniteAutomaton

__all__ = [
"graph_info",
"create_labeled_two_cycle_graph",
"regex_to_dfa",
"graph_to_nfa",
"intersect_automata",
"FiniteAutomaton",
"reachability_with_constraints"
"FiniteAutomaton",
"reachability_with_constraints",
]
222 changes: 124 additions & 98 deletions project/task3.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,136 @@
from pyformlang.finite_automaton import (
DeterministicFiniteAutomaton,
NondeterministicFiniteAutomaton,
State,
)
from scipy.sparse import dok_matrix, kron
from networkx import MultiDiGraph
import scipy as sp
from typing import Iterable
from pyformlang.finite_automaton import *
from project.task2 import graph_to_nfa, regex_to_dfa
from typing import Dict
from pyformlang.finite_automaton import Symbol, NondeterministicFiniteAutomaton


def as_set(obj):
if not isinstance(obj, set):
return {obj}
return obj


class FiniteAutomaton:
def __init__(
self,
fa: NondeterministicFiniteAutomaton = None,
*,
matrix=None,
start_states=None,
final_states=None,
state_to_int=None
) -> None:
self.matrix = matrix
self.start_states = start_states
self.final_states = final_states
self.state_to_int = state_to_int
if fa is not None:
self.state_to_int = {v: i for i, v in enumerate(fa.states)}
self.matrix = to_matrix(fa, self.state_to_int)
self.start_states = fa.start_states
self.final_states = fa.final_states

def accepts(self, word: Iterable[Symbol]) -> bool:
return self.to_nfa().accepts(word)

def is_empty(self) -> bool:
return self.to_nfa().is_empty()

def to_nfa(self) -> NondeterministicFiniteAutomaton:
nfa = NondeterministicFiniteAutomaton()

for symbol, matrix in self.matrix.items():
for u, v in zip(*matrix.nonzero()):
nfa.add_transition(
State(self.state_to_int[State(u)]),
symbol,
State(self.state_to_int[State(v)]),
)

for state in self.start_states:
nfa.add_start_state(State(self.state_to_int[State(state)]))
for state in self.final_states:
nfa.add_final_state(State(self.state_to_int[State(state)]))

return nfa


def to_matrix(fa: NondeterministicFiniteAutomaton, state_to_int: Dict[int, int]) -> Dict[Symbol, sp.sparse.dok_matrix]:
result = {}

for symbol in fa.symbols:
result[symbol] = sp.sparse.dok_matrix((len(fa.states), len(fa.states)), dtype=bool)
for v, edges in fa.to_dict().items():
if symbol in edges:
u = edges[symbol]
result[symbol][state_to_int[v], state_to_int[u]] = True

return result
m = None
start = None
final = None
mapping = None

def __init__(self, obj, start=set(), final=set(), mapping=dict()):
if isinstance(
obj, (DeterministicFiniteAutomaton, NondeterministicFiniteAutomaton)
):
matrix = nfa_to_matrix(obj)
self.m = matrix.m
self.start = matrix.start
self.final = matrix.final
self.mapping = matrix.mapping
else:
self.m = obj
self.start = start
self.final = final
self.mapping = mapping

def map_for(self, u):
return self.mapping[State(u)]

def size(self):
return len(self.mapping)

def start_idx(self):
return [self.map_for(i) for i in self.start]

def final_idx(self):
return [self.map_for(i) for i in self.final]

def labels(self):
return self.m.keys()

def accepts(self, word):
nfa = matrix_to_nfa(self)
real_word = "".join(list(word))
return nfa.accepts(real_word)

def is_empty(self):
return len(self.m) == 0 or len(next(self.m.values())) == 0


def nfa_to_matrix(automaton: NondeterministicFiniteAutomaton) -> FiniteAutomaton:
states = automaton.to_dict()
states_num = len(automaton.states)
mapping = {v: i for i, v in enumerate(automaton.states)}
m = dict()
for label in automaton.symbols:
m[label] = dok_matrix((states_num, states_num), dtype=bool)
for u, edges in states.items():
if label in edges:
for v in as_set(edges[label]):
m[label][mapping[u], mapping[v]] = True
return FiniteAutomaton(m, automaton.start_states, automaton.final_states, mapping)


def matrix_to_nfa(automaton: FiniteAutomaton) -> NondeterministicFiniteAutomaton:
nfa = NondeterministicFiniteAutomaton()
for label in automaton.m.keys():
size = automaton.m[label].shape[0]
for u in range(size):
for v in range(size):
if automaton.m[label][u, v]:
nfa.add_transition(
automaton.map_for(u), label, automaton.map_for(v)
)
for s in automaton.start:
nfa.add_start_state(automaton.map_for(s))
for s in automaton.final:
nfa.add_final_state(automaton.map_for(s))
return nfa


def transitive_closure(automaton: FiniteAutomaton):
if len(automaton.m.values()) == 0:
return dok_matrix((0, 0), dtype=bool)
adj = sum(automaton.m.values())
for i in range(adj.shape[0]):
adj += adj @ adj
return adj


def intersect_automata(
atomaton1: FiniteAutomaton, atomaton2: FiniteAutomaton
automaton1: FiniteAutomaton, automaton2: FiniteAutomaton
) -> FiniteAutomaton:
matrix = {}
start_states = set()
final_states = set()
state_to_int = {}
symbols = set(atomaton1.matrix.keys()) & set(atomaton2.matrix.keys())

for symbol in symbols:
matrix[symbol] = sp.sparse.kron(
atomaton1.matrix[symbol], atomaton2.matrix[symbol], "csr"
)

for u in atomaton1.state_to_int:
for v in atomaton2.state_to_int:
k = (
atomaton1.state_to_int[u] * len(atomaton2.state_to_int)
+ atomaton2.state_to_int[v]
)
state_to_int[k] = k

if u in atomaton1.start_states and v in atomaton2.start_states:
start_states.add(State(k))

if u in atomaton1.final_states and v in atomaton2.final_states:
final_states.add(State(k))

return FiniteAutomaton(
matrix=matrix,
start_states=start_states,
final_states=final_states,
state_to_int=state_to_int,
)
labels = automaton1.m.keys() & automaton2.m.keys()
m = dict()
start = set()
final = set()
mapping = dict()
for label in labels:
m[label] = kron(automaton1.m[label], automaton2.m[label], "csr")
for u, i in automaton1.mapping.items():
for v, j in automaton2.mapping.items():
k = len(automaton2.mapping) * i + j
mapping[k] = k
if u in automaton1.start and v in automaton2.start:
start.add(State(k))
if u in automaton1.final and v in automaton2.final:
final.add(State(k))
return FiniteAutomaton(m, start, final, mapping)


def paths_ends(
graph: MultiDiGraph, start_nodes: set[int], final_nodes: set[int], regex: str
) -> list:
nfa = graph_to_nfa(graph)
dfa = regex_to_dfa(regex)
automaton = intersect_automata(nfa, dfa)
automaton_paths = automaton.to_nfa().get_paths(start_nodes, final_nodes)
return automaton_paths
) -> list[tuple[object, object]]:
nfa = nfa_to_matrix(graph_to_nfa(graph, start_nodes, final_nodes))
dfa = nfa_to_matrix(regex_to_dfa(regex))
intersec = intersect_automata(nfa, dfa)
clos = transitive_closure(intersec)
mapping = {v: i for i, v in nfa.mapping.items()}
size = len(dfa.mapping)
res = list()
for u, v in zip(*clos.nonzero()):
if u in intersec.start and v in intersec.final:
res.append((mapping[u // size], mapping[v // size]))
return res
59 changes: 37 additions & 22 deletions project/task4.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
from typing import Dict, Set
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.csgraph import breadth_first_order
from .task3 import FiniteAutomaton
from scipy.sparse import *
from project.task3 import FiniteAutomaton

def reachability_with_constraints(fa: FiniteAutomaton,
constraints_fa: FiniteAutomaton) -> Dict[int, Set[int]]:
num_states = fa.num_states()
adjacency_matrix = np.zeros((num_states, num_states), dtype=int)
for transition in fa.transitions:
adjacency_matrix[transition[0], transition[2]] = 1
graph = csr_matrix(adjacency_matrix)
start_states = fa.initial_states
reachable_from_starts = {}
for start_state in start_states:
_, predecessors = breadth_first_order(graph, start_state, return_predecessors=True)
reachable_states = set()
for state in range(num_states):
if predecessors[state] != -9999 and constraints_fa.accepts(state):
reachable_states.add(state)
reachable_from_starts[start_state] = reachable_states

return reachable_from_starts
def reachability_with_constraints(
fa: FiniteAutomaton, constraints_fa: FiniteAutomaton
) -> dict[int, set[int]]:
m = constraints_fa.size()
n = fa.size()

def diagonalise(mat):
res = dok_matrix(mat.shape, dtype=bool)
for i in range(mat.shape[0]):
for j in range(mat.shape[0]):
if mat[j, i]:
res[i] += mat[j]
return res

labels = fa.labels() & constraints_fa.labels()
res = {s: set() for s in fa.start}
adj = {
label: block_diag((constraints_fa.m[label], fa.m[label])) for label in labels
}
for v in fa.start_idx():
front = dok_matrix((m, m + n), dtype=bool)
for i in constraints_fa.start_idx():
front[i, i] = True
for i in range(m):
front[i, v + m] = True
for _ in range(m * n):
front = sum(
[dok_matrix((m, m + n), dtype=bool)]
+ [diagonalise(front @ adj[label]) for label in labels]
)
for i in constraints_fa.final_idx():
for j in fa.final_idx():
if front[i, j + m]:
res[v].add(j)
return res
Loading