feat(core): Support opentelemetry exporter (eosphoros-ai#1690)
fangyinc authored Jul 5, 2024
1 parent 84fc1fc commit bf978d2
Showing 39 changed files with 1,176 additions and 218 deletions.
16 changes: 15 additions & 1 deletion .env.template
@@ -278,4 +278,18 @@ DBGPT_LOG_LEVEL=INFO
## Non-streaming scene retries
# DBGPT_APP_SCENE_NON_STREAMING_RETRIES_BASE=1
## Non-streaming scene parallelism
# DBGPT_APP_SCENE_NON_STREAMING_PARALLELISM_BASE=1
# DBGPT_APP_SCENE_NON_STREAMING_PARALLELISM_BASE=1

#*******************************************************************#
#** Observability Config **#
#*******************************************************************#
## Whether DB-GPT should send traces to OpenTelemetry
# TRACER_TO_OPEN_TELEMETRY=False
## The following configurations are only valid when TRACER_TO_OPEN_TELEMETRY=True
## For more details, see https://opentelemetry-python.readthedocs.io/en/latest/exporter/otlp/otlp.html
# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4317
# OTEL_EXPORTER_OTLP_TRACES_INSECURE=False
# OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE=
# OTEL_EXPORTER_OTLP_TRACES_HEADERS=
# OTEL_EXPORTER_OTLP_TRACES_TIMEOUT=
# OTEL_EXPORTER_OTLP_TRACES_COMPRESSION=
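
A minimal sketch of how settings like these are typically consumed by opentelemetry-python's gRPC OTLP exporter; the service name and the exact wiring inside DB-GPT are assumptions here, not taken from this commit:

import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Build an exporter from the same environment variables documented above.
exporter = OTLPSpanExporter(
    endpoint=os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "http://localhost:4317"),
    insecure=os.getenv("OTEL_EXPORTER_OTLP_TRACES_INSECURE", "False").lower() == "true",
)
provider = TracerProvider(resource=Resource.create({"service.name": "dbgpt"}))  # assumed service name
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)

Note that the OTLP exporter also honors the OTEL_EXPORTER_OTLP_* variables on its own when they are set in the process environment, so explicit keyword arguments are only needed to override them.
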
20 changes: 10 additions & 10 deletions dbgpt/agent/core/base_agent.py
@@ -192,7 +192,7 @@ async def send(
"sender": self.name,
"recipient": recipient.name,
"reviewer": reviewer.name if reviewer else None,
"agent_message": message.to_dict(),
"agent_message": json.dumps(message.to_dict(), ensure_ascii=False),
"request_reply": request_reply,
"is_recovery": is_recovery,
"conv_uid": self.not_null_agent_context.conv_id,
@@ -222,7 +222,7 @@ async def receive(
"sender": sender.name,
"recipient": self.name,
"reviewer": reviewer.name if reviewer else None,
"agent_message": message.to_dict(),
"agent_message": json.dumps(message.to_dict(), ensure_ascii=False),
"request_reply": request_reply,
"silent": silent,
"is_recovery": is_recovery,
@@ -276,7 +276,7 @@ async def generate_reply(
"sender": sender.name,
"recipient": self.name,
"reviewer": reviewer.name if reviewer else None,
"received_message": received_message.to_dict(),
"received_message": json.dumps(received_message.to_dict()),
"conv_uid": self.not_null_agent_context.conv_id,
"rely_messages": (
[msg.to_dict() for msg in rely_messages] if rely_messages else None
@@ -287,9 +287,6 @@
try:
with root_tracer.start_span(
"agent.generate_reply._init_reply_message",
metadata={
"received_message": received_message.to_dict(),
},
) as span:
# initialize reply message
reply_message: AgentMessage = self._init_reply_message(
@@ -324,9 +321,10 @@ async def generate_reply(
with root_tracer.start_span(
"agent.generate_reply.thinking",
metadata={
"thinking_messages": [
msg.to_dict() for msg in thinking_messages
],
"thinking_messages": json.dumps(
[msg.to_dict() for msg in thinking_messages],
ensure_ascii=False,
)
},
) as span:
# 1.Think about how to do things
@@ -574,7 +572,9 @@ async def initiate_chat(
"sender": self.name,
"recipient": recipient.name,
"reviewer": reviewer.name if reviewer else None,
"agent_message": agent_message.to_dict(),
"agent_message": json.dumps(
agent_message.to_dict(), ensure_ascii=False
),
"conv_uid": self.not_null_agent_context.conv_id,
},
):
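
The recurring change in base_agent.py, serializing message dicts with json.dumps before attaching them as span metadata, lines up with the OpenTelemetry requirement that attribute values be primitive types (strings, booleans, numbers, or sequences of those) rather than nested dicts. A small illustration with a made-up payload:

import json

# Hypothetical message payload; a real AgentMessage.to_dict() result is richer.
message_dict = {"role": "human", "content": "查询订单"}

# ensure_ascii=False keeps non-ASCII text readable in the exported trace.
span_metadata = {"agent_message": json.dumps(message_dict, ensure_ascii=False)}
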
20 changes: 3 additions & 17 deletions dbgpt/app/base.py
@@ -9,7 +9,7 @@
from dbgpt._private.config import Config
from dbgpt.component import SystemApp
from dbgpt.storage import DBType
from dbgpt.util.parameter_utils import BaseParameters
from dbgpt.util.parameter_utils import BaseServerParameters

ROOT_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_PATH)
@@ -199,7 +199,7 @@ def _create_mysql_database(db_name: str, db_url: str, try_to_create_db: bool = F


@dataclass
class WebServerParameters(BaseParameters):
class WebServerParameters(BaseServerParameters):
host: Optional[str] = field(
default="0.0.0.0", metadata={"help": "Webserver deploy host"}
)
@@ -247,21 +247,7 @@ class WebServerParameters(BaseParameters):
"text2vec --rerank --model_name xxx --model_path xxx`"
},
)
log_level: Optional[str] = field(
default=None,
metadata={
"help": "Logging level",
"valid_values": [
"FATAL",
"ERROR",
"WARNING",
"WARNING",
"INFO",
"DEBUG",
"NOTSET",
],
},
)

light: Optional[bool] = field(default=False, metadata={"help": "enable light mode"})
log_file: Optional[str] = field(
default="dbgpt_webserver.log",
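
The removal of log_level here reads as deduplication: with WebServerParameters now inheriting from BaseServerParameters, shared server options are presumably declared once on the base class. A hedged sketch of that shape (the real BaseServerParameters is not shown in this excerpt, and its other fields are omitted):

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class BaseServerParameters:
    # Assumed to carry options common to all server entry points, e.g. log_level.
    log_level: Optional[str] = field(
        default=None, metadata={"help": "Logging level"}
    )

@dataclass
class WebServerParameters(BaseServerParameters):
    host: Optional[str] = field(
        default="0.0.0.0", metadata={"help": "Webserver deploy host"}
    )
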
15 changes: 12 additions & 3 deletions dbgpt/app/dbgpt_server.py
@@ -111,10 +111,15 @@ def mount_static_files(app: FastAPI):
def _get_webserver_params(args: List[str] = None):
from dbgpt.util.parameter_utils import EnvArgumentParser

parser: argparse.ArgumentParser = EnvArgumentParser.create_argparse_option(
WebServerParameters
parser = EnvArgumentParser()

env_prefix = "webserver_"
webserver_params: WebServerParameters = parser.parse_args_into_dataclass(
WebServerParameters,
env_prefixes=[env_prefix],
command_args=args,
)
return WebServerParameters(**vars(parser.parse_args(args=args)))
return webserver_params


def initialize_app(param: WebServerParameters = None, args: List[str] = None):
@@ -245,6 +250,10 @@ def run_webserver(param: WebServerParameters = None):
os.path.join(LOGDIR, param.tracer_file),
system_app=system_app,
tracer_storage_cls=param.tracer_storage_cls,
enable_open_telemetry=param.tracer_to_open_telemetry,
otlp_endpoint=param.otel_exporter_otlp_traces_endpoint,
otlp_insecure=param.otel_exporter_otlp_traces_insecure,
otlp_timeout=param.otel_exporter_otlp_traces_timeout,
)

with root_tracer.start_span(
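
For readers unfamiliar with the parse_args_into_dataclass call in _get_webserver_params above, the rough idea (an assumption about its behavior, not the actual EnvArgumentParser implementation) is that each dataclass field can be supplied either as a command-line argument or through an environment variable derived from the prefix, e.g. WEBSERVER_PORT for the port field:

import os
from dataclasses import fields

def _values_from_env(dataclass_type, env_prefix: str) -> dict:
    """Collect field values from variables like WEBSERVER_PORT (illustrative only)."""
    values = {}
    for f in fields(dataclass_type):
        env_name = f"{env_prefix}{f.name}".upper()
        if env_name in os.environ:
            values[f.name] = os.environ[env_name]
    return values
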
38 changes: 24 additions & 14 deletions dbgpt/core/awel/operators/base.py
@@ -24,6 +24,7 @@
DefaultExecutorFactory,
blocking_func_to_async,
)
from dbgpt.util.tracer import root_tracer

from ..dag.base import DAG, DAGContext, DAGNode, DAGVar
from ..task.base import EMPTY_DATA, OUT, T, TaskOutput, is_empty_data
@@ -218,10 +219,11 @@ async def call(
"""
if not is_empty_data(call_data):
call_data = {"data": call_data}
out_ctx = await self._runner.execute_workflow(
self, call_data, exist_dag_ctx=dag_ctx
)
return out_ctx.current_task_context.task_output.output
with root_tracer.start_span("dbgpt.awel.operator.call"):
out_ctx = await self._runner.execute_workflow(
self, call_data, exist_dag_ctx=dag_ctx
)
return out_ctx.current_task_context.task_output.output

def _blocking_call(
self,
@@ -265,19 +267,27 @@ async def call_stream(
"""
if call_data != EMPTY_DATA:
call_data = {"data": call_data}
out_ctx = await self._runner.execute_workflow(
self, call_data, streaming_call=True, exist_dag_ctx=dag_ctx
)
with root_tracer.start_span("dbgpt.awel.operator.call_stream"):

out_ctx = await self._runner.execute_workflow(
self, call_data, streaming_call=True, exist_dag_ctx=dag_ctx
)

task_output = out_ctx.current_task_context.task_output
if task_output.is_stream:
return out_ctx.current_task_context.task_output.output_stream
else:
task_output = out_ctx.current_task_context.task_output
if task_output.is_stream:
stream_generator = (
out_ctx.current_task_context.task_output.output_stream
)
else:

async def _gen():
yield task_output.output
# No stream output, wrap the output in a stream
async def _gen():
yield task_output.output

return _gen()
stream_generator = _gen()
return root_tracer.wrapper_async_stream(
stream_generator, "dbgpt.awel.operator.call_stream.iterate"
)

def _blocking_call_stream(
self,
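
wrapper_async_stream itself is not shown in this diff; based on how it is called here, it is presumably a helper that re-yields an async generator inside a tracing span, so that consuming the stream shows up as its own span. A minimal sketch of that idea, assuming only the root_tracer API already used above:

from typing import AsyncIterator, TypeVar

from dbgpt.util.tracer import root_tracer

T = TypeVar("T")

async def traced_stream(stream: AsyncIterator[T], span_name: str) -> AsyncIterator[T]:
    # Open one span that covers the whole lifetime of the wrapped stream.
    with root_tracer.start_span(span_name):
        async for item in stream:
            yield item
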
38 changes: 32 additions & 6 deletions dbgpt/core/awel/runner/local_runner.py
@@ -9,6 +9,7 @@
from typing import Any, Dict, List, Optional, Set, cast

from dbgpt.component import SystemApp
from dbgpt.util.tracer import root_tracer

from ..dag.base import DAGContext, DAGVar
from ..operators.base import CALL_DATA, BaseOperator, WorkflowRunner
@@ -90,9 +91,20 @@ async def execute_workflow(
# Save dag context
await node.dag._save_dag_ctx(dag_ctx)
await job_manager.before_dag_run()
await self._execute_node(
job_manager, node, dag_ctx, node_outputs, skip_node_ids, system_app
)

with root_tracer.start_span(
"dbgpt.awel.workflow.run_workflow",
metadata={
"exist_dag_ctx": exist_dag_ctx is not None,
"event_loop_task_id": event_loop_task_id,
"streaming_call": streaming_call,
"awel_node_id": node.node_id,
"awel_node_name": node.node_name,
},
):
await self._execute_node(
job_manager, node, dag_ctx, node_outputs, skip_node_ids, system_app
)
if not streaming_call and node.dag and exist_dag_ctx is None:
# streaming call not work for dag end
# if exist_dag_ctx is not None, it means current dag is a sub dag
@@ -158,9 +170,23 @@ async def _execute_node(
if system_app is not None and node.system_app is None:
node.set_system_app(system_app)

await node._run(dag_ctx, task_ctx.log_id)
node_outputs[node.node_id] = dag_ctx.current_task_context
task_ctx.set_current_state(TaskState.SUCCESS)
run_metadata = {
"awel_node_id": node.node_id,
"awel_node_name": node.node_name,
"awel_node_type": str(node),
"state": TaskState.RUNNING.value,
"task_log_id": task_ctx.log_id,
}
with root_tracer.start_span(
"dbgpt.awel.workflow.run_operator", metadata=run_metadata
) as span:
await node._run(dag_ctx, task_ctx.log_id)
node_outputs[node.node_id] = dag_ctx.current_task_context
task_ctx.set_current_state(TaskState.SUCCESS)

run_metadata["skip_node_ids"] = ",".join(skip_node_ids)
run_metadata["state"] = TaskState.SUCCESS.value
span.metadata = run_metadata

if isinstance(node, BranchOperator):
skip_nodes = task_ctx.metadata.get("skip_node_names", [])
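
A detail worth noting in the run_operator span: the metadata dict is created with state RUNNING before the node executes, then re-assigned to the span with state SUCCESS (plus the skipped node ids) once the node finishes, so the exported span records the final outcome. In miniature, with do_work standing in for node._run and the TaskState import path assumed:

from dbgpt.core.awel.task.base import TaskState  # assumed import path
from dbgpt.util.tracer import root_tracer

def do_work():
    """Stand-in for awaiting node._run(...)."""
    return "ok"

run_metadata = {"state": TaskState.RUNNING.value}
with root_tracer.start_span(
    "dbgpt.awel.workflow.run_operator", metadata=run_metadata
) as span:
    do_work()
    run_metadata["state"] = TaskState.SUCCESS.value
    span.metadata = run_metadata  # the tracer is assumed to snapshot metadata at span end
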
20 changes: 17 additions & 3 deletions dbgpt/core/awel/trigger/http_trigger.py
@@ -1,4 +1,5 @@
"""Http trigger for AWEL."""

import json
import logging
from enum import Enum
@@ -24,6 +25,7 @@
model_to_dict,
)
from dbgpt.util.i18n_utils import _
from dbgpt.util.tracer import root_tracer

from ..dag.base import DAG
from ..flow import (
@@ -650,12 +652,21 @@ async def _trigger_dag(
from fastapi import BackgroundTasks
from fastapi.responses import StreamingResponse

span_id = root_tracer._parse_span_id(body)

leaf_nodes = dag.leaf_nodes
if len(leaf_nodes) != 1:
raise ValueError("HttpTrigger just support one leaf node in dag")
end_node = cast(BaseOperator, leaf_nodes[0])
metadata = {
"awel_node_id": end_node.node_id,
"awel_node_name": end_node.node_name,
}
if not streaming_response:
return await end_node.call(call_data=body)
with root_tracer.start_span(
"dbgpt.core.trigger.http.run_dag", span_id, metadata=metadata
):
return await end_node.call(call_data=body)
else:
headers = response_headers
media_type = response_media_type if response_media_type else "text/event-stream"
@@ -666,15 +677,18 @@
"Connection": "keep-alive",
"Transfer-Encoding": "chunked",
}
generator = await end_node.call_stream(call_data=body)
_generator = await end_node.call_stream(call_data=body)
trace_generator = root_tracer.wrapper_async_stream(
_generator, "dbgpt.core.trigger.http.run_dag", span_id, metadata=metadata
)

async def _after_dag_end():
await dag._after_dag_end(end_node.current_event_loop_task_id)

background_tasks = BackgroundTasks()
background_tasks.add_task(_after_dag_end)
return StreamingResponse(
generator,
trace_generator,
headers=headers,
media_type=media_type,
background=background_tasks,
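
Two things happen on the streaming path above: root_tracer._parse_span_id(body) presumably extracts a parent span id sent by the caller so that server-side spans join the caller's trace, and DAG teardown is deferred to a background task so that it only runs after the response has been fully streamed. A small FastAPI-shaped sketch of the second pattern (names here are placeholders, not DB-GPT code):

from fastapi import BackgroundTasks
from fastapi.responses import StreamingResponse

async def demo_stream():
    # Placeholder for the traced DAG output stream.
    yield "data: hello\n\n"

async def after_stream_done():
    # Placeholder for dag._after_dag_end(...) cleanup.
    pass

background_tasks = BackgroundTasks()
background_tasks.add_task(after_stream_done)
response = StreamingResponse(
    demo_stream(),
    media_type="text/event-stream",
    background=background_tasks,
)
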
4 changes: 4 additions & 0 deletions dbgpt/model/base.py
@@ -33,6 +33,10 @@ class ModelInstance:
prompt_template: Optional[str] = None
last_heartbeat: Optional[datetime] = None

def to_dict(self) -> Dict:
"""Convert to dict"""
return asdict(self)


class WorkerApplyType(str, Enum):
START = "start"