Skip to content

Commit

Permalink
Merge pull request #281 from ag2ai/rewrite-OpenAIRealtimeClient
Browse files Browse the repository at this point in the history
Integrate OpenAI realtime API
  • Loading branch information
davorrunje authored Dec 30, 2024
2 parents 69166d2 + 029d6cb commit c84000a
Show file tree
Hide file tree
Showing 23 changed files with 1,139 additions and 689 deletions.
8 changes: 3 additions & 5 deletions .github/workflows/contrib-openai.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ jobs:
run: |
docker --version
python -m pip install --upgrade pip wheel
pip install -e .[teachable]
pip install -e .[teachable,test]
python -c "import autogen"
pip install pytest-cov>=5
- name: Coverage
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
Expand Down Expand Up @@ -211,7 +210,7 @@ jobs:
run: |
docker --version
python -m pip install --upgrade pip wheel
pip install -e .
pip install -e ".[test]"
python -c "import autogen"
pip install pytest-cov>=5 pytest-asyncio
- name: Install packages for test when needed
Expand Down Expand Up @@ -290,9 +289,8 @@ jobs:
run: |
docker --version
python -m pip install --upgrade pip wheel
pip install -e .[lmm]
pip install -e .[lmm,test]
python -c "import autogen"
pip install pytest-cov>=5
- name: Coverage
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/openai.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ jobs:
run: |
docker --version
python -m pip install --upgrade pip wheel
pip install -e.
pip install -e ".[test]"
python -c "import autogen"
pip install pytest-cov>=5 pytest-asyncio
- name: Install packages for test when needed
if: matrix.python-version == '3.9'
run: |
Expand Down
11 changes: 8 additions & 3 deletions autogen/agentchat/realtime_agent/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
#
# SPDX-License-Identifier: Apache-2.0

from .function_observer import FunctionObserver
from .realtime_agent import RealtimeAgent
from .twilio_observer import TwilioAudioAdapter
from .websocket_observer import WebsocketAudioAdapter
from .realtime_observer import RealtimeObserver
from .twilio_audio_adapter import TwilioAudioAdapter
from .websocket_audio_adapter import WebSocketAudioAdapter

__all__ = ["RealtimeAgent", "FunctionObserver", "TwilioAudioAdapter", "WebsocketAudioAdapter"]
__all__ = ["FunctionObserver", "RealtimeAgent", "RealtimeObserver", "TwilioAudioAdapter", "WebSocketAudioAdapter"]
192 changes: 0 additions & 192 deletions autogen/agentchat/realtime_agent/client.py

This file was deleted.

67 changes: 30 additions & 37 deletions autogen/agentchat/realtime_agent/function_observer.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,37 @@
# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
#
# SPDX-License-Identifier: Apache-2.0
#
# Portions derived from https://github.com/microsoft/autogen are under the MIT License.
# SPDX-License-Identifier: MIT

import asyncio
import json
import logging
from typing import TYPE_CHECKING, Any
from logging import Logger, getLogger
from typing import TYPE_CHECKING, Any, Optional

from asyncer import asyncify
from pydantic import BaseModel

from .realtime_observer import RealtimeObserver

if TYPE_CHECKING:
from .realtime_agent import RealtimeAgent

logger = logging.getLogger(__name__)


class FunctionObserver(RealtimeObserver):
"""Observer for handling function calls from the OpenAI Realtime API."""

def __init__(self, agent: "RealtimeAgent") -> None:
"""Observer for handling function calls from the OpenAI Realtime API.
Args:
agent (RealtimeAgent): The realtime agent attached to the observer.
"""
super().__init__()
self._agent = agent
def __init__(self, *, logger: Optional[Logger] = None) -> None:
"""Observer for handling function calls from the OpenAI Realtime API."""
super().__init__(logger=logger)

async def update(self, response: dict[str, Any]) -> None:
async def on_event(self, event: dict[str, Any]) -> None:
"""Handle function call events from the OpenAI Realtime API.
Args:
response (dict[str, Any]): The response from the OpenAI Realtime API.
event (dict[str, Any]): The event from the OpenAI Realtime API.
"""
if response.get("type") == "response.function_call_arguments.done":
logger.info(f"Received event: {response['type']}", response)
if event["type"] == "response.function_call_arguments.done":
self.logger.info(f"Received event: {event['type']}", event)
await self.call_function(
call_id=response["call_id"], name=response["name"], kwargs=json.loads(response["arguments"])
call_id=event["call_id"],
name=event["name"],
kwargs=json.loads(event["arguments"]),
)

async def call_function(self, call_id: str, name: str, kwargs: dict[str, Any]) -> None:
Expand All @@ -54,33 +43,37 @@ async def call_function(self, call_id: str, name: str, kwargs: dict[str, Any]) -
kwargs (Any[str, Any]): The arguments to pass to the function.
"""

if name in self._agent.realtime_functions:
_, func = self._agent.realtime_functions[name]
if name in self.agent._registred_realtime_functions:
_, func = self.agent._registred_realtime_functions[name]
func = func if asyncio.iscoroutinefunction(func) else asyncify(func)
try:
result = await func(**kwargs)
except Exception:
result = "Function call failed"
logger.warning(f"Function call failed: {name}")
self.logger.info(f"Function call failed: {name=}, {kwargs=}", stack_info=True)

if isinstance(result, BaseModel):
result = result.model_dump_json()
elif not isinstance(result, str):
result = json.dumps(result)
try:
result = json.dumps(result)
except Exception:
result = str(result)

await self.client.function_result(call_id, result)

async def run(self) -> None:
"""Run the observer.
Initialize the session with the OpenAI Realtime API.
"""
await self.initialize_session()
await self.realtime_client.send_function_result(call_id, result)

async def initialize_session(self) -> None:
"""Add registered tools to OpenAI with a session update."""
session_update = {
"tools": [schema for schema, _ in self._agent.realtime_functions.values()],
"tools": [schema for schema, _ in self.agent._registred_realtime_functions.values()],
"tool_choice": "auto",
}
await self.client.session_update(session_update)
await self.realtime_client.session_update(session_update)

async def run_loop(self) -> None:
"""Run the observer loop."""
pass


if TYPE_CHECKING:
function_observer: RealtimeObserver = FunctionObserver()
Loading

0 comments on commit c84000a

Please sign in to comment.