Skip to content

Commit

Permalink
Merge pull request #2113 from Agenta-AI/feature/observability-checkpo…
Browse files Browse the repository at this point in the history
…int-2

[Feature] Observability Checkpoint 2
  • Loading branch information
aybruhm authored Nov 12, 2024
2 parents c715382 + 199d7ee commit 5fa388e
Show file tree
Hide file tree
Showing 176 changed files with 25,135 additions and 6,381 deletions.
4 changes: 2 additions & 2 deletions agenta-backend/agenta_backend/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

if os.environ["FEATURE_FLAG"] in ["cloud", "cloud-dev"]:
if os.environ.get("FEATURE_FLAG") in ["cloud", "cloud-dev"]:
import agenta_backend.cloud.__init__
if os.environ["FEATURE_FLAG"] in ["ee"]:
if os.environ.get("FEATURE_FLAG") in ["ee"]:
import agenta_backend.ee.__init__
File renamed without changes.
Empty file.
Empty file.
60 changes: 60 additions & 0 deletions agenta-backend/agenta_backend/apis/fastapi/observability/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import List, Optional
from pydantic import BaseModel

from agenta_backend.apis.fastapi.shared.models import VersionedModel

from agenta_backend.core.observability.dtos import (
OTelSpanDTO,
SpanDTO,
TreeDTO,
RootDTO,
)


class CollectStatusResponse(VersionedModel):
status: str


class OTelSpansResponse(VersionedModel):
count: Optional[int] = None
spans: List[OTelSpanDTO]


class AgentaNodeDTO(SpanDTO):
pass


class AgentaNodesDTO(BaseModel):
nodes: List[AgentaNodeDTO]


class AgentaTreeDTO(BaseModel):
tree: TreeDTO

nodes: List[AgentaNodeDTO]


class AgentaTreesDTO(BaseModel):
trees: List[AgentaTreeDTO]


class AgentaRootDTO(BaseModel):
root: RootDTO

trees: List[AgentaTreeDTO]


class AgentaRootsDTO(BaseModel):
roots: List[AgentaRootDTO]


class AgentaNodesResponse(VersionedModel, AgentaNodesDTO):
count: Optional[int] = None


class AgentaTreesResponse(VersionedModel, AgentaTreesDTO):
count: Optional[int] = None


class AgentaRootsResponse(VersionedModel, AgentaRootsDTO):
count: Optional[int] = None
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from typing import List
from datetime import datetime

import agenta_backend.apis.fastapi.observability.opentelemetry.traces_proto as Trace_Proto

from google.protobuf.json_format import MessageToDict

from agenta_backend.core.observability.dtos import (
OTelSpanDTO,
OTelContextDTO,
OTelEventDTO,
OTelLinkDTO,
)


SPAN_KINDS = [
"SPAN_KIND_UNSPECIFIED",
"SPAN_KIND_INTERNAL",
"SPAN_KIND_SERVER",
"SPAN_KIND_CLIENT",
"SPAN_KIND_PRODUCER",
"SPAN_KIND_CONSUMER",
]

SPAN_STATUS_CODES = [
"STATUS_CODE_UNSET",
"STATUS_CODE_OK",
"STATUS_CODE_ERROR",
]


def _parse_attribute(attribute):
raw_value = attribute.value
value_type = list(MessageToDict(raw_value).keys())[0].replace("V", "_v")
clean_value = getattr(raw_value, value_type)

return (attribute.key, clean_value)


def _parse_timestamp(timestamp_ns: int) -> str:
timestamp = timestamp_ns / 1_000_000_000

return datetime.fromtimestamp(timestamp).isoformat(timespec="microseconds")


def parse_otlp_stream(otlp_stream: bytes) -> List[OTelSpanDTO]:
proto = Trace_Proto.TracesData()
proto.ParseFromString(otlp_stream)

otel_span_dtos = []

for resource_span in proto.resource_spans:
for scope_span in resource_span.scope_spans:
for span in scope_span.spans:
# SPAN CONTEXT
s_trace_id = "0x" + span.trace_id.hex()
s_span_id = "0x" + span.span_id.hex()
s_context = OTelContextDTO(trace_id=s_trace_id, span_id=s_span_id)

# SPAN PARENT CONTEXT
s_parent_id = span.parent_span_id.hex()
s_parent_id = "0x" + s_parent_id if s_parent_id else None
p_context = (
OTelContextDTO(trace_id=s_trace_id, span_id=s_parent_id)
if s_parent_id
else None
)

# SPAN NAME
s_name = span.name

# SPAN KIND
s_kind = SPAN_KINDS[span.kind]

# SPAN TIME
s_start_time = _parse_timestamp(span.start_time_unix_nano)
s_end_time = _parse_timestamp(span.end_time_unix_nano)

# SPAN STATUS
s_status_code = SPAN_STATUS_CODES[
span.status.code if span.status.code else 0
]
s_status_message = (
span.status.message if span.status.message != "" else None
)

# SPAN ATTRIBUTES
s_attributes = {
k: v
for k, v in [
_parse_attribute(attribute) for attribute in span.attributes
]
}

# SPAN EVENTS
s_events = [
OTelEventDTO(
name=event.name,
timestamp=_parse_timestamp(event.time_unix_nano),
attributes={
k: v
for k, v in [
_parse_attribute(attribute)
for attribute in event.attributes
]
},
)
for event in span.events
]
s_events = s_events if len(s_events) > 0 else None

