forked from trueagi-io/hyperon-experimental
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_custom_space.py
156 lines (118 loc) · 5.52 KB
/
test_custom_space.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import unittest
from hyperon import *
from test_common import HyperonTestCase
class CustomSpace(AbstractSpace):
def __init__(self, unwrap=True):
super().__init__()
self.atoms_list = []
self.unwrap = unwrap
# NOTE: this is a naive implementation barely good enough to pass the tests
# Don't take this as a guide to implementing a space query function
def query(self, query_atom):
# Extract only the variables from the query atom
query_vars = list(filter(lambda atom: atom.get_metatype() == AtomKind.VARIABLE, query_atom.iterate()))
# Match the query atom against every atom in the space
# BindingsSet() creates a binding set with the only matching result
# We use BindingsSet.empty() to support multiple results
new_bindings_set = BindingsSet.empty()
for space_atom in self.atoms_list:
match_results = space_atom.match_atom(query_atom)
# Merge in the bindings from this match, after we narrow the match_results to
# only include variables vars in the query atom
for bindings in match_results.iterator():
bindings.narrow_vars(query_vars)
if not bindings.is_empty():
# new_bindings_set.merge_into(bindings) would work with BindingsSet(), but
# it would return an empty result for multiple alternatives and merge bindings
# for different variables from alternative branches, which would be a funny
# modification of query, but with no real use case
# new_bindings_set.push(bindings) adds an alternative binding to the binding set
new_bindings_set.push(bindings)
return new_bindings_set
def add(self, atom):
self.atoms_list.append(atom)
def remove(self, atom):
if atom in self.atoms_list:
self.atoms_list.remove(atom)
return True
else:
return False
def replace(self, from_atom, to_atom):
if from_atom in self.atoms_list:
self.atoms_list.remove(from_atom)
self.atoms_list.append(to_atom)
return True
else:
return False
def atom_count(self):
return len(self.atoms_list)
def atoms_iter(self):
return iter(self.atoms_list)
class CustomSpaceTest(HyperonTestCase):
def test_custom_space(self):
test_space = CustomSpace()
test_space.test_attrib = "Test Space Payload Attrib"
kb = SpaceRef(test_space)
kb.add_atom(S("a"))
kb.add_atom(S("b"))
self.assertEqual(kb.atom_count(), 2)
self.assertEqual(kb.get_payload().test_attrib, "Test Space Payload Attrib")
self.assertEqualNoOrder(kb.get_atoms(), [S("a"), S("b")])
def test_remove(self):
kb = SpaceRef(CustomSpace())
kb.add_atom(S("a"))
kb.add_atom(S("b"))
kb.add_atom(S("c"))
self.assertTrue(kb.remove_atom(S("b")))
self.assertFalse(kb.remove_atom(S("bogus")))
self.assertEqualNoOrder(kb.get_atoms(), [S("a"), S("c")])
def test_replace(self):
kb = SpaceRef(CustomSpace())
kb.add_atom(S("a"))
kb.add_atom(S("b"))
kb.add_atom(S("c"))
self.assertTrue(kb.replace_atom(S("b"), S("d")))
self.assertEqualNoOrder(kb.get_atoms(), [S("a"), S("d"), S("c")])
def test_query(self):
kb = SpaceRef(CustomSpace())
kb.add_atom(E(S("A"), S("B")))
kb.add_atom(E(S("C"), S("D")))
# Checking that multiple matches can be returned
kb.add_atom(E(S("A"), S("E")))
result = kb.query(E(S("A"), V("x")))
self.assertEqualNoOrder(result, [{"x": S("B")}, {"x": S("E")}])
def test_atom_containing_space(self):
m = MeTTa(env_builder=Environment.test_env())
# Make a little space and add it to the MeTTa interpreter's space
little_space = SpaceRef(CustomSpace())
little_space.add_atom(E(S("A"), S("B")))
space_atom = G(little_space)
m.space().add_atom(E(S("little-space"), space_atom))
# Make sure we can get the little space back, and then query it
kb_result = m.space().query(E(S("little-space"), V("s")))
result_atom = kb_result[0].get("s")
self.assertEqual(result_atom, space_atom)
result = result_atom.get_object().query(E(S("A"), V("v")))
self.assertEqualNoOrder(result, [{"v": S("B")}])
# Add the MeTTa space to the little space for some space recursion
little_space.add_atom(E(S("big-space"), G(m.space())))
def test_match_nested_custom_space(self):
nested = SpaceRef(CustomSpace())
nested.add_atom(E(S("A"), S("B")))
space_atom = G(nested)
runner = MeTTa(env_builder=Environment.test_env())
runner.tokenizer().register_token("nested", lambda token: space_atom)
result = runner.run("!(match nested (A $x) $x)")
self.assertEqual([[S("B")]], result)
def test_runner_with_custom_space(self):
test_space = CustomSpace()
test_space.test_attrib = "Test Space Payload Attrib"
space = SpaceRef(test_space)
space.add_atom(E(S("="), S("key"), S("val")))
metta = MeTTa(space=space, env_builder=Environment.test_env())
self.assertEqual(metta.space().get_payload().test_attrib, "Test Space Payload Attrib")
self.assertEqualMettaRunnerResults(metta.run("!(match &self (= key $val) $val)"),
[[S("val")]]
)
if __name__ == "__main__":
unittest.main()