-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'FormalLanguageConstrainedPathQuerying:main' into main
- Loading branch information
Showing
4 changed files
with
372 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
import random | ||
from copy import deepcopy | ||
|
||
import pytest | ||
from to_program_parser import ( | ||
WELL_TYPED, | ||
ILL_TYPED, | ||
GraphProgram, | ||
GrammarProgram, | ||
QueryProgram, | ||
to_program_parser, | ||
) | ||
from fixtures import graph | ||
from grammars_constants import GRAMMARS_DIFFERENT | ||
from networkx import MultiDiGraph | ||
from pyformlang.cfg import CFG | ||
from helper import generate_rnd_start_and_final, generate_rnd_dense_graph | ||
from constants import LABELS | ||
|
||
try: | ||
from project.task7 import cfpq_with_matrix | ||
from project.task12 import typing_program, exec_program | ||
except ImportError: | ||
pytestmark = pytest.mark.skip("Task 12 is not ready to test!") | ||
|
||
|
||
class TestTypeInference: | ||
@pytest.mark.parametrize("program", WELL_TYPED) | ||
def test_well_typed(self, program: str) -> None: | ||
assert typing_program(program) | ||
|
||
@pytest.mark.parametrize("program", ILL_TYPED) | ||
def test_ill_typed(self, program: str) -> None: | ||
assert not typing_program(program) | ||
|
||
@pytest.mark.parametrize("grammar", GRAMMARS_DIFFERENT) | ||
def test_exec_simple(self, graph: MultiDiGraph, grammar: CFG): | ||
start_nodes, final_nodes = generate_rnd_start_and_final(graph) | ||
graph_prog = GraphProgram(deepcopy(graph)) | ||
cfg_prog = GrammarProgram(deepcopy(grammar)) | ||
query = QueryProgram( | ||
graph_prog, cfg_prog, deepcopy(start_nodes), deepcopy(final_nodes) | ||
) | ||
program = query.full_program() | ||
assert typing_program(deepcopy(program)) | ||
cfpq_from_prog = exec_program(deepcopy(program))[query.result_name] | ||
cfpq_from_algo = cfpq_with_matrix( | ||
deepcopy(grammar), | ||
deepcopy(graph), | ||
deepcopy(start_nodes), | ||
deepcopy(final_nodes), | ||
) | ||
assert cfpq_from_prog == cfpq_from_algo | ||
|
||
@pytest.mark.parametrize("queries_count", [1, 3, 5]) | ||
def test_exec_many_graphs_many_queries(self, queries_count): | ||
query_list = [] | ||
for i in range(queries_count): | ||
graph = generate_rnd_dense_graph(1, 40, LABELS) | ||
start_nodes, final_nodes = generate_rnd_start_and_final(deepcopy(graph)) | ||
grammar_prog = GrammarProgram(random.choice(GRAMMARS_DIFFERENT)) | ||
graph_prog = GraphProgram(deepcopy(graph)) | ||
query_list.append( | ||
QueryProgram( | ||
graph_prog, | ||
grammar_prog, | ||
deepcopy(start_nodes), | ||
deepcopy(final_nodes), | ||
) | ||
) | ||
program, name_result = to_program_parser(query_list) | ||
assert typing_program(deepcopy(program)) | ||
result_dict: dict = exec_program(deepcopy(program)) | ||
for var, res in result_dict.items(): | ||
query = name_result[var] | ||
query_full_program = query.full_program() | ||
assert typing_program(deepcopy(query_full_program)) | ||
separate_res = exec_program(deepcopy(query_full_program)) | ||
assert separate_res == res | ||
assert res == cfpq_with_matrix( | ||
query.get_grammar(), | ||
query.get_graph(), | ||
query.start_nodes, | ||
query.final_nodes, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,277 @@ | ||
from copy import copy, deepcopy | ||
|
||
import pyformlang as pl | ||
from pyformlang.cfg import Variable, Terminal | ||
import networkx as nx | ||
from constants import LABEL | ||
|
||
|
||
class FreshVar: | ||
var_counter = 0 | ||
|
||
@classmethod | ||
def generate_fresh(cls, var: str) -> str: | ||
cls.var_counter += 1 | ||
return f"{var}{cls.var_counter}" | ||
|
||
|
||
def _nonterminal_to_string(nonterminal: Variable) -> str: | ||
return nonterminal.to_text().lower() | ||
|
||
|
||
def _terminal_to_string(terminal: Terminal) -> str: | ||
""" | ||
convert terminal symbol into char | ||
:param terminal: terminal symbol | ||
:return: an object view of "x" | ||
""" | ||
terminal_s = terminal.to_text().lower() | ||
return f'"{terminal_s}"' | ||
|
||
|
||
class GraphProgram: | ||
def __init__(self, graph: nx.MultiDiGraph): | ||
self.graph = graph.copy() | ||
self.name = ( | ||
FreshVar.generate_fresh(graph.name) | ||
if graph.name != "" | ||
else FreshVar.generate_fresh("g") | ||
) | ||
|
||
def __str__(self): | ||
program = f"\nlet {self.name} is graph" | ||
for node_from, node_to, data in self.graph.edges(data=True): | ||
program += ( | ||
f'\nadd edge ({node_from}, "{data[LABEL]}", {node_to}) to {self.name}' | ||
) | ||
return program | ||
|
||
|
||
class GrammarProgram: | ||
EPS = '"a"^[0 .. 0]' | ||
|
||
def __init__(self, cfg: pl.cfg.CFG): | ||
self.grammar = copy(cfg) | ||
self.nonterminal_names = {} | ||
for production in cfg.productions: | ||
if production.head not in self.nonterminal_names.keys(): | ||
self.nonterminal_names[production.head] = FreshVar.generate_fresh( | ||
_nonterminal_to_string(deepcopy(production.head)) | ||
) | ||
self.start_nonterminal_name = self.nonterminal_names[cfg.start_symbol] | ||
|
||
def _object_to_string(self, cfg_object: Variable | Terminal) -> str: | ||
""" | ||
convert nonterminal or terminal symbol into program representation | ||
:param cfg_object: terminal or nonterminal | ||
:return: an object view of "x", if object is terminal or x, if object is nonterminal | ||
""" | ||
if isinstance(cfg_object, Variable): | ||
return self.nonterminal_names[cfg_object] | ||
return _terminal_to_string(cfg_object) | ||
|
||
def _objs_to_expr(self, objects: list[pl.cfg.production.CFGObject]) -> str: | ||
if len(objects) == 0: | ||
return self.EPS | ||
return " . ".join(map(self._object_to_string, objects)) | ||
|
||
def _objs_alts(self, objects: list[list[pl.cfg.production.CFGObject]]) -> str: | ||
return " | ".join(map(self._objs_to_expr, objects)) | ||
|
||
def __str__(self) -> str: | ||
res = "" | ||
vars_dict: dict[pl.cfg.Variable, list[list[pl.cfg.production.CFGObject]]] = {} | ||
for production in self.grammar.productions: | ||
head = production.head | ||
body = production.body | ||
if head in vars_dict.keys(): | ||
vars_dict[head].append(body) | ||
else: | ||
vars_dict[head] = [body] | ||
for nonterminal in vars_dict.keys(): | ||
res += f"\nlet {self.nonterminal_names[nonterminal]} = {self._objs_alts(vars_dict[nonterminal])}" | ||
return res | ||
|
||
|
||
class QueryProgram: | ||
def __init__( | ||
self, | ||
graph_program: GraphProgram, | ||
grammar_program: GrammarProgram, | ||
start_nodes: set[int], | ||
final_nodes: set[int] = None, | ||
): | ||
if final_nodes is None: | ||
final_nodes = set() | ||
self.graph_program = graph_program | ||
self.grammar_program = grammar_program | ||
self.result_name = FreshVar.generate_fresh("r") | ||
self.start_nodes = start_nodes | ||
self.final_nodes = final_nodes | ||
|
||
def get_graph(self): | ||
return self.graph_program.graph | ||
|
||
def get_grammar(self): | ||
return self.grammar_program.grammar | ||
|
||
def query_program(self) -> str: | ||
""" | ||
if you want only want to get query | ||
:return: just select expression | ||
""" | ||
query_name = self.grammar_program.start_nonterminal_name | ||
start_set_expr = f"[{', '.join(map(str, self.start_nodes))}]" | ||
if len(self.final_nodes) == 0: | ||
return ( | ||
f"\nlet {self.result_name} = for v in {start_set_expr} return u, v where u reachable from v in " | ||
f"{self.graph_program.name} by {query_name}" | ||
) | ||
final_set_expr = f"[{', '.join(map(str, self.final_nodes))}]" | ||
return ( | ||
f"\nlet {self.result_name} = for v in {start_set_expr} for u in {final_set_expr} return u, v where u " | ||
f"reachable from v in {self.graph_program.name} by {query_name}" | ||
) | ||
|
||
def full_program(self) -> str: | ||
""" | ||
if you want to work with query as meaningful object | ||
:return: fully query with predefined graph and grammar | ||
""" | ||
return f"\n{self.graph_program}\n{self.grammar_program}\n{self.query_program()}" | ||
|
||
|
||
def to_program_parser( | ||
query_list: list[QueryProgram], | ||
) -> tuple[str, dict[str, QueryProgram]]: | ||
result_program = "" | ||
res_name_query = {} | ||
grammar_set = set() | ||
graph_set = set() | ||
for query in query_list: | ||
# if graph is already defined then it is not necessary to define it again | ||
if query.graph_program not in graph_set: | ||
result_program += str(query.graph_program) | ||
graph_set.add(query.graph_program) | ||
# same with grammar | ||
if query.grammar_program not in grammar_set: | ||
result_program += str(query.grammar_program) | ||
grammar_set.add(query.grammar_program) | ||
result_program += query.query_program() | ||
res_name_query.update({query.result_name: query}) | ||
return result_program, res_name_query | ||
|
||
|
||
WELL_TYPED = [ | ||
""" | ||
let p = "a" . p . "b" | "c" | ||
let q = ("s" . "f") ^ [1..] | ||
let g = q & p""", | ||
""" | ||
let a = ("a" . b) | "a" ^ [0] | ||
let b = a . "b" | ||
""", | ||
""" | ||
let q = "a" . p | ||
let p = "b" . r | ||
let r = ("c" . r) | "c" ^ [0] | ||
""", | ||
""" | ||
let p = (1,"a",2) | ||
let g is graph | ||
remove edge p from g | ||
""", | ||
""" | ||
let p = 1 | ||
let g is graph | ||
remove vertex p from g | ||
""", | ||
""" | ||
let p = [1,2] | ||
let g is graph | ||
remove vertices p from g | ||
""", | ||
""" | ||
let p = "a" . p . "b" | "c" | ||
let g is graph | ||
let r1 = | ||
return v | ||
where u reachable from v in g by p | ||
let q = "s" . q . "f" | "e" | ||
let r2 = | ||
for v in r1 | ||
return u,v | ||
where u reachable from v in g by q | ||
""", | ||
""" | ||
let p = "a" . p . "b" | "c" | ||
let g is graph | ||
remove vertices | ||
return v | ||
where u reachable from v in g by p | ||
from g | ||
""", | ||
""" | ||
let p = "a" . p . "b" | "c" | ||
let g is graph | ||
let q = "c" ^ [1..] | ||
let r = | ||
for v in | ||
return v | ||
where u reachable from v in g by p | ||
return u,v | ||
where v reachable from u in g by q | ||
""", | ||
] | ||
|
||
ILL_TYPED = [ | ||
""" | ||
let p = "a" . p . "b" | "c" | ||
let q = "s" . q . "f" | "e" | ||
let g = q & p | ||
""", | ||
""" | ||
let p = "a" . p . "b" | "c" | ||
let q = ("s" . "f") ^ [1..] | ||
let g = [q, p] | ||
""", | ||
""" | ||
let p = "a" . "b" | "c" | ||
let q = ("s" . "f") ^ [1..] | ||
let g = [q, p] | ||
""", | ||
""" | ||
let p = "a" . p . "b" | "c" | ||
let g is graph | ||
let r1 = | ||
return u,v | ||
where u reachable from v in g by p | ||
let q = "s" . q . "f" | "e" | ||
let r2 = | ||
for v in r1 | ||
return u,v | ||
where u reachable from v in g by q | ||
""", | ||
""" | ||
let p = "a" . p . "b" | "c" | ||
let g is graph | ||
remove edge p from g | ||
""", | ||
""" | ||
let p = (1,"a",2) | ||
let g is graph | ||
remove vertex p from g | ||
""", | ||
""" | ||
let p = 1 | ||
let g is graph | ||
remove vertices p from g | ||
""", | ||
""" | ||
let p = 1 | ||
let g is graph | ||
let x = | ||
return v | ||
where v reachable from u in g by p | ||
""", | ||
] |