Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
TeamSPoon committed Nov 27, 2024
1 parent 545470f commit 30e0141
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 97 deletions.
71 changes: 40 additions & 31 deletions python/hyperon/atoms.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,16 @@ def _priv_gnd_get_object(atom):
return SpaceRef._from_cspace(hp.atom_get_space(atom.catom))
elif typ == S('Bool') or typ == S('Number'):
converter = ConvertingSerializer()
hp.atom_gnd_serialize(atom.catom, converter)
if converter.value is None:
raise RuntimeError(f"Could not convert atom {atom}")
try:
res = hp.atom_gnd_serialize(atom.catom, converter)
except Exception as e:
raise RuntimeError(f"Could not convert atom {atom} to Python value, exception caught: {e}")
if res != SerialResult.OK or converter.value is None:
raise RuntimeError(f"Could not convert atom {atom} to Python value")
else:
return ValueObject(converter.value)
else:
raise TypeError(f"Cannot get_object of unsupported non-C {atom}")
raise TypeError(f"Cannot get Python object of unsupported non-C atom {atom}")


def G(object, type=AtomType.UNDEFINED):
Expand Down Expand Up @@ -311,6 +314,35 @@ class MettaError(Exception):
, but we don't want to output Python error stack."""
pass

def unwrap_args(atoms):
args = []
kwargs = {}
for a in atoms:
if isinstance(a, ExpressionAtom):
ch = a.get_children()
if len(ch) > 0 and repr(ch[0]) == "Kwargs":
for c in ch[1:]:
try:
kwarg = c.get_children()
assert len(kwarg) == 2
except:
raise RuntimeError(f"Incorrect kwarg format {kwarg}")
try:
kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content
except:
raise NoReduceError()
continue
if hasattr(a, 'get_object'):
args.append(a.get_object().content)
else:
# NOTE:
# Currently, applying grounded operations to pure atoms is not reduced.
# If we want, we can raise an exception, or form an error expression instead,
# so a MeTTa program can catch and analyze it.
# raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments")
raise NoReduceError()
return args, kwargs

class OperationObject(GroundedObject):
"""
An OperationObject represents an operation as a grounded object, allowing for more
Expand Down Expand Up @@ -385,32 +417,7 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED):
"""
# type-check?
if self.unwrap:
args = []
kwargs = {}
for a in atoms:
if isinstance(a, ExpressionAtom):
ch = a.get_children()
if len(ch) > 0 and repr(ch[0]) == "Kwargs":
for c in ch[1:]:
try:
kwarg = c.get_children()
assert len(kwarg) == 2
except:
raise RuntimeError(f"Incorrect kwarg format {kwarg}")
try:
kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content
except:
raise NoReduceError()
continue
try:
args.append(a.get_object().content)
except:
# NOTE:
# Currently, applying grounded operations to pure atoms is not reduced.
# If we want, we can raise an exception, or form an error expression instead,
# so a MeTTa program can catch and analyze it.
# raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments")
raise NoReduceError()
args, kwargs = unwrap_args(atoms)
try:
result = self.op(*args, **kwargs)
except MettaError as e:
Expand All @@ -422,7 +429,9 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED):
return [ValueAtom(result, res_typ)]
else:
result = self.op(*atoms)
if not isinstance(result, list):
try:
iter(result)
except TypeError:
raise RuntimeError("Grounded operation `" + self.name + "` should return list")
return result

Expand Down
1 change: 1 addition & 0 deletions python/hyperon/exts/agents/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .agent_base import AgentObject
from .agent_base import agent_atoms
from .agent_base import BaseListeningAgent
93 changes: 91 additions & 2 deletions python/hyperon/exts/agents/agent_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,46 @@
'''
This is very preliminary and incomplete PoC version.
However, it is put to exts, because metta-motto depends on it.
Reagrding threading:
- Generic threading for metta can be introduced with
parallel and sequential composition, for-comprehension, etc.
Agents could be built on top of this functionality. However,
this piece of code was driven by metta-motto demands.
- Two main cases for agents are:
-- Immediate call with inputs to get outputs
-- Asynchronous events and responses
Supporting both cases in one implementation is more convenient,
because both of them can be needed simultaneously in certain
domains (e.g. metta-motto)
- Implementation can be quite different.
-- Agents could be started explicitly
-- They could inherint from StreamMethod
-- Other methods could be called directly without StreamMethod wrapper
All these nuances are to be fleshed out
'''

import threading
from queue import Queue
class StreamMethod(threading.Thread):
def __init__(self, method, args):
super().__init__() #daemon=True
self._result = Queue()
self.method = method
self.args = args

