From 9d0e654952802f9935a8bc72db6eaabb018cf938 Mon Sep 17 00:00:00 2001 From: Andrei Fajardo Date: Thu, 20 Jun 2024 01:09:38 -0400 Subject: [PATCH 1/5] wip --- agentfile/launchers/local.py | 33 +++++--- agentfile/message_queues/simple.py | 7 +- agentfile/orchestrators/agent.py | 2 - agentfile/services/agent.py | 7 ++ agentfile/services/tool.py | 8 ++ .../agentic_toolservice_local_single.py | 79 +++++++++++++++++++ 6 files changed, 122 insertions(+), 14 deletions(-) create mode 100644 example_scripts/agentic_toolservice_local_single.py diff --git a/agentfile/launchers/local.py b/agentfile/launchers/local.py index 3cf1f669..cda16ebc 100644 --- a/agentfile/launchers/local.py +++ b/agentfile/launchers/local.py @@ -3,6 +3,7 @@ from typing import Any, Callable, Dict, List, Optional from agentfile.services.base import BaseService +from agentfile.services.tool import ToolService from agentfile.control_plane.base import BaseControlPlane from agentfile.message_consumers.base import BaseMessageQueueConsumer from agentfile.message_queues.simple import SimpleMessageQueue @@ -31,10 +32,12 @@ def __init__( services: List[BaseService], control_plane: BaseControlPlane, message_queue: SimpleMessageQueue, + additional_consumers: List[BaseMessageQueueConsumer] = [], publish_callback: Optional[PublishCallback] = None, ) -> None: self.services = services self.control_plane = control_plane + self.additional_consumers = additional_consumers self._message_queue = message_queue self._publisher_id = f"{self.__class__.__qualname__}-{uuid.uuid4()}" self._publish_callback = publish_callback @@ -61,7 +64,7 @@ async def register_consumers( for service in self.services: await self.message_queue.register_consumer(service.as_consumer()) - consumers = consumers or [] + consumers = (consumers or []) + self.additional_consumers for consumer in consumers: await self.message_queue.register_consumer(consumer) @@ -79,6 +82,17 @@ async def alaunch_single(self, initial_task: str) -> None: ) await self.register_consumers([human_consumer]) + # register each service to the control plane + for service in self.services: + if isinstance(service, ToolService): + continue + await self.control_plane.register_service(service.service_definition) + + # start services + bg_tasks = [] + for service in self.services: + bg_tasks.append(asyncio.create_task(service.launch_local())) + # publish initial task await self.publish( QueueMessage( @@ -87,14 +101,11 @@ async def alaunch_single(self, initial_task: str) -> None: data=TaskDefinition(input=initial_task).dict(), ), ) - - # register each service to the control plane - for service in self.services: - await self.control_plane.register_service(service.service_definition) - - # start services - for service in self.services: - asyncio.create_task(service.launch_local()) - # runs until the message queue is stopped by the human consumer - await self.message_queue.start() + mq_task = asyncio.create_task(self.message_queue.start()) + await asyncio.sleep(10) + + # shutdown + for task in bg_tasks: + task.cancel() + mq_task.cancel() diff --git a/agentfile/message_queues/simple.py b/agentfile/message_queues/simple.py index 432add88..fb0e7036 100644 --- a/agentfile/message_queues/simple.py +++ b/agentfile/message_queues/simple.py @@ -63,7 +63,10 @@ async def _publish_to_consumer(self, message: QueueMessage, **kwargs: Any) -> An consumer = self._select_consumer(message) try: await consumer.process_message(message, **kwargs) - except Exception: + except Exception as e: + logger.debug( + f"Failed to publish message of type '{message.type}' to consumer. Message: {str(e)}" + ) raise async def start(self) -> None: @@ -87,11 +90,13 @@ async def register_consumer( if message_type_str not in self.consumers: self.consumers[message_type_str] = {consumer.id_: consumer} + logger.info(f"Consumer {consumer.id_} has been registered.") else: if consumer.id_ in self.consumers[message_type_str]: raise ValueError("Consumer has already been added.") self.consumers[message_type_str][consumer.id_] = consumer + logger.info(f"Consumer {consumer.id_} has been registered.") if message_type_str not in self.queues: self.queues[message_type_str] = deque() diff --git a/agentfile/orchestrators/agent.py b/agentfile/orchestrators/agent.py index b44e5b95..75becac6 100644 --- a/agentfile/orchestrators/agent.py +++ b/agentfile/orchestrators/agent.py @@ -44,14 +44,12 @@ async def get_next_messages( response = await self.llm.apredict_and_call( tools, user_msg=task_def.input, - error_on_no_tool_call=False, ) else: messages = memory.get() response = await self.llm.apredict_and_call( tools_plus_human, chat_history=messages, - error_on_no_tool_call=False, ) # check if there was a tool call diff --git a/agentfile/services/agent.py b/agentfile/services/agent.py index d4ee7a3b..87577d26 100644 --- a/agentfile/services/agent.py +++ b/agentfile/services/agent.py @@ -24,6 +24,12 @@ CONTROL_PLANE_NAME, ) +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +logging.basicConfig(level=logging.DEBUG) + class AgentService(BaseService): service_name: str @@ -164,6 +170,7 @@ def as_consumer(self) -> BaseMessageQueueConsumer: ) async def launch_local(self) -> None: + logger.info(f"{self.service_name} launch_local") asyncio.create_task(self.processing_loop()) # ---- Server based methods ---- diff --git a/agentfile/services/tool.py b/agentfile/services/tool.py index a9af0a0a..77a9e1f5 100644 --- a/agentfile/services/tool.py +++ b/agentfile/services/tool.py @@ -1,4 +1,5 @@ import asyncio +import logging import uuid import uvicorn from asyncio import Lock @@ -26,6 +27,10 @@ ServiceDefinition, ) +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +logging.basicConfig(level=logging.DEBUG) + class ToolService(BaseService): service_name: str @@ -117,6 +122,9 @@ async def processing_loop(self) -> None: self.tools, tool_call.tool_call_bundle.tool_name ) + logger.info( + f"Processing tool call id {tool_call.id_} with {tool.metadata.name}" + ) tool_output = await tool.acall( *tool_call.tool_call_bundle.tool_args, **tool_call.tool_call_bundle.tool_kwargs, diff --git a/example_scripts/agentic_toolservice_local_single.py b/example_scripts/agentic_toolservice_local_single.py new file mode 100644 index 00000000..1faafe25 --- /dev/null +++ b/example_scripts/agentic_toolservice_local_single.py @@ -0,0 +1,79 @@ +from agentfile.launchers.local import LocalLauncher +from agentfile.services import AgentService, ToolService +from agentfile.tools import MetaServiceTool +from agentfile.control_plane.fastapi import FastAPIControlPlane +from agentfile.message_queues.simple import SimpleMessageQueue +from agentfile.orchestrators.agent import AgentOrchestrator + +from llama_index.core.agent import FunctionCallingAgentWorker +from llama_index.core.tools import FunctionTool +from llama_index.llms.openai import OpenAI + + +LOGGING = False + +if LOGGING: + import logging + import sys + + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout)) + + +# create an agent +def get_the_secret_fact() -> str: + """Returns the secret fact.""" + return "The secret fact is: A baby llama is called a 'Cria'." + + +tool = FunctionTool.from_defaults(fn=get_the_secret_fact) + + +# create our multi-agent framework components +message_queue = SimpleMessageQueue() +tool_service = ToolService( + message_queue=message_queue, + tools=[tool], + running=True, + step_interval=0.5, +) + +control_plane = FastAPIControlPlane( + message_queue=message_queue, + orchestrator=AgentOrchestrator(llm=OpenAI()), +) + +meta_tool = MetaServiceTool( + tool_metadata=tool.metadata, + message_queue=message_queue, + tool_service_name=tool_service.service_name, +) +worker1 = FunctionCallingAgentWorker.from_tools( + [meta_tool], + llm=OpenAI(), +) +agent1 = worker1.as_agent() +agent_server_1 = AgentService( + agent=agent1, + message_queue=message_queue, + description="Useful for getting the secret fact.", + service_name="secret_fact_agent", +) + +worker2 = FunctionCallingAgentWorker.from_tools([], llm=OpenAI()) +agent2 = worker2.as_agent() +agent_server_2 = AgentService( + agent=agent2, + message_queue=message_queue, + description="Useful for getting random dumb facts.", + service_name="dumb_fact_agent", +) + +# launch it +launcher = LocalLauncher( + [agent_server_1, agent_server_2, tool_service], + control_plane, + message_queue, + additional_consumers=[meta_tool.as_consumer()], +) +launcher.launch_single("What is the secret fact?") From 9b8514f4b4803b456ff7782bd2b6fa1001b4d7d6 Mon Sep 17 00:00:00 2001 From: Andrei Fajardo Date: Thu, 20 Jun 2024 11:56:18 -0400 Subject: [PATCH 2/5] put back error_on_no_tool_call=False --- agentfile/orchestrators/agent.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agentfile/orchestrators/agent.py b/agentfile/orchestrators/agent.py index 75becac6..b44e5b95 100644 --- a/agentfile/orchestrators/agent.py +++ b/agentfile/orchestrators/agent.py @@ -44,12 +44,14 @@ async def get_next_messages( response = await self.llm.apredict_and_call( tools, user_msg=task_def.input, + error_on_no_tool_call=False, ) else: messages = memory.get() response = await self.llm.apredict_and_call( tools_plus_human, chat_history=messages, + error_on_no_tool_call=False, ) # check if there was a tool call From 2b85858e38439a6e7478d536b76fb3d33b9b7e7d Mon Sep 17 00:00:00 2001 From: Andrei Fajardo Date: Thu, 20 Jun 2024 14:14:26 -0400 Subject: [PATCH 3/5] cleaner shutdown and UX --- agentfile/launchers/local.py | 30 +++++++++++++++++----------- agentfile/tools/meta_service_tool.py | 6 ++++++ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/agentfile/launchers/local.py b/agentfile/launchers/local.py index cda16ebc..edeb2537 100644 --- a/agentfile/launchers/local.py +++ b/agentfile/launchers/local.py @@ -1,9 +1,11 @@ import asyncio import uuid +import signal +import sys + from typing import Any, Callable, Dict, List, Optional from agentfile.services.base import BaseService -from agentfile.services.tool import ToolService from agentfile.control_plane.base import BaseControlPlane from agentfile.message_consumers.base import BaseMessageQueueConsumer from agentfile.message_queues.simple import SimpleMessageQueue @@ -32,12 +34,10 @@ def __init__( services: List[BaseService], control_plane: BaseControlPlane, message_queue: SimpleMessageQueue, - additional_consumers: List[BaseMessageQueueConsumer] = [], publish_callback: Optional[PublishCallback] = None, ) -> None: self.services = services self.control_plane = control_plane - self.additional_consumers = additional_consumers self._message_queue = message_queue self._publisher_id = f"{self.__class__.__qualname__}-{uuid.uuid4()}" self._publish_callback = publish_callback @@ -64,7 +64,7 @@ async def register_consumers( for service in self.services: await self.message_queue.register_consumer(service.as_consumer()) - consumers = (consumers or []) + self.additional_consumers + consumers = consumers or [] for consumer in consumers: await self.message_queue.register_consumer(consumer) @@ -73,6 +73,15 @@ async def register_consumers( def launch_single(self, initial_task: str) -> None: asyncio.run(self.alaunch_single(initial_task)) + def get_shutdown_handler(self, tasks: List[asyncio.Task]) -> Callable: + def signal_handler(sig: Any, frame: Any) -> None: + print("\nShutting down.") + for task in tasks: + task.cancel() + sys.exit(0) + + return signal_handler + async def alaunch_single(self, initial_task: str) -> None: # register human consumer human_consumer = HumanMessageConsumer( @@ -84,8 +93,6 @@ async def alaunch_single(self, initial_task: str) -> None: # register each service to the control plane for service in self.services: - if isinstance(service, ToolService): - continue await self.control_plane.register_service(service.service_definition) # start services @@ -103,9 +110,8 @@ async def alaunch_single(self, initial_task: str) -> None: ) # runs until the message queue is stopped by the human consumer mq_task = asyncio.create_task(self.message_queue.start()) - await asyncio.sleep(10) - - # shutdown - for task in bg_tasks: - task.cancel() - mq_task.cancel() + shutdown_handler = self.get_shutdown_handler([mq_task] + bg_tasks) + loop = asyncio.get_event_loop() + while loop.is_running(): + await asyncio.sleep(0.1) + signal.signal(signal.SIGINT, shutdown_handler) diff --git a/agentfile/tools/meta_service_tool.py b/agentfile/tools/meta_service_tool.py index b33526c4..903c47ae 100644 --- a/agentfile/tools/meta_service_tool.py +++ b/agentfile/tools/meta_service_tool.py @@ -69,6 +69,12 @@ def __init__( self._metadata = tool_metadata self._lock = asyncio.Lock() + # register tool to the message queue + asyncio.run(self.message_queue.register_consumer(self.as_consumer())) + logger.info( + f"Ready to consume messages of type: {self.as_consumer().message_type}." + ) + @classmethod async def from_tool_service( cls, From 92d7976637ca95f3eb0a117be88c81d2bc4ee4a8 Mon Sep 17 00:00:00 2001 From: Andrei Fajardo Date: Thu, 20 Jun 2024 14:59:17 -0400 Subject: [PATCH 4/5] move registration of tool imn acall --- agentfile/tools/meta_service_tool.py | 22 +++++-------------- .../agentic_toolservice_local_single.py | 22 +------------------ tests/test_meta_service_tool.py | 7 +++--- 3 files changed, 11 insertions(+), 40 deletions(-) diff --git a/agentfile/tools/meta_service_tool.py b/agentfile/tools/meta_service_tool.py index 903c47ae..3a2b9fc3 100644 --- a/agentfile/tools/meta_service_tool.py +++ b/agentfile/tools/meta_service_tool.py @@ -26,18 +26,13 @@ logging.basicConfig(level=logging.DEBUG) -class TimeoutException(Exception): - """Raise when polling for results from message queue exceed timeout.""" - - pass - - class MetaServiceTool(MessageQueuePublisherMixin, AsyncBaseTool, BaseModel): tool_call_results: Dict[str, ToolCallResult] = Field(default_factory=dict) timeout: float = Field(default=10.0, description="timeout interval in seconds.") tool_service_name: str = Field(default_factory=str) step_interval: float = 0.1 raise_timeout: bool = False + registered: bool = False _message_queue: BaseMessageQueue = PrivateAttr() _publisher_id: str = PrivateAttr() @@ -69,12 +64,6 @@ def __init__( self._metadata = tool_metadata self._lock = asyncio.Lock() - # register tool to the message queue - asyncio.run(self.message_queue.register_consumer(self.as_consumer())) - logger.info( - f"Ready to consume messages of type: {self.as_consumer().message_type}." - ) - @classmethod async def from_tool_service( cls, @@ -138,10 +127,6 @@ def publish_callback(self) -> Optional[PublishCallback]: def metadata(self) -> ToolMetadata: return self._metadata - @metadata.setter - def metadata(self, value: ToolMetadata) -> None: - self._metadata = value - @property def lock(self) -> asyncio.Lock: return self._lock @@ -190,6 +175,11 @@ async def acall(self, *args: Any, **kwargs: Any) -> ToolOutput: In order to get a ToolOutput result, this will poll the queue until the result is written. """ + if not self.registered: + # register tool to message queue + await self.message_queue.register_consumer(self.as_consumer()) + self.registered = True + tool_call = ToolCall( tool_call_bundle=ToolCallBundle( tool_name=self.metadata.name, tool_args=args, tool_kwargs=kwargs diff --git a/example_scripts/agentic_toolservice_local_single.py b/example_scripts/agentic_toolservice_local_single.py index 1faafe25..d045a1a9 100644 --- a/example_scripts/agentic_toolservice_local_single.py +++ b/example_scripts/agentic_toolservice_local_single.py @@ -10,16 +10,6 @@ from llama_index.llms.openai import OpenAI -LOGGING = False - -if LOGGING: - import logging - import sys - - logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) - logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout)) - - # create an agent def get_the_secret_fact() -> str: """Returns the secret fact.""" @@ -60,20 +50,10 @@ def get_the_secret_fact() -> str: service_name="secret_fact_agent", ) -worker2 = FunctionCallingAgentWorker.from_tools([], llm=OpenAI()) -agent2 = worker2.as_agent() -agent_server_2 = AgentService( - agent=agent2, - message_queue=message_queue, - description="Useful for getting random dumb facts.", - service_name="dumb_fact_agent", -) - # launch it launcher = LocalLauncher( - [agent_server_1, agent_server_2, tool_service], + [agent_server_1, tool_service], control_plane, message_queue, - additional_consumers=[meta_tool.as_consumer()], ) launcher.launch_single("What is the secret fact?") diff --git a/tests/test_meta_service_tool.py b/tests/test_meta_service_tool.py index 06f6fb15..ad247b90 100644 --- a/tests/test_meta_service_tool.py +++ b/tests/test_meta_service_tool.py @@ -63,6 +63,7 @@ async def test_init( # assert assert meta_service_tool.metadata.name == "multiply" + assert not meta_service_tool.registered @pytest.mark.asyncio() @@ -78,6 +79,7 @@ async def test_create_from_tool_service_direct( # assert assert meta_service_tool.metadata.name == "multiply" + assert not meta_service_tool.registered @pytest.mark.asyncio() @@ -137,7 +139,6 @@ async def test_tool_call_output( meta_service_tool: MetaServiceTool = await MetaServiceTool.from_tool_service( tool_service=tool_service, message_queue=message_queue, name="multiply" ) - await message_queue.register_consumer(meta_service_tool.as_consumer()) await message_queue.register_consumer(tool_service.as_consumer()) mq_task = asyncio.create_task(message_queue.start()) ts_task = asyncio.create_task(tool_service.processing_loop()) @@ -155,6 +156,7 @@ async def test_tool_call_output( assert tool_output.tool_name == "multiply" assert tool_output.raw_input == {"args": (), "kwargs": {"a": 1, "b": 9}} assert len(meta_service_tool.tool_call_results) == 0 + assert meta_service_tool.registered @pytest.mark.asyncio() @@ -169,7 +171,6 @@ async def test_tool_call_raise_timeout( timeout=1e-9, raise_timeout=True, ) - await message_queue.register_consumer(meta_service_tool.as_consumer()) await message_queue.register_consumer(tool_service.as_consumer()) mq_task = asyncio.create_task(message_queue.start()) ts_task = asyncio.create_task(tool_service.processing_loop()) @@ -196,7 +197,6 @@ async def test_tool_call_reach_timeout( timeout=1e-9, raise_timeout=False, ) - await message_queue.register_consumer(meta_service_tool.as_consumer()) await message_queue.register_consumer(tool_service.as_consumer()) mq_task = asyncio.create_task(message_queue.start()) ts_task = asyncio.create_task(tool_service.processing_loop()) @@ -212,3 +212,4 @@ async def test_tool_call_reach_timeout( assert tool_output.is_error assert tool_output.raw_input == {"args": (), "kwargs": {"a": 1, "b": 9}} assert len(meta_service_tool.tool_call_results) == 0 + assert meta_service_tool.registered From 964e1dbd97873fec3ee1b45fcc4acd5ffb25276b Mon Sep 17 00:00:00 2001 From: Andrei Fajardo Date: Thu, 20 Jun 2024 15:03:17 -0400 Subject: [PATCH 5/5] add deregister method --- agentfile/tools/meta_service_tool.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/agentfile/tools/meta_service_tool.py b/agentfile/tools/meta_service_tool.py index 3a2b9fc3..a375e56a 100644 --- a/agentfile/tools/meta_service_tool.py +++ b/agentfile/tools/meta_service_tool.py @@ -165,6 +165,11 @@ async def _poll_for_tool_call_result(self, tool_call_id: str) -> ToolCallResult: await asyncio.sleep(self.step_interval) return tool_call_result + async def deregister(self) -> None: + """Deregister from message queue.""" + await self.message_queue.deregister_consumer(self.as_consumer()) + self.registered = False + def call(self, *args: Any, **kwargs: Any) -> ToolOutput: """Call.""" return asyncio.run(self.acall(*args, **kwargs))