Skip to content

Commit

Permalink
Refactoring of core telemetry class, group chat run_chat added
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Sze <[email protected]>
  • Loading branch information
marklysze committed Dec 27, 2024
1 parent 1beb7cb commit cd0251a
Show file tree
Hide file tree
Showing 19 changed files with 549 additions and 301 deletions.
3 changes: 2 additions & 1 deletion autogen/agentchat/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

logger = logging.getLogger(__name__)
Prerequisite = tuple[int, int]
from ..telemetry.telemetry_core import SpanKind, get_current_telemetry
from ..telemetry.base_telemetry import SpanKind
from ..telemetry.intrumentation_manager import get_current_telemetry


@dataclass
Expand Down
5 changes: 3 additions & 2 deletions autogen/agentchat/conversable_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
from ..io.base import IOStream
from ..oai.client import ModelClient, OpenAIWrapper
from ..runtime_logging import log_event, log_function_use, log_new_agent, logging_enabled
from ..telemetry.telemetry_core import EventKind, SpanKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind, SpanKind
from ..telemetry.intrumentation_manager import get_current_telemetry
from .agent import Agent, LLMAgent
from .chat import ChatResult, a_initiate_chats, initiate_chats
from .utils import consolidate_chat_info, gather_usage_summary
Expand Down Expand Up @@ -1206,7 +1207,7 @@ def my_message(sender: ConversableAgent, recipient: ConversableAgent, context: d
for i in range(max_turns):

if telemetry:
round_span_context = telemetry.start_span(
_ = telemetry.start_span(
kind=SpanKind.ROUND,
attributes={
"ag2.chat_function": inspect.currentframe().f_code.co_name,
Expand Down
229 changes: 209 additions & 20 deletions autogen/agentchat/groupchat.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion autogen/oai/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@

from autogen.oai.client_utils import validate_parameter

from ..telemetry.telemetry_core import EventKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind
from ..telemetry.intrumentation_manager import get_current_telemetry

TOOL_ENABLED = anthropic_version >= "0.23.1"
if TOOL_ENABLED:
Expand Down
3 changes: 2 additions & 1 deletion autogen/oai/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@

from autogen.oai.client_utils import validate_parameter

from ..telemetry.telemetry_core import EventKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind
from ..telemetry.intrumentation_manager import get_current_telemetry


class BedrockClient:
Expand Down
3 changes: 2 additions & 1 deletion autogen/oai/cerebras.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@

from autogen.oai.client_utils import should_hide_tools, validate_parameter

from ..telemetry.telemetry_core import EventKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind
from ..telemetry.intrumentation_manager import get_current_telemetry

CEREBRAS_PRICING_1K = {
# Convert pricing per million to per thousand tokens.
Expand Down
3 changes: 2 additions & 1 deletion autogen/oai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from autogen.runtime_logging import log_chat_completion, log_new_client, log_new_wrapper, logging_enabled
from autogen.token_count_utils import count_token

from ..telemetry.telemetry_core import EventKind, SpanKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind
from ..telemetry.intrumentation_manager import get_current_telemetry

TOOL_ENABLED = False
try:
Expand Down
3 changes: 2 additions & 1 deletion autogen/oai/cohere.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@

from autogen.oai.client_utils import logging_formatter, validate_parameter

from ..telemetry.telemetry_core import EventKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind
from ..telemetry.intrumentation_manager import get_current_telemetry

logger = logging.getLogger(__name__)
if not logger.handlers:
Expand Down
3 changes: 2 additions & 1 deletion autogen/oai/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@
Tool as vaiTool,
)

from ..telemetry.telemetry_core import EventKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind
from ..telemetry.intrumentation_manager import get_current_telemetry

logger = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion autogen/oai/groq.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@

from autogen.oai.client_utils import should_hide_tools, validate_parameter

from ..telemetry.telemetry_core import EventKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind
from ..telemetry.intrumentation_manager import get_current_telemetry

# Cost per thousand tokens - Input / Output (NOTE: Convert $/Million to $/K)
GROQ_PRICING_1K = {
Expand Down
3 changes: 2 additions & 1 deletion autogen/oai/mistral.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@

from autogen.oai.client_utils import should_hide_tools, validate_parameter

from ..telemetry.telemetry_core import EventKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind
from ..telemetry.intrumentation_manager import get_current_telemetry

MISTRAL_PRICING_1K = {
"mistral-large-latest": (0.002, 0.006),
Expand Down
3 changes: 2 additions & 1 deletion autogen/oai/together.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@

from autogen.oai.client_utils import should_hide_tools, validate_parameter

from ..telemetry.telemetry_core import EventKind, get_current_telemetry
from ..telemetry.base_telemetry import EventKind
from ..telemetry.intrumentation_manager import get_current_telemetry


class TogetherClient:
Expand Down
11 changes: 11 additions & 0 deletions autogen/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
#
# SPDX-License-Identifier: Apache-2.0
from .intrumentation_manager import InstrumentationManager, get_current_telemetry, telemetry_context
from .providers import CostTrackerProvider, MermaidDiagramProvider, OpenTelemetryProvider

__all__ = [
"InstrumentationManager",
"telemetry_context",
"get_current_telemetry",
"OpenTelemetryProvider",
"CostTrackerProvider",
"MermaidDiagramProvider",
]
260 changes: 260 additions & 0 deletions autogen/telemetry/base_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
#
# SPDX-License-Identifier: Apache-2.0
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional

# Number of bits for trace and span IDs
TRACE_ID_BITS = 128
SPAN_ID_BITS = 64


class SpanKind(Enum):
"""Enumeration of span kinds
Spans represent a unit of work within a trace. Typically spans have child spans and events.
Descriptions:
WORKFLOW: Main workflow span
CHATS: Multiple chats, e.g. initiate_chats
CHAT: Single chat, e.g. initiate_chat
NESTED_CHAT: A nested chat, e.g. _summary_from_nested_chats
GROUP_CHAT: A groupchat, e.g. run_chat
ROUND: A round within a chat (where chats have max_round or max_turn)
REPLY: Agent replying to another agent
REPLY_FUNCTION: Functions executed during a reply, such as generate_oai_reply and check_termination_and_human_reply
SUMMARY: Summarization of a chat
REASONING: Agent reasoning step, for advanced agents like ReasoningAgent and CaptainAgent
GROUPCHAT_SELECT_SPEAKER: GroupChat speaker selection (covers all selection methods)
SWARM_ON_CONDITION: Swarm-specific, ON_CONDITION hand off
"""

WORKFLOW = "workflow"
CHATS = "chats"
CHAT = "chat"
NESTED_CHAT = "nested_chat"
GROUP_CHAT = "group_chat"
ROUND = "round"
REPLY = "reply"
REPLY_FUNCTION = "reply_function"
SUMMARY = "summary"
REASONING = "reasoning"
GROUPCHAT_SELECT_SPEAKER = "groupchat_select_speaker"
SWARM_ON_CONDITION = "swarm_on_condition"


class EventKind(Enum):
"""Enumeration of span event kinds
Events represent a singular point in time within a span, capturing specific moments or actions.
Descriptions:
AGENT_TRANSITION: Transition moved from one agent to another
AGENT_CREATION: Creation of an Agent
GROUPCHAT_CREATION: Creation of a GroupChat
LLM_CREATE: LLM execution
AGENT_SEND_MSG: Agent sending a message to another agent
TOOL_EXECUTION: Tool or Function execution
COST: Cost event
SWARM_TRANSITION: Swarm-specific, transition reason (e.g. ON_CONDITION)
CONSOLE_PRINT: Console output (TBD)
"""

AGENT_TRANSITION = "agent_transition"
AGENT_CREATION = "agent_creation"
GROUPCHAT_CREATION = "groupchat_creation"
LLM_CREATE = "llm_create"
AGENT_SEND_MSG = "agent_send_msg"
TOOL_EXECUTION = "tool_execution"
COST = "cost"
SWARM_TRANSITION = "swarm_transition"
CONSOLE_PRINT = "console_print" # TBD


@dataclass
class SpanContext:
"""Data class to represent a span (a unit of work within a trace)."""

kind: SpanKind
trace_id: str
timestamp: datetime = None
parent_span_id: Optional[str] = None
attributes: Dict[str, Any] = None
core_span_id: Optional[str] = None

def __post_init__(self):
# Timestamps for ordering spans
self.timestamp = datetime.now()

if self.attributes is None:
self.attributes = {}

def set_attribute(self, key: str, value: Any) -> None:
"""Set/update an attribute."""
self.attributes[key] = value

def has_attribute(self, key: str) -> bool:
"""Check if an attribute exists."""
return key in self.attributes


@dataclass
class EventContext:
"""Data class to represent a span event (point in time events occurring within a span)."""

kind: EventKind
attributes: Dict[str, Any] = None

def __post_init__(self):
if self.attributes is None:
self.attributes = {}


class TelemetryProvider(ABC):
"""Base class for telemetry providers.
All telemetry providers are to implement this interface and then be registered
with the InstrumentationManager.
Telemetry follows the OpenTelemetry signals terminology of traces, spans, and events. However, it is
not restricted to OpenTelemetry use and can be used to gather AG2 activity or represent it in other formats,
such as diagrams or logging to files/databases.
Trace: Full workflow, including object creations, chats, summaries, etc.
Span: A unit of work within a trace, such as a chat, round, or agent reply. Spans typically have child spans and events.
Event: A singular point in time within a span, such as LLM execution, agent creation, agent transition, or cost.
Multiple providers can be attached to the InstrumentationManager for simultaneous and real-time telemetry.
"""

@abstractmethod
def start_trace(self, name: str, core_span_id: str, attributes: Dict[str, Any] = None) -> SpanContext:
"""Start a new trace.
Args:
name: User-provided name of the trace
core_span_id: Unique ID for the trace-level span that's created with a trace
attributes: Optional attributes for the trace
Returns:
SpanContext: The trace-level span
"""
pass

@abstractmethod
def start_span(
self,
kind: SpanKind,
core_span_id: str,
parent_context: Optional[SpanContext] = None,
attributes: Dict[str, Any] = None,
) -> SpanContext:
"""Start a new span.
Args:
kind (SpanKind): The kind of span
core_span_id (str): Unique ID for the span, determined by the InstrumentationManager and common to the span across all providers
parent_context (SpanContext or None): Optional parent span context to help maintain hierarchy
Note: a provider, such as OpenTelemetry, may not require this to maintain hierarchy
attributes (dict or None): Optional attributes for the span
Returns:
SpanContext: The created span context
"""
pass

@abstractmethod
def set_span_attribute(self, context: SpanContext, key: str, value: Any) -> None:
"""Set an attribute on a span.
Args:
context (SpanContext): The span context
key (str): The attribute key
value (Any): The attribute value
"""
pass

@abstractmethod
def end_span(self, context: SpanContext) -> None:
"""Ends a span.
Note: Some telemetry providers, like OpenTelemetry, must have an end_span for every start_span to create a valid trace.
Args:
context (SpanContext): The span context to end"""
pass

@abstractmethod
def record_event(
self, span_context: SpanContext, event_name: str, kind: EventKind, attributes: Dict[str, Any] = None
) -> None:
"""Record a span event.
Args:
span_context (SpanContext): The span context to record the event in
event_name (str): The name of the event
kind (EventKind): The kind of event
attributes (dict or None): Optional attributes for the event
"""
pass

@abstractmethod
def convert_attribute_value(self, value: Any) -> Any:
"""Convert an attribute value to a format suitable for the provider.
Args:
value (Any): The value to convert
Returns:
Any: The converted value
"""
pass


# STATIC METHODS


@staticmethod
def generate_id(bits: int) -> str:
"""Generate a random ID of specified bit length.
Creates a random identifier by generating a UUID4 and masking it to the desired bit length.
The result is formatted as a lowercase hex string with the appropriate number of leading zeros.
Based on the requirement for OpenTelemetry trace and span IDs.
Args:
bits: Number of bits for the ID (e.g. 128 for trace ID, 64 for span ID)
Returns:
str: Hex string representation of the ID with appropriate length (bits/4 characters)
Examples:
>>> InstrumentationManager.generate_id(128) # For trace ID
'a1b2c3d4e5f67890a1b2c3d4e5f67890'
>>> InstrumentationManager.generate_id(64) # For span ID
'a1b2c3d4e5f67890'
"""
hex_chars = bits // 4 # Each hex char represents 4 bits
return format(uuid.uuid4().int & ((1 << bits) - 1), f"0{hex_chars}x")


@staticmethod
def _is_list_of_string_dicts(item: Any) -> bool:
"""Check if an object is a list of dictionaries with string values, for handling messages
Args:
item: The object to check
Returns:
bool: True if the object is a list of dictionaries with string values, False otherwise
"""
if not isinstance(item, list):
return False
if not all(isinstance(d, dict) for d in item):
return False
return all(isinstance(key, str) and isinstance(value, str) for d in item for key, value in d.items())
Loading

0 comments on commit cd0251a

Please sign in to comment.