# SPAN LINKS
s_links = [
OTelLinkDTO(
context=OTelContextDTO(
trace_id="0x" + link.trace_id.hex(),
span_id="0x" + link.span_id.hex(),
),
attributes={
k: v
for k, v in [
_parse_attribute(attribute)
for attribute in link.attributes
]
},
)
for link in span.links
]
s_links = s_links if len(s_links) > 0 else None

# PUTTING IT ALL TOGETHER
otel_span_dto = OTelSpanDTO(
context=s_context,
name=s_name,
kind=s_kind,
start_time=s_start_time,
end_time=s_end_time,
status_code=s_status_code,
status_message=s_status_message,
attributes=s_attributes,
events=s_events,
parent=p_context,
links=s_links,
)

otel_span_dtos.append(otel_span_dto)

return otel_span_dtos
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from json import loads

VERSION = "0.4.1"

V_0_4_1_ATTRIBUTES_EXACT = [
# OPENLLMETRY
("gen_ai.system", "ag.meta.system"),
("gen_ai.request.base_url", "ag.meta.request.base_url"),
("gen_ai.request.endpoint", "ag.meta.request.endpoint"),
("gen_ai.request.headers", "ag.meta.request.headers"),
("gen_ai.request.type", "ag.type.node"),
("gen_ai.request.streaming", "ag.meta.request.streaming"),
("gen_ai.request.model", "ag.meta.request.model"),
("gen_ai.request.max_tokens", "ag.meta.request.max_tokens"),
("gen_ai.request.temperature", "ag.meta.request.temperature"),
("gen_ai.request.top_p", "ag.meta.request.top_p"),
("gen_ai.response.model", "ag.meta.response.model"),
("gen_ai.usage.prompt_tokens", "ag.metrics.unit.tokens.prompt"),
("gen_ai.usage.completion_tokens", "ag.metrics.unit.tokens.completion"),
("gen_ai.usage.total_tokens", "ag.metrics.unit.tokens.total"),
("llm.headers", "ag.meta.request.headers"),
("llm.request.type", "ag.type.node"),
("llm.top_k", "ag.meta.request.top_k"),
("llm.is_streaming", "ag.meta.request.streaming"),
("llm.usage.total_tokens", "ag.metrics.unit.tokens.total"),
("gen_ai.openai.api_base", "ag.meta.request.base_url"),
("db.system", "ag.meta.system"),
("db.vector.query.top_k", "ag.meta.request.top_k"),
("pinecone.query.top_k", "ag.meta.request.top_k"),
("traceloop.span.kind", "ag.type.node"),
("traceloop.entity.name", "ag.node.name"),
# OPENINFERENCE
("output.value", "ag.data.outputs"),
("input.value", "ag.data.inputs"),
("embedding.model_name", "ag.meta.request.model"),
("llm.invocation_parameters", "ag.meta.request"),
("llm.model_name", "ag.meta.request.model"),
("llm.provider", "ag.meta.provider"),
("llm.system", "ag.meta.system"),
]
V_0_4_1_ATTRIBUTES_PREFIX = [
# OPENLLMETRY
("gen_ai.prompt", "ag.data.inputs.prompt"),
("gen_ai.completion", "ag.data.outputs.completion"),
("llm.request.functions", "ag.data.inputs.functions"),
("llm.request.tools", "ag.data.inputs.tools"),
# OPENINFERENCE
("llm.token_count", "ag.metrics.unit.tokens"),
("llm.input_messages", "ag.data.inputs.prompt"),
("llm.output_messages", "ag.data.outputs.completion"),
]

V_0_4_1_ATTRIBUTES_DYNAMIC = [
# OPENLLMETRY
("traceloop.entity.input", lambda x: ("ag.data.inputs", loads(x).get("inputs"))),
("traceloop.entity.output", lambda x: ("ag.data.outputs", loads(x).get("outputs"))),
]


V_0_4_1_MAPS = {
"attributes": {
"exact": {
"from": {otel: agenta for otel, agenta in V_0_4_1_ATTRIBUTES_EXACT[::-1]},
"to": {agenta: otel for otel, agenta in V_0_4_1_ATTRIBUTES_EXACT[::-1]},
},
"prefix": {
"from": {otel: agenta for otel, agenta in V_0_4_1_ATTRIBUTES_PREFIX[::-1]},
"to": {agenta: otel for otel, agenta in V_0_4_1_ATTRIBUTES_PREFIX[::-1]},
},
"dynamic": {
"from": {otel: agenta for otel, agenta in V_0_4_1_ATTRIBUTES_DYNAMIC[::-1]}
},
},
}
V_0_4_1_KEYS = {
"attributes": {
"exact": {
"from": list(V_0_4_1_MAPS["attributes"]["exact"]["from"].keys()),
"to": list(V_0_4_1_MAPS["attributes"]["exact"]["to"].keys()),
},
"prefix": {
"from": list(V_0_4_1_MAPS["attributes"]["prefix"]["from"].keys()),
"to": list(V_0_4_1_MAPS["attributes"]["prefix"]["to"].keys()),
},
"dynamic": {
"from": list(V_0_4_1_MAPS["attributes"]["dynamic"]["from"].keys()),
},
},
}


MAPS = {
"0.4.1": V_0_4_1_MAPS, # LATEST
}
KEYS = {
"0.4.1": V_0_4_1_KEYS, # LATEST
}

CODEX = {"maps": MAPS[VERSION], "keys": KEYS[VERSION]}
Loading

0 comments on commit 5fa388e

Please sign in to comment.