def run(self):
for r in self.method(*self.args):
self._result.put(r)

def __iter__(self):
return self

def __next__(self):
if self._result.empty() and not self.is_alive():
raise StopIteration
return self._result.get()


class AgentObject:

'''
Expand Down Expand Up @@ -58,6 +96,9 @@ def _try_unwrap(self, val):
return repr(val)

def __init__(self, path=None, atoms={}, include_paths=None, code=None):
if path is None and code is None:
# purely Python agent
return
# The first argument is either path or code when called from MeTTa
if isinstance(path, ExpressionAtom):# and path != E():
code = path
Expand Down Expand Up @@ -106,17 +147,65 @@ def __call__(self, atom):
)
return self._metta.evaluate_atom(atom)

def is_daemon(self):
return hasattr(self, 'daemon') and self.daemon is True

def __metta_call__(self, *args):
call = True
method = self.__call__
if len(args) > 0 and isinstance(args[0], SymbolAtom):
n = args[0].get_name()
if n[0] == '.' and hasattr(self, n[1:]):
method = getattr(self, n[1:])
args = args[1:]
call = False
if self._unwrap:
return OperationObject(f"{method}", method).execute(*args)
return method(*args)
method = OperationObject(f"{method}", method).execute
st = StreamMethod(method, args)
st.start()
# We don't return the stream here; otherwise it will be consumed immediately.
# If the agent itself would be StreamMethod, its results could be accessbile.
# Here, they are lost (TODO?).
if call and self.is_daemon():
return [E()]
return st

class BaseListeningAgent(AgentObject):
def __init__(self, path=None, atoms={}, include_paths=None, code=None):
super().__init__(path, atoms, include_paths, code)
self.messages = Queue()
self.running = False
self._output = []
self.lock = threading.RLock()

def start(self, *args):
if not args:
args = ()
self.running = True
st = StreamMethod(self.messages_processor, args)
st.start()

def message_processor(self, message, *args):
return []

def messages_processor(self, *args):
while self.running:
if not self.messages.empty():
m = self.messages.get()
with self.lock:
self._output = self.message_processor(m, *args)
return []

def stop(self):
self.running = False
return []

def input(self, msg):
self.messages.put(msg)
return []

def get_output(self):
return self._output

@register_atoms(pass_metta=True)
def agent_atoms(metta):
Expand Down
69 changes: 69 additions & 0 deletions python/hyperon/exts/agents/tests/test_agents.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from hyperon import MeTTa
from hyperon.exts.agents import AgentObject
from queue import Queue
from time import sleep

class MyAgent(AgentObject):
'''
Expand All @@ -20,3 +22,70 @@ def __call__(self, a, b):
! (&agent 7 8)
! (&agent .subs 4)
'''))

# =================================

class Agent1(AgentObject):
def __call__(self):
for x in range(10):
yield x
sleep(1)
class Agent2(AgentObject):
def __call__(self, xs):
for x in xs:
yield x*2
class Agent3(AgentObject):
def __call__(self, xs):
for x in xs:
print("Result: ", x)

m = MeTTa()
m.register_atom('new-agent-1', Agent1.agent_creator_atom())
m.register_atom('new-agent-2', Agent2.agent_creator_atom())
m.register_atom('new-agent-3', Agent3.agent_creator_atom())
print(m.run('''
! ((new-agent-3) ((new-agent-2) ((new-agent-1))))
'''))

# =================================

class Agnt(AgentObject):
def __init__(self):
self.messages = Queue()
self.running = False
self.output = Queue()
self.daemon = True
def __call__(self):
self.running = True
cnt = 0
while self.running:
if self.messages.empty():
self.output.put(f"Waiting {cnt}")
sleep(2)
cnt += 1
else:
m = self.messages.get()
self.output.put(m[::-1])
def stop(self):
self.running = False
def input(self, msg):
self.messages.put(msg)
def response(self):
if self.output.empty():
return None
return self.output.get()

m = MeTTa()
m.register_atom('agnt', Agnt.agent_creator_atom())
print(m.run('''
! (bind! &a1 (agnt))
! (&a1)
! (println! "Agent is running")
! ((py-atom time.sleep) 1)
! ("First response:" (&a1 .response))
! (&a1 .input "Hello")
! (println! "Agent is receiving messages")
! ((py-atom time.sleep) 2)
! ("Second response:" (&a1 .response))
! (&a1 .stop)
'''))
Loading

0 comments on commit 30e0141

Please sign in to comment.