diff --git a/.env.template b/.env.template index da6fb3bca..18b436794 100644 --- a/.env.template +++ b/.env.template @@ -278,4 +278,18 @@ DBGPT_LOG_LEVEL=INFO ## Non-streaming scene retries # DBGPT_APP_SCENE_NON_STREAMING_RETRIES_BASE=1 ## Non-streaming scene parallelism -# DBGPT_APP_SCENE_NON_STREAMING_PARALLELISM_BASE=1 \ No newline at end of file +# DBGPT_APP_SCENE_NON_STREAMING_PARALLELISM_BASE=1 + +#*******************************************************************# +#** Observability Config **# +#*******************************************************************# +## Whether to enable DB-GPT to send traces to OpenTelemetry +# TRACER_TO_OPEN_TELEMETRY=False +## The following configurations are only valid when TRACER_TO_OPEN_TELEMETRY=True +## For more details, see https://opentelemetry-python.readthedocs.io/en/latest/exporter/otlp/otlp.html +# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4317 +# OTEL_EXPORTER_OTLP_TRACES_INSECURE=False +# OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE= +# OTEL_EXPORTER_OTLP_TRACES_HEADERS= +# OTEL_EXPORTER_OTLP_TRACES_TIMEOUT= +# OTEL_EXPORTER_OTLP_TRACES_COMPRESSION= \ No newline at end of file diff --git a/dbgpt/agent/core/base_agent.py b/dbgpt/agent/core/base_agent.py index d42ff5ec4..43be81416 100644 --- a/dbgpt/agent/core/base_agent.py +++ b/dbgpt/agent/core/base_agent.py @@ -192,7 +192,7 @@ async def send( "sender": self.name, "recipient": recipient.name, "reviewer": reviewer.name if reviewer else None, - "agent_message": message.to_dict(), + "agent_message": json.dumps(message.to_dict(), ensure_ascii=False), "request_reply": request_reply, "is_recovery": is_recovery, "conv_uid": self.not_null_agent_context.conv_id, @@ -222,7 +222,7 @@ async def receive( "sender": sender.name, "recipient": self.name, "reviewer": reviewer.name if reviewer else None, - "agent_message": message.to_dict(), + "agent_message": json.dumps(message.to_dict(), ensure_ascii=False), "request_reply": request_reply, "silent": silent, "is_recovery": is_recovery, @@ -276,7 +276,7 @@ async def generate_reply( "sender": sender.name, "recipient": self.name, "reviewer": reviewer.name if reviewer else None, - "received_message": received_message.to_dict(), + "received_message": json.dumps(received_message.to_dict(), ensure_ascii=False), "conv_uid": self.not_null_agent_context.conv_id, "rely_messages": ( [msg.to_dict() for msg in rely_messages] if rely_messages else None ), @@ -287,9 +287,6 @@ try: with root_tracer.start_span( "agent.generate_reply._init_reply_message", - metadata={ - "received_message": received_message.to_dict(), - }, ) as span: # initialize reply message reply_message: AgentMessage = self._init_reply_message( @@ -324,9 +321,10 @@ with root_tracer.start_span( "agent.generate_reply.thinking", metadata={ - "thinking_messages": [ - msg.to_dict() for msg in thinking_messages - ], + "thinking_messages": json.dumps( + [msg.to_dict() for msg in thinking_messages], + ensure_ascii=False, + ) }, ) as span: # 1.Think about how to do things @@ -574,7 +572,9 @@ async def initiate_chat( "sender": self.name, "recipient": recipient.name, "reviewer": reviewer.name if reviewer else None, - "agent_message": agent_message.to_dict(), + "agent_message": json.dumps( + agent_message.to_dict(), ensure_ascii=False + ), "conv_uid": self.not_null_agent_context.conv_id, }, ): diff --git a/dbgpt/app/base.py b/dbgpt/app/base.py index ec392875b..95ed452ab 100644 --- a/dbgpt/app/base.py +++ b/dbgpt/app/base.py @@ -9,7 +9,7 @@ from dbgpt._private.config import Config from
dbgpt.component import SystemApp from dbgpt.storage import DBType -from dbgpt.util.parameter_utils import BaseParameters +from dbgpt.util.parameter_utils import BaseServerParameters ROOT_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(ROOT_PATH) @@ -199,7 +199,7 @@ def _create_mysql_database(db_name: str, db_url: str, try_to_create_db: bool = F @dataclass -class WebServerParameters(BaseParameters): +class WebServerParameters(BaseServerParameters): host: Optional[str] = field( default="0.0.0.0", metadata={"help": "Webserver deploy host"} ) @@ -247,21 +247,7 @@ class WebServerParameters(BaseParameters): "text2vec --rerank --model_name xxx --model_path xxx`" }, ) - log_level: Optional[str] = field( - default=None, - metadata={ - "help": "Logging level", - "valid_values": [ - "FATAL", - "ERROR", - "WARNING", - "WARNING", - "INFO", - "DEBUG", - "NOTSET", - ], - }, - ) + light: Optional[bool] = field(default=False, metadata={"help": "enable light mode"}) log_file: Optional[str] = field( default="dbgpt_webserver.log", diff --git a/dbgpt/app/dbgpt_server.py b/dbgpt/app/dbgpt_server.py index 9e7481b48..1f7ee4921 100644 --- a/dbgpt/app/dbgpt_server.py +++ b/dbgpt/app/dbgpt_server.py @@ -111,10 +111,15 @@ def mount_static_files(app: FastAPI): def _get_webserver_params(args: List[str] = None): from dbgpt.util.parameter_utils import EnvArgumentParser - parser: argparse.ArgumentParser = EnvArgumentParser.create_argparse_option( - WebServerParameters + parser = EnvArgumentParser() + + env_prefix = "webserver_" + webserver_params: WebServerParameters = parser.parse_args_into_dataclass( + WebServerParameters, + env_prefixes=[env_prefix], + command_args=args, ) - return WebServerParameters(**vars(parser.parse_args(args=args))) + return webserver_params def initialize_app(param: WebServerParameters = None, args: List[str] = None): @@ -245,6 +250,10 @@ def run_webserver(param: WebServerParameters = None): os.path.join(LOGDIR, param.tracer_file), system_app=system_app, tracer_storage_cls=param.tracer_storage_cls, + enable_open_telemetry=param.tracer_to_open_telemetry, + otlp_endpoint=param.otel_exporter_otlp_traces_endpoint, + otlp_insecure=param.otel_exporter_otlp_traces_insecure, + otlp_timeout=param.otel_exporter_otlp_traces_timeout, ) with root_tracer.start_span( diff --git a/dbgpt/core/awel/operators/base.py b/dbgpt/core/awel/operators/base.py index 6f852fd97..6d486a1f6 100644 --- a/dbgpt/core/awel/operators/base.py +++ b/dbgpt/core/awel/operators/base.py @@ -24,6 +24,7 @@ DefaultExecutorFactory, blocking_func_to_async, ) +from dbgpt.util.tracer import root_tracer from ..dag.base import DAG, DAGContext, DAGNode, DAGVar from ..task.base import EMPTY_DATA, OUT, T, TaskOutput, is_empty_data @@ -218,10 +219,11 @@ async def call( """ if not is_empty_data(call_data): call_data = {"data": call_data} - out_ctx = await self._runner.execute_workflow( - self, call_data, exist_dag_ctx=dag_ctx - ) - return out_ctx.current_task_context.task_output.output + with root_tracer.start_span("dbgpt.awel.operator.call"): + out_ctx = await self._runner.execute_workflow( + self, call_data, exist_dag_ctx=dag_ctx + ) + return out_ctx.current_task_context.task_output.output def _blocking_call( self, @@ -265,19 +267,27 @@ async def call_stream( """ if call_data != EMPTY_DATA: call_data = {"data": call_data} - out_ctx = await self._runner.execute_workflow( - self, call_data, streaming_call=True, exist_dag_ctx=dag_ctx - ) + with 
root_tracer.start_span("dbgpt.awel.operator.call_stream"): + + out_ctx = await self._runner.execute_workflow( + self, call_data, streaming_call=True, exist_dag_ctx=dag_ctx + ) - task_output = out_ctx.current_task_context.task_output - if task_output.is_stream: - return out_ctx.current_task_context.task_output.output_stream - else: + task_output = out_ctx.current_task_context.task_output + if task_output.is_stream: + stream_generator = ( + out_ctx.current_task_context.task_output.output_stream + ) + else: - async def _gen(): - yield task_output.output + # No stream output, wrap the output in a stream + async def _gen(): + yield task_output.output - return _gen() + stream_generator = _gen() + return root_tracer.wrapper_async_stream( + stream_generator, "dbgpt.awel.operator.call_stream.iterate" + ) def _blocking_call_stream( self, diff --git a/dbgpt/core/awel/runner/local_runner.py b/dbgpt/core/awel/runner/local_runner.py index dfe4bc21f..f968be9af 100644 --- a/dbgpt/core/awel/runner/local_runner.py +++ b/dbgpt/core/awel/runner/local_runner.py @@ -9,6 +9,7 @@ from typing import Any, Dict, List, Optional, Set, cast from dbgpt.component import SystemApp +from dbgpt.util.tracer import root_tracer from ..dag.base import DAGContext, DAGVar from ..operators.base import CALL_DATA, BaseOperator, WorkflowRunner @@ -90,9 +91,20 @@ async def execute_workflow( # Save dag context await node.dag._save_dag_ctx(dag_ctx) await job_manager.before_dag_run() - await self._execute_node( - job_manager, node, dag_ctx, node_outputs, skip_node_ids, system_app - ) + + with root_tracer.start_span( + "dbgpt.awel.workflow.run_workflow", + metadata={ + "exist_dag_ctx": exist_dag_ctx is not None, + "event_loop_task_id": event_loop_task_id, + "streaming_call": streaming_call, + "awel_node_id": node.node_id, + "awel_node_name": node.node_name, + }, + ): + await self._execute_node( + job_manager, node, dag_ctx, node_outputs, skip_node_ids, system_app + ) if not streaming_call and node.dag and exist_dag_ctx is None: # streaming call not work for dag end # if exist_dag_ctx is not None, it means current dag is a sub dag @@ -158,9 +170,23 @@ async def _execute_node( if system_app is not None and node.system_app is None: node.set_system_app(system_app) - await node._run(dag_ctx, task_ctx.log_id) - node_outputs[node.node_id] = dag_ctx.current_task_context - task_ctx.set_current_state(TaskState.SUCCESS) + run_metadata = { + "awel_node_id": node.node_id, + "awel_node_name": node.node_name, + "awel_node_type": str(node), + "state": TaskState.RUNNING.value, + "task_log_id": task_ctx.log_id, + } + with root_tracer.start_span( + "dbgpt.awel.workflow.run_operator", metadata=run_metadata + ) as span: + await node._run(dag_ctx, task_ctx.log_id) + node_outputs[node.node_id] = dag_ctx.current_task_context + task_ctx.set_current_state(TaskState.SUCCESS) + + run_metadata["skip_node_ids"] = ",".join(skip_node_ids) + run_metadata["state"] = TaskState.SUCCESS.value + span.metadata = run_metadata if isinstance(node, BranchOperator): skip_nodes = task_ctx.metadata.get("skip_node_names", []) diff --git a/dbgpt/core/awel/trigger/http_trigger.py b/dbgpt/core/awel/trigger/http_trigger.py index 18f7d5b9b..0f0931292 100644 --- a/dbgpt/core/awel/trigger/http_trigger.py +++ b/dbgpt/core/awel/trigger/http_trigger.py @@ -1,4 +1,5 @@ """Http trigger for AWEL.""" + import json import logging from enum import Enum @@ -24,6 +25,7 @@ model_to_dict, ) from dbgpt.util.i18n_utils import _ +from dbgpt.util.tracer import root_tracer from ..dag.base import DAG from 
..flow import ( @@ -650,12 +652,21 @@ async def _trigger_dag( from fastapi import BackgroundTasks from fastapi.responses import StreamingResponse + span_id = root_tracer._parse_span_id(body) + leaf_nodes = dag.leaf_nodes if len(leaf_nodes) != 1: raise ValueError("HttpTrigger just support one leaf node in dag") end_node = cast(BaseOperator, leaf_nodes[0]) + metadata = { + "awel_node_id": end_node.node_id, + "awel_node_name": end_node.node_name, + } if not streaming_response: - return await end_node.call(call_data=body) + with root_tracer.start_span( + "dbgpt.core.trigger.http.run_dag", span_id, metadata=metadata + ): + return await end_node.call(call_data=body) else: headers = response_headers media_type = response_media_type if response_media_type else "text/event-stream" @@ -666,7 +677,10 @@ async def _trigger_dag( "Connection": "keep-alive", "Transfer-Encoding": "chunked", } - generator = await end_node.call_stream(call_data=body) + _generator = await end_node.call_stream(call_data=body) + trace_generator = root_tracer.wrapper_async_stream( + _generator, "dbgpt.core.trigger.http.run_dag", span_id, metadata=metadata + ) async def _after_dag_end(): await dag._after_dag_end(end_node.current_event_loop_task_id) @@ -674,7 +688,7 @@ async def _after_dag_end(): background_tasks = BackgroundTasks() background_tasks.add_task(_after_dag_end) return StreamingResponse( - generator, + trace_generator, headers=headers, media_type=media_type, background=background_tasks, diff --git a/dbgpt/model/base.py b/dbgpt/model/base.py index 6970044ea..ccf67983f 100644 --- a/dbgpt/model/base.py +++ b/dbgpt/model/base.py @@ -33,6 +33,10 @@ class ModelInstance: prompt_template: Optional[str] = None last_heartbeat: Optional[datetime] = None + def to_dict(self) -> Dict: + """Convert to dict""" + return asdict(self) + class WorkerApplyType(str, Enum): START = "start" diff --git a/dbgpt/model/cluster/apiserver/api.py b/dbgpt/model/cluster/apiserver/api.py index 2f7303059..cf44a2c5e 100644 --- a/dbgpt/model/cluster/apiserver/api.py +++ b/dbgpt/model/cluster/apiserver/api.py @@ -45,6 +45,7 @@ from dbgpt.model.parameter import ModelAPIServerParameters, WorkerType from dbgpt.util.fastapi import create_app from dbgpt.util.parameter_utils import EnvArgumentParser +from dbgpt.util.tracer import initialize_tracer, root_tracer from dbgpt.util.utils import setup_logging logger = logging.getLogger(__name__) @@ -353,23 +354,34 @@ async def chat_completion_generate( return ChatCompletionResponse(model=model_name, choices=choices, usage=usage) async def embeddings_generate( - self, model: str, texts: List[str] + self, + model: str, + texts: List[str], + span_id: Optional[str] = None, ) -> List[List[float]]: """Generate embeddings Args: model (str): Model name texts (List[str]): Texts to embed + span_id (Optional[str], optional): The span id. Defaults to None. 
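Throughout this patch a parent span is threaded through async boundaries by passing an explicit span ID string with the shape `trace_id:span_id`; `_parse_span_id` recovers it from request bodies and `start_span` in `tracer_impl.py` (later in this diff) derives a child ID that keeps the parent's `trace_id` prefix. A minimal standalone sketch of that convention, using illustrative helper names rather than the real dbgpt API:

```python
# Sketch of the `trace_id:span_id` ID scheme used by this patch.
# `new_trace_id` / `new_span_id` are illustrative stand-ins, not dbgpt functions.
import secrets
from typing import Optional


def new_trace_id() -> str:
    # 128-bit hex value, mirroring _new_random_trace_id() in tracer/base.py
    return secrets.token_hex(16)


def new_span_id(parent_span_id: Optional[str] = None) -> str:
    # Children inherit the trace ID prefix of their parent, as
    # TracerManager.start_span() does later in this diff.
    trace_id = parent_span_id.split(":")[0] if parent_span_id else new_trace_id()
    return f"{trace_id}:{secrets.token_hex(8)}"  # 64-bit hex span part


root = new_span_id()
child = new_span_id(parent_span_id=root)
assert child.split(":")[0] == root.split(":")[0]  # same trace
```

Keeping the IDs as fixed-width hex, instead of the old UUIDs, is what lets `OpenTelemetrySpanStorage` later parse them into the integer trace and span IDs that OpenTelemetry requires.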
Returns: List[List[float]]: The embeddings of texts """ - worker_manager: WorkerManager = self.get_worker_manager() - params = { - "input": texts, - "model": model, - } - return await worker_manager.embeddings(params) + with root_tracer.start_span( + "dbgpt.model.apiserver.generate_embeddings", + parent_span_id=span_id, + metadata={ + "model": model, + }, + ): + worker_manager: WorkerManager = self.get_worker_manager() + params = { + "input": texts, + "model": model, + } + return await worker_manager.embeddings(params) async def relevance_generate( self, model: str, query: str, texts: List[str] @@ -438,12 +450,29 @@ async def create_chat_completion( params["user"] = request.user # TODO check token length + trace_kwargs = { + "operation_name": "dbgpt.model.apiserver.create_chat_completion", + "metadata": { + "model": request.model, + "messages": request.messages, + "temperature": request.temperature, + "top_p": request.top_p, + "max_tokens": request.max_tokens, + "stop": request.stop, + "user": request.user, + }, + } if request.stream: generator = api_server.chat_completion_stream_generator( request.model, params, request.n ) - return StreamingResponse(generator, media_type="text/event-stream") - return await api_server.chat_completion_generate(request.model, params, request.n) + trace_generator = root_tracer.wrapper_async_stream(generator, **trace_kwargs) + return StreamingResponse(trace_generator, media_type="text/event-stream") + else: + with root_tracer.start_span(**trace_kwargs): + return await api_server.chat_completion_generate( + request.model, params, request.n + ) @router.post("/v1/embeddings", dependencies=[Depends(check_api_key)]) @@ -462,7 +491,11 @@ async def create_embeddings( data = [] async_tasks = [] for num_batch, batch in enumerate(batches): - async_tasks.append(api_server.embeddings_generate(request.model, batch)) + async_tasks.append( + api_server.embeddings_generate( + request.model, batch, span_id=root_tracer.get_current_span_id() + ) + ) # Request all embeddings in parallel batch_embeddings: List[List[List[float]]] = await asyncio.gather(*async_tasks) @@ -486,15 +519,22 @@ async def create_embeddings( dependencies=[Depends(check_api_key)], response_model=RelevanceResponse, ) -async def create_embeddings( +async def create_relevance( request: RelevanceRequest, api_server: APIServer = Depends(get_api_server) ): """Generate relevance scores for a query and a list of documents.""" await api_server.get_model_instances_or_raise(request.model, worker_type="text2vec") - scores = await api_server.relevance_generate( - request.model, request.query, request.documents - ) + with root_tracer.start_span( + "dbgpt.model.apiserver.generate_relevance", + metadata={ + "model": request.model, + "query": request.query, + }, + ): + scores = await api_server.relevance_generate( + request.model, request.query, request.documents + ) return model_to_dict( RelevanceResponse(data=scores, model=request.model, usage=UsageInfo()), exclude_none=True, @@ -534,6 +574,7 @@ def _initialize_all(controller_addr: str, system_app: SystemApp): def initialize_apiserver( controller_addr: str, + apiserver_params: Optional[ModelAPIServerParameters] = None, app=None, system_app: SystemApp = None, host: str = None, @@ -541,6 +582,10 @@ def initialize_apiserver( api_keys: List[str] = None, embedding_batch_size: Optional[int] = None, ): + import os + + from dbgpt.configs.model_config import LOGDIR + global global_system_app global api_settings embedded_mod = True @@ -552,6 +597,18 @@ def initialize_apiserver( 
system_app = SystemApp(app) global_system_app = system_app + if apiserver_params: + initialize_tracer( + os.path.join(LOGDIR, apiserver_params.tracer_file), + system_app=system_app, + root_operation_name="DB-GPT-APIServer", + tracer_storage_cls=apiserver_params.tracer_storage_cls, + enable_open_telemetry=apiserver_params.tracer_to_open_telemetry, + otlp_endpoint=apiserver_params.otel_exporter_otlp_traces_endpoint, + otlp_insecure=apiserver_params.otel_exporter_otlp_traces_insecure, + otlp_timeout=apiserver_params.otel_exporter_otlp_traces_timeout, + ) + if api_keys: api_settings.api_keys = api_keys @@ -602,6 +659,7 @@ def run_apiserver(): initialize_apiserver( apiserver_params.controller_addr, + apiserver_params, host=apiserver_params.host, port=apiserver_params.port, api_keys=api_keys, diff --git a/dbgpt/model/cluster/apiserver/tests/test_api.py b/dbgpt/model/cluster/apiserver/tests/test_api.py index 64d880962..dbc2d4cfe 100644 --- a/dbgpt/model/cluster/apiserver/tests/test_api.py +++ b/dbgpt/model/cluster/apiserver/tests/test_api.py @@ -52,7 +52,7 @@ async def client(request, system_app: SystemApp): worker_manager, model_registry = cluster system_app.register(_DefaultWorkerManagerFactory, worker_manager) system_app.register_instance(model_registry) - initialize_apiserver(None, app, system_app, api_keys=api_keys) + initialize_apiserver(None, None, app, system_app, api_keys=api_keys) yield client diff --git a/dbgpt/model/cluster/controller/controller.py b/dbgpt/model/cluster/controller/controller.py index a28e4ccb7..2ff1c7d98 100644 --- a/dbgpt/model/cluster/controller/controller.py +++ b/dbgpt/model/cluster/controller/controller.py @@ -1,4 +1,5 @@ import logging +import os from abc import ABC, abstractmethod from typing import List, Literal, Optional @@ -13,6 +14,7 @@ from dbgpt.util.api_utils import _sync_api_remote as sync_api_remote from dbgpt.util.fastapi import create_app from dbgpt.util.parameter_utils import EnvArgumentParser +from dbgpt.util.tracer.tracer_impl import initialize_tracer, root_tracer from dbgpt.util.utils import setup_http_service_logging, setup_logging logger = logging.getLogger(__name__) @@ -159,7 +161,10 @@ def initialize_controller( host: str = None, port: int = None, registry: Optional[ModelRegistry] = None, + controller_params: Optional[ModelControllerParameters] = None, + system_app: Optional[SystemApp] = None, ): + global controller if remote_controller_addr: controller.backend = _RemoteModelController(remote_controller_addr) @@ -173,8 +178,25 @@ def initialize_controller( else: import uvicorn + from dbgpt.configs.model_config import LOGDIR + setup_http_service_logging() app = create_app() + if not system_app: + system_app = SystemApp(app) + if not controller_params: + raise ValueError("Controller parameters are required.") + initialize_tracer( + os.path.join(LOGDIR, controller_params.tracer_file), + root_operation_name="DB-GPT-ModelController", + system_app=system_app, + tracer_storage_cls=controller_params.tracer_storage_cls, + enable_open_telemetry=controller_params.tracer_to_open_telemetry, + otlp_endpoint=controller_params.otel_exporter_otlp_traces_endpoint, + otlp_insecure=controller_params.otel_exporter_otlp_traces_insecure, + otlp_timeout=controller_params.otel_exporter_otlp_traces_timeout, + ) + app.include_router(router, prefix="/api", tags=["Model"]) uvicorn.run(app, host=host, port=port, log_level="info") @@ -187,13 +209,19 @@ async def api_health_check(): @router.post("/controller/models") async def api_register_instance(request: ModelInstance): 
- return await controller.register_instance(request) + with root_tracer.start_span( + "dbgpt.model.controller.register_instance", metadata=request.to_dict() + ): + return await controller.register_instance(request) @router.delete("/controller/models") async def api_deregister_instance(model_name: str, host: str, port: int): instance = ModelInstance(model_name=model_name, host=host, port=port) - return await controller.deregister_instance(instance) + with root_tracer.start_span( + "dbgpt.model.controller.deregister_instance", metadata=instance.to_dict() + ): + return await controller.deregister_instance(instance) @router.get("/controller/models") @@ -303,7 +331,10 @@ def run_model_controller(): registry = _create_registry(controller_params) initialize_controller( - host=controller_params.host, port=controller_params.port, registry=registry + host=controller_params.host, + port=controller_params.port, + registry=registry, + controller_params=controller_params, ) diff --git a/dbgpt/model/cluster/worker/default_worker.py b/dbgpt/model/cluster/worker/default_worker.py index 04cd25bf7..ea98837ff 100644 --- a/dbgpt/model/cluster/worker/default_worker.py +++ b/dbgpt/model/cluster/worker/default_worker.py @@ -320,17 +320,16 @@ def _prepare_generate_stream(self, params: Dict, span_operation_name: str): map(lambda m: m.dict(), span_params["messages"]) ) - model_span = root_tracer.start_span( - span_operation_name, - metadata={ - "prompt": str_prompt, - "params": span_params, - "is_async_func": self.support_async(), - "llm_adapter": str(self.llm_adapter), - "generate_stream_func": generate_stream_func_str_name, - "model_context": model_context, - }, - ) + metadata = { + "is_async_func": self.support_async(), + "llm_adapter": str(self.llm_adapter), + "generate_stream_func": generate_stream_func_str_name, + } + metadata.update(span_params) + metadata.update(model_context) + metadata["prompt"] = str_prompt + + model_span = root_tracer.start_span(span_operation_name, metadata=metadata) return params, model_context, generate_stream_func, model_span diff --git a/dbgpt/model/cluster/worker/manager.py b/dbgpt/model/cluster/worker/manager.py index 449f52ac6..32714a303 100644 --- a/dbgpt/model/cluster/worker/manager.py +++ b/dbgpt/model/cluster/worker/manager.py @@ -827,12 +827,18 @@ async def api_model_shutdown(request: WorkerStartupRequest): def _setup_fastapi( - worker_params: ModelWorkerParameters, app=None, ignore_exception: bool = False + worker_params: ModelWorkerParameters, + app=None, + ignore_exception: bool = False, + system_app: Optional[SystemApp] = None, ): if not app: app = create_app() setup_http_service_logging() + if system_app: + system_app._asgi_app = app + if worker_params.standalone: from dbgpt.model.cluster.controller.controller import initialize_controller from dbgpt.model.cluster.controller.controller import ( @@ -848,7 +854,7 @@ def _setup_fastapi( logger.info( f"Run WorkerManager with standalone mode, controller_addr: {worker_params.controller_addr}" ) - initialize_controller(app=app) + initialize_controller(app=app, system_app=system_app) app.include_router(controller_router, prefix="/api") async def startup_event(): @@ -1074,7 +1080,7 @@ def initialize_worker_manager_in_client( worker_params.register = True worker_params.port = local_port logger.info(f"Worker params: {worker_params}") - _setup_fastapi(worker_params, app, ignore_exception=True) + _setup_fastapi(worker_params, app, ignore_exception=True, system_app=system_app) _start_local_worker(worker_manager, worker_params) 
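Cross-process propagation in this patch rides on a single HTTP header, `DB-GPT-Trace-Span-Id` (the `DBGPT_TRACER_SPAN_ID` constant added in `tracer/base.py` later in this diff): clients such as `RemoteModelWorker` attach the current span ID to outgoing requests (see `_get_trace_headers` below), and `TraceIDMiddleware` reads it back on the receiving service. A hedged sketch of that round trip with plain dicts, where `current_span_id` stands in for `root_tracer.get_current_span_id()`:

```python
# Illustrative sketch of header-based span propagation; not the dbgpt API.
from typing import Dict, Optional

DBGPT_TRACER_SPAN_ID = "DB-GPT-Trace-Span-Id"


def trace_headers(base: Dict[str, str], current_span_id: Optional[str]) -> Dict[str, str]:
    # Client side: copy the active span ID into the outgoing headers so the
    # remote service can continue the same trace.
    headers = dict(base)
    if current_span_id:  # the None-guard is an assumption; the patch attaches unconditionally
        headers[DBGPT_TRACER_SPAN_ID] = current_span_id
    return headers


def parse_incoming_span_id(headers: Dict[str, str]) -> Optional[str]:
    # Server side: the middleware uses this value as the parent of its root span.
    return headers.get(DBGPT_TRACER_SPAN_ID)


out = trace_headers({"Content-Type": "application/json"}, "abcd1234abcd1234abcd1234abcd1234:1234abcd1234abcd")
assert parse_incoming_span_id(out) == "abcd1234abcd1234abcd1234abcd1234:1234abcd1234abcd"
```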
worker_manager.after_start(start_listener) _start_local_embedding_worker( @@ -1100,7 +1106,9 @@ def initialize_worker_manager_in_client( worker_manager.worker_manager = RemoteWorkerManager(client) worker_manager.after_start(start_listener) initialize_controller( - app=app, remote_controller_addr=worker_params.controller_addr + app=app, + remote_controller_addr=worker_params.controller_addr, + system_app=system_app, ) loop = asyncio.get_event_loop() loop.run_until_complete(worker_manager.start()) @@ -1140,17 +1148,22 @@ def run_worker_manager( embedded_mod = True logger.info(f"Worker params: {worker_params}") + system_app = SystemApp() if not app: # Run worker manager independently embedded_mod = False - app = _setup_fastapi(worker_params) + app = _setup_fastapi(worker_params, system_app=system_app) + system_app._asgi_app = app - system_app = SystemApp(app) initialize_tracer( os.path.join(LOGDIR, worker_params.tracer_file), system_app=system_app, - root_operation_name="DB-GPT-WorkerManager-Entry", + root_operation_name="DB-GPT-ModelWorker", tracer_storage_cls=worker_params.tracer_storage_cls, + enable_open_telemetry=worker_params.tracer_to_open_telemetry, + otlp_endpoint=worker_params.otel_exporter_otlp_traces_endpoint, + otlp_insecure=worker_params.otel_exporter_otlp_traces_insecure, + otlp_timeout=worker_params.otel_exporter_otlp_traces_timeout, ) _start_local_worker(worker_manager, worker_params) diff --git a/dbgpt/model/cluster/worker/remote_worker.py b/dbgpt/model/cluster/worker/remote_worker.py index c25a71ca7..70cbbb02d 100644 --- a/dbgpt/model/cluster/worker/remote_worker.py +++ b/dbgpt/model/cluster/worker/remote_worker.py @@ -5,6 +5,7 @@ from dbgpt.core import ModelMetadata, ModelOutput from dbgpt.model.cluster.worker_base import ModelWorker from dbgpt.model.parameter import ModelParameters +from dbgpt.util.tracer import DBGPT_TRACER_SPAN_ID, root_tracer logger = logging.getLogger(__name__) @@ -57,7 +58,7 @@ async def async_generate_stream(self, params: Dict) -> Iterator[ModelOutput]: async with client.stream( "POST", url, - headers=self.headers, + headers=self._get_trace_headers(), json=params, timeout=self.timeout, ) as response: @@ -84,7 +85,7 @@ async def async_generate(self, params: Dict) -> ModelOutput: logger.debug(f"Send async_generate to url {url}, params: {params}") response = await client.post( url, - headers=self.headers, + headers=self._get_trace_headers(), json=params, timeout=self.timeout, ) @@ -101,7 +102,7 @@ async def async_count_token(self, prompt: str) -> int: logger.debug(f"Send async_count_token to url {url}, params: {prompt}") response = await client.post( url, - headers=self.headers, + headers=self._get_trace_headers(), json={"prompt": prompt}, timeout=self.timeout, ) @@ -118,7 +119,7 @@ async def async_get_model_metadata(self, params: Dict) -> ModelMetadata: ) response = await client.post( url, - headers=self.headers, + headers=self._get_trace_headers(), json=params, timeout=self.timeout, ) @@ -136,7 +137,7 @@ def embeddings(self, params: Dict) -> List[List[float]]: logger.debug(f"Send embeddings to url {url}, params: {params}") response = requests.post( url, - headers=self.headers, + headers=self._get_trace_headers(), json=params, timeout=self.timeout, ) @@ -151,8 +152,14 @@ async def async_embeddings(self, params: Dict) -> List[List[float]]: logger.debug(f"Send async_embeddings to url {url}") response = await client.post( url, - headers=self.headers, + headers=self._get_trace_headers(), json=params, timeout=self.timeout, ) return response.json() + + def 
_get_trace_headers(self): + span_id = root_tracer.get_current_span_id() + headers = self.headers.copy() + headers.update({DBGPT_TRACER_SPAN_ID: span_id}) + return headers diff --git a/dbgpt/model/parameter.py b/dbgpt/model/parameter.py index d549d8db7..f0a2974c8 100644 --- a/dbgpt/model/parameter.py +++ b/dbgpt/model/parameter.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Dict, Optional, Tuple, Union -from dbgpt.util.parameter_utils import BaseParameters +from dbgpt.util.parameter_utils import BaseParameters, BaseServerParameters class WorkerType(str, Enum): @@ -48,10 +48,7 @@ def parse_worker_key(worker_key: str) -> Tuple[str, str]: @dataclass -class ModelControllerParameters(BaseParameters): - host: Optional[str] = field( - default="0.0.0.0", metadata={"help": "Model Controller deploy host"} - ) +class ModelControllerParameters(BaseServerParameters): port: Optional[int] = field( default=8000, metadata={"help": "Model Controller deploy port"} ) @@ -133,24 +130,6 @@ class ModelControllerParameters(BaseParameters): }, ) - daemon: Optional[bool] = field( - default=False, metadata={"help": "Run Model Controller in background"} - ) - log_level: Optional[str] = field( - default=None, - metadata={ - "help": "Logging level", - "valid_values": [ - "FATAL", - "ERROR", - "WARNING", - "WARNING", - "INFO", - "DEBUG", - "NOTSET", - ], - }, - ) log_file: Optional[str] = field( default="dbgpt_model_controller.log", metadata={ @@ -172,16 +151,10 @@ class ModelControllerParameters(BaseParameters): @dataclass -class ModelAPIServerParameters(BaseParameters): - host: Optional[str] = field( - default="0.0.0.0", metadata={"help": "Model API server deploy host"} - ) +class ModelAPIServerParameters(BaseServerParameters): port: Optional[int] = field( default=8100, metadata={"help": "Model API server deploy port"} ) - daemon: Optional[bool] = field( - default=False, metadata={"help": "Run Model API server in background"} - ) controller_addr: Optional[str] = field( default="http://127.0.0.1:8000", metadata={"help": "The Model controller address to connect"}, @@ -195,21 +168,6 @@ class ModelAPIServerParameters(BaseParameters): default=None, metadata={"help": "Embedding batch size"} ) - log_level: Optional[str] = field( - default=None, - metadata={ - "help": "Logging level", - "valid_values": [ - "FATAL", - "ERROR", - "WARNING", - "WARNING", - "INFO", - "DEBUG", - "NOTSET", - ], - }, - ) log_file: Optional[str] = field( default="dbgpt_model_apiserver.log", metadata={ @@ -237,7 +195,7 @@ class BaseModelParameters(BaseParameters): @dataclass -class ModelWorkerParameters(BaseModelParameters): +class ModelWorkerParameters(BaseServerParameters, BaseModelParameters): worker_type: Optional[str] = field( default=None, metadata={"valid_values": WorkerType.values(), "help": "Worker type"}, @@ -257,16 +215,10 @@ class ModelWorkerParameters(BaseModelParameters): "tags": "fixed", }, ) - host: Optional[str] = field( - default="0.0.0.0", metadata={"help": "Model worker deploy host"} - ) port: Optional[int] = field( default=8001, metadata={"help": "Model worker deploy port"} ) - daemon: Optional[bool] = field( - default=False, metadata={"help": "Run Model Worker in background"} - ) limit_model_concurrency: Optional[int] = field( default=5, metadata={"help": "Model concurrency limit"} ) @@ -280,7 +232,8 @@ class ModelWorkerParameters(BaseModelParameters): worker_register_host: Optional[str] = field( default=None, metadata={ - "help": "The ip address of current worker to register to ModelController. 
If None, the address is automatically determined" + "help": "The ip address of current worker to register to ModelController. " + "If None, the address is automatically determined" }, ) controller_addr: Optional[str] = field( @@ -293,21 +246,6 @@ class ModelWorkerParameters(BaseModelParameters): default=20, metadata={"help": "The interval for sending heartbeats (seconds)"} ) - log_level: Optional[str] = field( - default=None, - metadata={ - "help": "Logging level", - "valid_values": [ - "FATAL", - "ERROR", - "WARNING", - "WARNING", - "INFO", - "DEBUG", - "NOTSET", - ], - }, - ) log_file: Optional[str] = field( default="dbgpt_model_worker_manager.log", metadata={ diff --git a/dbgpt/rag/embedding/embeddings.py b/dbgpt/rag/embedding/embeddings.py index 851ab1101..97820cc5d 100644 --- a/dbgpt/rag/embedding/embeddings.py +++ b/dbgpt/rag/embedding/embeddings.py @@ -9,6 +9,7 @@ from dbgpt.core import Embeddings from dbgpt.core.awel.flow import Parameter, ResourceCategory, register_resource from dbgpt.util.i18n_utils import _ +from dbgpt.util.tracer import DBGPT_TRACER_SPAN_ID, root_tracer DEFAULT_MODEL_NAME = "sentence-transformers/all-mpnet-base-v2" DEFAULT_INSTRUCT_MODEL = "hkunlp/instructor-large" @@ -655,6 +656,9 @@ class OpenAPIEmbeddings(BaseModel, Embeddings): timeout: int = Field( default=60, description="The timeout for the request in seconds." ) + pass_trace_id: bool = Field( + default=True, description="Whether to pass the trace ID to the API." + ) session: Optional[requests.Session] = None @@ -688,10 +692,15 @@ def embed_documents(self, texts: List[str]) -> List[List[float]]: corresponds to a single input text. """ # Call OpenAI Embedding API + headers = {} + if self.pass_trace_id: + # Set the trace ID if available + headers[DBGPT_TRACER_SPAN_ID] = root_tracer.get_current_span_id() res = self.session.post( # type: ignore self.api_url, json={"input": texts, "model": self.model_name}, timeout=self.timeout, + headers=headers, ) return _handle_request_result(res) @@ -717,6 +726,9 @@ async def aembed_documents(self, texts: List[str]) -> List[List[float]]: List[float] corresponds to a single input text. """ headers = {"Authorization": f"Bearer {self.api_key}"} + if self.pass_trace_id: + # Set the trace ID if available + headers[DBGPT_TRACER_SPAN_ID] = root_tracer.get_current_span_id() async with aiohttp.ClientSession( headers=headers, timeout=aiohttp.ClientTimeout(total=self.timeout) ) as session: diff --git a/dbgpt/rag/embedding/rerank.py b/dbgpt/rag/embedding/rerank.py index dd01b5d14..0c8fb8009 100644 --- a/dbgpt/rag/embedding/rerank.py +++ b/dbgpt/rag/embedding/rerank.py @@ -8,6 +8,7 @@ from dbgpt._private.pydantic import EXTRA_FORBID, BaseModel, ConfigDict, Field from dbgpt.core import RerankEmbeddings +from dbgpt.util.tracer import DBGPT_TRACER_SPAN_ID, root_tracer class CrossEncoderRerankEmbeddings(BaseModel, RerankEmbeddings): @@ -78,6 +79,9 @@ class OpenAPIRerankEmbeddings(BaseModel, RerankEmbeddings): timeout: int = Field( default=60, description="The timeout for the request in seconds." ) + pass_trace_id: bool = Field( + default=True, description="Whether to pass the trace ID to the API." 
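The retriever hunks below (`db_schema.py` and `embedding.py`) stop calling the vector store's synchronous `similar_search` directly from coroutines and instead go through `blocking_func_to_async_no_executor`, added later in this diff in `executor_utils.py`. Underneath, that helper amounts to `loop.run_in_executor` with the default thread pool. A self-contained sketch of the pattern, with `slow_similarity_search` as a made-up stand-in for a blocking vector-store lookup:

```python
# Sketch of the blocking-call-to-async pattern used by the retriever changes.
import asyncio
import time
from functools import partial


def slow_similarity_search(query: str) -> list:
    time.sleep(0.1)  # stands in for a blocking vector-store query
    return [f"chunk-for-{query}"]


async def similarity_search_async(query: str) -> list:
    loop = asyncio.get_running_loop()
    # executor=None -> the loop's default ThreadPoolExecutor, which is roughly
    # what blocking_func_to_async(None, ...) does in this patch.
    return await loop.run_in_executor(None, partial(slow_similarity_search, query))


print(asyncio.run(similarity_search_async("user tables")))
```

This keeps the event loop responsive while the span opened around the call still measures the full wall-clock time of the search.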
+ ) session: Optional[requests.Session] = None @@ -112,9 +116,13 @@ def predict(self, query: str, candidates: List[str]) -> List[float]: """ if not candidates: return [] + headers = {} + if self.pass_trace_id: + # Set the trace ID if available + headers[DBGPT_TRACER_SPAN_ID] = root_tracer.get_current_span_id() data = {"model": self.model_name, "query": query, "documents": candidates} response = self.session.post( # type: ignore - self.api_url, json=data, timeout=self.timeout + self.api_url, json=data, timeout=self.timeout, headers=headers ) response.raise_for_status() return response.json()["data"] @@ -122,6 +130,9 @@ def predict(self, query: str, candidates: List[str]) -> List[float]: async def apredict(self, query: str, candidates: List[str]) -> List[float]: """Predict the rank scores of the candidates asynchronously.""" headers = {"Authorization": f"Bearer {self.api_key}"} + if self.pass_trace_id: + # Set the trace ID if available + headers[DBGPT_TRACER_SPAN_ID] = root_tracer.get_current_span_id() async with aiohttp.ClientSession( headers=headers, timeout=aiohttp.ClientTimeout(total=self.timeout) ) as session: diff --git a/dbgpt/rag/retriever/db_schema.py b/dbgpt/rag/retriever/db_schema.py index 3c2b1d079..0e9922f70 100644 --- a/dbgpt/rag/retriever/db_schema.py +++ b/dbgpt/rag/retriever/db_schema.py @@ -1,4 +1,5 @@ """DBSchema retriever.""" + from functools import reduce from typing import List, Optional, cast @@ -10,6 +11,8 @@ from dbgpt.rag.summary.rdbms_db_summary import _parse_db_summary from dbgpt.storage.vector_store.filters import MetadataFilters from dbgpt.util.chat_util import run_async_tasks +from dbgpt.util.executor_utils import blocking_func_to_async_no_executor +from dbgpt.util.tracer import root_tracer class DBSchemaRetriever(BaseRetriever): @@ -155,7 +158,12 @@ async def _aretrieve( """ if self._need_embeddings: queries = [query] - candidates = [self._similarity_search(query, filters) for query in queries] + candidates = [ + self._similarity_search( + query, filters, root_tracer.get_current_span_id() + ) + for query in queries + ] result_candidates = await run_async_tasks( tasks=candidates, concurrency_limit=1 ) @@ -166,7 +174,8 @@ async def _aretrieve( ) table_summaries = await run_async_tasks( - tasks=[self._aparse_db_summary()], concurrency_limit=1 + tasks=[self._aparse_db_summary(root_tracer.get_current_span_id())], + concurrency_limit=1, ) return [Chunk(content=table_summary) for table_summary in table_summaries] @@ -186,15 +195,33 @@ async def _aretrieve_with_score( return await self._aretrieve(query, filters) async def _similarity_search( - self, query, filters: Optional[MetadataFilters] = None + self, + query, + filters: Optional[MetadataFilters] = None, + parent_span_id: Optional[str] = None, ) -> List[Chunk]: """Similar search.""" - return self._index_store.similar_search(query, self._top_k, filters) + with root_tracer.start_span( + "dbgpt.rag.retriever.db_schema._similarity_search", + parent_span_id, + metadata={"query": query}, + ): + return await blocking_func_to_async_no_executor( + self._index_store.similar_search, query, self._top_k, filters + ) - async def _aparse_db_summary(self) -> List[str]: + async def _aparse_db_summary( + self, parent_span_id: Optional[str] = None + ) -> List[str]: """Similar search.""" from dbgpt.rag.summary.rdbms_db_summary import _parse_db_summary if not self._connector: raise RuntimeError("RDBMSConnector connection is required.") - return _parse_db_summary(self._connector) + with root_tracer.start_span( + 
"dbgpt.rag.retriever.db_schema._aparse_db_summary", + parent_span_id, + ): + return await blocking_func_to_async_no_executor( + _parse_db_summary, self._connector + ) diff --git a/dbgpt/rag/retriever/embedding.py b/dbgpt/rag/retriever/embedding.py index ddd161e17..f55480840 100644 --- a/dbgpt/rag/retriever/embedding.py +++ b/dbgpt/rag/retriever/embedding.py @@ -10,6 +10,7 @@ from dbgpt.rag.retriever.rewrite import QueryRewrite from dbgpt.storage.vector_store.filters import MetadataFilters from dbgpt.util.chat_util import run_async_tasks +from dbgpt.util.executor_utils import blocking_func_to_async_no_executor from dbgpt.util.tracer import root_tracer @@ -140,7 +141,10 @@ async def _aretrieve( queries = [query] if self._query_rewrite: candidates_tasks = [ - self._similarity_search(query, filters) for query in queries + self._similarity_search( + query, filters, root_tracer.get_current_span_id() + ) + for query in queries ] chunks = await self._run_async_tasks(candidates_tasks) context = "\n".join([chunk.content for chunk in chunks]) @@ -148,7 +152,10 @@ async def _aretrieve( origin_query=query, context=context, nums=1 ) queries.extend(new_queries) - candidates = [self._similarity_search(query, filters) for query in queries] + candidates = [ + self._similarity_search(query, filters, root_tracer.get_current_span_id()) + for query in queries + ] new_candidates = await run_async_tasks(tasks=candidates, concurrency_limit=1) return new_candidates @@ -170,16 +177,19 @@ async def _aretrieve_with_score( queries = [query] if self._query_rewrite: with root_tracer.start_span( - "EmbeddingRetriever.query_rewrite.similarity_search", + "dbgpt.rag.retriever.embeddings.query_rewrite.similarity_search", metadata={"query": query, "score_threshold": score_threshold}, ): candidates_tasks = [ - self._similarity_search(query, filters) for query in queries + self._similarity_search( + query, filters, root_tracer.get_current_span_id() + ) + for query in queries ] chunks = await self._run_async_tasks(candidates_tasks) context = "\n".join([chunk.content for chunk in chunks]) with root_tracer.start_span( - "EmbeddingRetriever.query_rewrite.rewrite", + "dbgpt.rag.retriever.embeddings.query_rewrite.rewrite", metadata={"query": query, "context": context, "nums": 1}, ): new_queries = await self._query_rewrite.rewrite( @@ -188,11 +198,13 @@ async def _aretrieve_with_score( queries.extend(new_queries) with root_tracer.start_span( - "EmbeddingRetriever.similarity_search_with_score", + "dbgpt.rag.retriever.embeddings.similarity_search_with_score", metadata={"query": query, "score_threshold": score_threshold}, ): candidates_with_score = [ - self._similarity_search_with_score(query, score_threshold, filters) + self._similarity_search_with_score( + query, score_threshold, filters, root_tracer.get_current_span_id() + ) for query in queries ] res_candidates_with_score = await run_async_tasks( @@ -203,7 +215,7 @@ async def _aretrieve_with_score( ) with root_tracer.start_span( - "EmbeddingRetriever.rerank", + "dbgpt.rag.retriever.embeddings.rerank", metadata={ "query": query, "score_threshold": score_threshold, @@ -216,10 +228,22 @@ async def _aretrieve_with_score( return new_candidates_with_score async def _similarity_search( - self, query, filters: Optional[MetadataFilters] = None + self, + query, + filters: Optional[MetadataFilters] = None, + parent_span_id: Optional[str] = None, ) -> List[Chunk]: """Similar search.""" - return self._index_store.similar_search(query, self._top_k, filters) + with root_tracer.start_span( + 
"dbgpt.rag.retriever.embeddings.similarity_search", + parent_span_id, + metadata={ + "query": query, + }, + ): + return await blocking_func_to_async_no_executor( + self._index_store.similar_search, query, self._top_k, filters + ) async def _run_async_tasks(self, tasks) -> List[Chunk]: """Run async tasks.""" @@ -228,9 +252,25 @@ async def _run_async_tasks(self, tasks) -> List[Chunk]: return cast(List[Chunk], candidates) async def _similarity_search_with_score( - self, query, score_threshold, filters: Optional[MetadataFilters] = None + self, + query, + score_threshold, + filters: Optional[MetadataFilters] = None, + parent_span_id: Optional[str] = None, ) -> List[Chunk]: """Similar search with score.""" - return await self._index_store.asimilar_search_with_scores( - query, self._top_k, score_threshold, filters - ) + with root_tracer.start_span( + "dbgpt.rag.retriever.embeddings._do_similarity_search_with_score", + parent_span_id, + metadata={ + "query": query, + "score_threshold": score_threshold, + }, + ): + return await blocking_func_to_async_no_executor( + self._index_store.similar_search_with_scores, + query, + self._top_k, + score_threshold, + filters, + ) diff --git a/dbgpt/rag/retriever/rerank.py b/dbgpt/rag/retriever/rerank.py index a328cae33..3b15120d2 100644 --- a/dbgpt/rag/retriever/rerank.py +++ b/dbgpt/rag/retriever/rerank.py @@ -1,11 +1,11 @@ """Rerank module for RAG retriever.""" -import asyncio from abc import ABC, abstractmethod from typing import Callable, List, Optional from dbgpt.core import Chunk, RerankEmbeddings from dbgpt.core.awel.flow import Parameter, ResourceCategory, register_resource +from dbgpt.util.executor_utils import blocking_func_to_async_no_executor from dbgpt.util.i18n_utils import _ RANK_FUNC = Callable[[List[Chunk]], List[Chunk]] @@ -54,8 +54,8 @@ async def arank( Return: List[Chunk] """ - return await asyncio.get_running_loop().run_in_executor( - None, self.rank, candidates_with_scores, query + return await blocking_func_to_async_no_executor( + self.rank, candidates_with_scores, query ) def _filter(self, candidates_with_scores: List) -> List[Chunk]: diff --git a/dbgpt/serve/agent/agents/controller.py b/dbgpt/serve/agent/agents/controller.py index 88809d46f..1e0464b4f 100644 --- a/dbgpt/serve/agent/agents/controller.py +++ b/dbgpt/serve/agent/agents/controller.py @@ -29,6 +29,7 @@ from dbgpt.serve.agent.model import PagenationFilter, PluginHubFilter from dbgpt.serve.conversation.serve import Serve as ConversationServe from dbgpt.util.json_utils import serialize +from dbgpt.util.tracer import root_tracer from ..db.gpts_app import GptsApp, GptsAppDao, GptsAppQuery from ..db.gpts_conversations_db import GptsConversationsDao, GptsConversationsEntity @@ -158,7 +159,12 @@ async def agent_chat( task = asyncio.create_task( multi_agents.agent_team_chat_new( - user_query, agent_conv_id, gpt_app, is_retry_chat, agent_memory + user_query, + agent_conv_id, + gpt_app, + is_retry_chat, + agent_memory, + span_id=root_tracer.get_current_span_id(), ) ) @@ -241,6 +247,7 @@ async def agent_team_chat_new( gpts_app: GptsApp, is_retry_chat: bool = False, agent_memory: Optional[AgentMemory] = None, + span_id: Optional[str] = None, ): employees: List[Agent] = [] rm = get_resource_manager() @@ -304,10 +311,13 @@ async def agent_team_chat_new( self.gpts_conversations.update(conv_uid, Status.RUNNING.value) try: - await user_proxy.initiate_chat( - recipient=recipient, - message=user_query, - ) + with root_tracer.start_span( + "dbgpt.serve.agent.run_agent", parent_span_id=span_id + 
): + await user_proxy.initiate_chat( + recipient=recipient, + message=user_query, + ) except Exception as e: logger.error(f"chat abnormal termination!{str(e)}", e) self.gpts_conversations.update(conv_uid, Status.FAILED.value) diff --git a/dbgpt/util/executor_utils.py b/dbgpt/util/executor_utils.py index d530967b4..464678bed 100644 --- a/dbgpt/util/executor_utils.py +++ b/dbgpt/util/executor_utils.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from concurrent.futures import Executor, ThreadPoolExecutor from functools import partial -from typing import Any, Awaitable, Callable +from typing import Any, Callable, Optional from dbgpt.component import BaseComponent, ComponentType, SystemApp @@ -67,6 +67,11 @@ def run_with_context(): return await loop.run_in_executor(executor, run_with_context) +async def blocking_func_to_async_no_executor(func: BlockingFunction, *args, **kwargs): + """Run a potentially blocking function in the default executor.""" + return await blocking_func_to_async(None, func, *args, **kwargs) # type: ignore + + class AsyncToSyncIterator: def __init__(self, async_iterable, loop: asyncio.BaseEventLoop): self.async_iterable = async_iterable diff --git a/dbgpt/util/parameter_utils.py b/dbgpt/util/parameter_utils.py index 3a4f193e8..77cd6d86d 100644 --- a/dbgpt/util/parameter_utils.py +++ b/dbgpt/util/parameter_utils.py @@ -108,6 +108,99 @@ def to_dict(self) -> Dict[str, Any]: return asdict(self) +@dataclass +class BaseServerParameters(BaseParameters): + host: Optional[str] = field( + default="0.0.0.0", metadata={"help": "The host IP address to bind to."} + ) + port: Optional[int] = field( + default=None, metadata={"help": "The port number to bind to."} + ) + daemon: Optional[bool] = field( + default=False, metadata={"help": "Run the server as a daemon."} + ) + log_level: Optional[str] = field( + default=None, + metadata={ + "help": "Logging level", + "valid_values": [ + "FATAL", + "ERROR", + "WARNING", + "INFO", + "DEBUG", + "NOTSET", + ], + }, + ) + log_file: Optional[str] = field( + default=None, + metadata={ + "help": "The filename to store log", + }, + ) + tracer_file: Optional[str] = field( + default=None, + metadata={ + "help": "The filename to store tracer span records", + }, + ) + tracer_to_open_telemetry: Optional[bool] = field( + default=os.getenv("TRACER_TO_OPEN_TELEMETRY", "False").lower() == "true", + metadata={ + "help": "Whether to send tracer span records to OpenTelemetry", + }, + ) + otel_exporter_otlp_traces_endpoint: Optional[str] = field( + default=None, + metadata={ + "help": "`OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` target to which the span " + "exporter is going to send spans. The endpoint MUST be a valid URL host, " + "and MAY contain a scheme (http or https), port and path. A scheme of https" + " indicates a secure connection and takes precedence over this " + "configuration setting.", + }, + ) + otel_exporter_otlp_traces_insecure: Optional[bool] = field( + default=None, + metadata={ + "help": "`OTEL_EXPORTER_OTLP_TRACES_INSECURE` represents whether to enable " + "client transport security for gRPC requests for spans. A scheme of https " + "takes precedence over this configuration setting. Default: False" + }, + ) + otel_exporter_otlp_traces_certificate: Optional[str] = field( + default=None, + metadata={ + "help": "`OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE` stores the path to the " + "certificate file for TLS credentials of gRPC client for traces. 
" + "Should only be used for a secure connection for tracing", + }, + ) + otel_exporter_otlp_traces_headers: Optional[str] = field( + default=None, + metadata={ + "help": "`OTEL_EXPORTER_OTLP_TRACES_HEADERS` contains the key-value pairs " + "to be used as headers for spans associated with gRPC or HTTP requests.", + }, + ) + otel_exporter_otlp_traces_timeout: Optional[int] = field( + default=None, + metadata={ + "help": "`OTEL_EXPORTER_OTLP_TRACES_TIMEOUT` is the maximum time the OTLP " + "exporter will wait for each batch export for spans.", + }, + ) + otel_exporter_otlp_traces_compression: Optional[str] = field( + default=None, + metadata={ + "help": "`OTEL_EXPORTER_OTLP_COMPRESSION` but only for the span exporter. " + "If both are present, this takes higher precedence.", + }, + ) + + def _get_dataclass_print_str(obj): class_name = obj.__class__.__name__ parameters = [ diff --git a/dbgpt/util/tracer/__init__.py b/dbgpt/util/tracer/__init__.py index 25cbe372e..6a0688bee 100644 --- a/dbgpt/util/tracer/__init__.py +++ b/dbgpt/util/tracer/__init__.py @@ -1,4 +1,5 @@ from dbgpt.util.tracer.base import ( + DBGPT_TRACER_SPAN_ID, Span, SpanStorage, SpanStorageType, @@ -28,6 +29,7 @@ "SpanStorage", "SpanStorageType", "TracerContext", + "DBGPT_TRACER_SPAN_ID", "MemorySpanStorage", "FileSpanStorage", "SpanStorageContainer", diff --git a/dbgpt/util/tracer/base.py b/dbgpt/util/tracer/base.py index 5ef3a0318..c3bfe756e 100644 --- a/dbgpt/util/tracer/base.py +++ b/dbgpt/util/tracer/base.py @@ -1,15 +1,24 @@ from __future__ import annotations import json +import secrets import uuid from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime from enum import Enum -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from dbgpt.component import BaseComponent, ComponentType, SystemApp +DBGPT_TRACER_SPAN_ID = "DB-GPT-Trace-Span-Id" + +# Compatibility with OpenTelemetry API +_TRACE_ID_MAX_VALUE = 2**128 - 1 +_SPAN_ID_MAX_VALUE = 2**64 - 1 +INVALID_SPAN_ID = 0x0000000000000000 +INVALID_TRACE_ID = 0x00000000000000000000000000000000 + class SpanType(str, Enum): BASE = "base" @@ -60,7 +69,7 @@ def __init__( # Timestamp when this span ended, initially None self.end_time = None # Additional metadata associated with the span - self.metadata = metadata + self.metadata = metadata or {} self._end_callers = [] if end_caller: self._end_callers.append(end_caller) @@ -91,13 +100,17 @@ def to_dict(self) -> Dict: "span_id": self.span_id, "parent_span_id": self.parent_span_id, "operation_name": self.operation_name, - "start_time": None - if not self.start_time - else self.start_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], - "end_time": None - if not self.end_time - else self.end_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], - "metadata": _clean_for_json(self.metadata), + "start_time": ( + None + if not self.start_time + else self.start_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + ), + "end_time": ( + None + if not self.end_time + else self.end_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + ), + "metadata": _clean_for_json(self.metadata) if self.metadata else None, } def copy(self) -> Span: @@ -200,6 +213,60 @@ def _new_uuid(self) -> str: """ return str(uuid.uuid4()) + def _new_random_trace_id(self) -> str: + """Create a new random trace ID.""" + + return _new_random_trace_id() + + def _new_random_span_id(self) -> str: + """Create a new random span ID.""" + + return _new_random_span_id() + + +def _new_random_trace_id() -> 
str: + """Create a new random trace ID.""" + # Generate a 128-bit hex string + return secrets.token_hex(16) + + +def _is_valid_trace_id(trace_id: Union[str, int]) -> bool: + if isinstance(trace_id, str): + try: + trace_id = int(trace_id, 16) + except ValueError: + return False + return INVALID_TRACE_ID < int(trace_id) <= _TRACE_ID_MAX_VALUE + + +def _new_random_span_id() -> str: + """Create a new random span ID.""" + + # Generate a 64-bit hex string + return secrets.token_hex(8) + + +def _is_valid_span_id(span_id: Union[str, int]) -> bool: + if isinstance(span_id, str): + try: + span_id = int(span_id, 16) + except ValueError: + return False + return INVALID_SPAN_ID < int(span_id) <= _SPAN_ID_MAX_VALUE + + +def _split_span_id(span_id: str) -> Tuple[int, int]: + parent_span_id_parts = span_id.split(":") + if len(parent_span_id_parts) != 2: + return 0, 0 + trace_id, parent_span_id = parent_span_id_parts + try: + trace_id = int(trace_id, 16) + span_id = int(parent_span_id, 16) + return trace_id, span_id + except ValueError: + return 0, 0 + @dataclass class TracerContext: @@ -240,3 +307,28 @@ def _clean_for_json(data: Optional[str, Any] = None): return data except TypeError: return None + + +def _parse_span_id(body: Any) -> Optional[str]: + from starlette.requests import Request + + from dbgpt._private.pydantic import BaseModel, model_to_dict + + span_id: Optional[str] = None + if isinstance(body, Request): + span_id = body.headers.get(DBGPT_TRACER_SPAN_ID) + elif isinstance(body, dict): + span_id = body.get(DBGPT_TRACER_SPAN_ID) or body.get("span_id") + elif isinstance(body, BaseModel): + dict_body = model_to_dict(body) + span_id = dict_body.get(DBGPT_TRACER_SPAN_ID) or dict_body.get("span_id") + if not span_id: + return None + else: + int_trace_id, int_span_id = _split_span_id(span_id) + if not int_trace_id: + return None + if _is_valid_span_id(int_span_id) and _is_valid_trace_id(int_trace_id): + return span_id + else: + return span_id diff --git a/dbgpt/util/tracer/opentelemetry.py b/dbgpt/util/tracer/opentelemetry.py new file mode 100644 index 000000000..b46633eeb --- /dev/null +++ b/dbgpt/util/tracer/opentelemetry.py @@ -0,0 +1,122 @@ +from typing import Dict, List, Optional + +from .base import Span, SpanStorage, _split_span_id + +try: + from opentelemetry import trace + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import Span as OTSpan + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.trace import SpanContext, SpanKind +except ImportError: + raise ImportError( + "To use OpenTelemetrySpanStorage, you must install opentelemetry-api, " + "opentelemetry-sdk and opentelemetry-exporter-otlp." 
+ "You can install it via `pip install opentelemetry-api opentelemetry-sdk " + "opentelemetry-exporter-otlp`" + ) + + +class OpenTelemetrySpanStorage(SpanStorage): + """OpenTelemetry span storage.""" + + def __init__( + self, + service_name: str, + otlp_endpoint: Optional[str] = None, + otlp_insecure: Optional[bool] = None, + otlp_timeout: Optional[int] = None, + ): + super().__init__() + self.service_name = service_name + + resource = Resource(attributes={"service.name": service_name}) + self.tracer_provider = TracerProvider(resource=resource) + self.tracer = self.tracer_provider.get_tracer(__name__) + # Store the spans that have not ended + self.spans: Dict[str, OTSpan] = {} + otlp_exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + insecure=otlp_insecure, + timeout=otlp_timeout, + ) + span_processor = BatchSpanProcessor(otlp_exporter) + self.tracer_provider.add_span_processor(span_processor) + trace.set_tracer_provider(self.tracer_provider) + + def append_span(self, span: Span): + span_id = span.span_id + + if span_id in self.spans: + otel_span = self.spans.pop(span_id) + # Update the end time and attributes of the span + end_time = int(span.end_time.timestamp() * 1e9) if span.end_time else None + if span.metadata: + for key, value in span.metadata.items(): + if isinstance(value, (bool, str, bytes, int, float)) or ( + isinstance(value, list) + and all( + isinstance(i, (bool, str, bytes, int, float)) for i in value + ) + ): + otel_span.set_attribute(key, value) + if end_time: + otel_span.end(end_time=end_time) + else: + otel_span.end() + else: + parent_context = self._create_parent_context(span) + # Datetime -> int + start_time = int(span.start_time.timestamp() * 1e9) + + otel_span = self.tracer.start_span( + span.operation_name, + context=parent_context, + kind=SpanKind.INTERNAL, + start_time=start_time, + ) + + otel_span.set_attribute("dbgpt_trace_id", span.trace_id) + otel_span.set_attribute("dbgpt_span_id", span.span_id) + + if span.parent_span_id: + otel_span.set_attribute("dbgpt_parent_span_id", span.parent_span_id) + + otel_span.set_attribute("span_type", span.span_type.value) + if span.metadata: + for key, value in span.metadata.items(): + if isinstance(value, (bool, str, bytes, int, float)) or ( + isinstance(value, list) + and all( + isinstance(i, (bool, str, bytes, int, float)) for i in value + ) + ): + otel_span.set_attribute(key, value) + + if not span.end_time: + self.spans[span_id] = otel_span + + def append_span_batch(self, spans: List[Span]): + for span in spans: + self.append_span(span) + + def _create_parent_context(self, span: Span): + if not span.parent_span_id: + return trace.set_span_in_context(trace.INVALID_SPAN) + + trace_id, parent_span_id = _split_span_id(span.parent_span_id) + if not trace_id: + return trace.set_span_in_context(trace.INVALID_SPAN) + + span_context = SpanContext( + trace_id=trace_id, + span_id=parent_span_id, + is_remote=True, + trace_flags=trace.TraceFlags(0x01), # Default: SAMPLED + ) + return trace.set_span_in_context(trace.NonRecordingSpan(span_context)) + + def close(self): + self.tracer_provider.shutdown() diff --git a/dbgpt/util/tracer/tracer_cli.py b/dbgpt/util/tracer/tracer_cli.py index 40fdf87e3..eebc2b8ca 100644 --- a/dbgpt/util/tracer/tracer_cli.py +++ b/dbgpt/util/tracer/tracer_cli.py @@ -249,7 +249,7 @@ def chat( for sp in spans: span_type = sp["span_type"] metadata = sp.get("metadata") - if span_type == SpanType.RUN: + if span_type == SpanType.RUN and metadata and "run_service" in metadata: service_name = 
metadata["run_service"] service_spans[service_name] = sp.copy() if set(service_spans.keys()) == service_names and found_trace_id: diff --git a/dbgpt/util/tracer/tracer_impl.py b/dbgpt/util/tracer/tracer_impl.py index 152bbaef7..17f808695 100644 --- a/dbgpt/util/tracer/tracer_impl.py +++ b/dbgpt/util/tracer/tracer_impl.py @@ -3,7 +3,7 @@ import logging from contextvars import ContextVar from functools import wraps -from typing import Dict, Optional +from typing import Any, AsyncIterator, Dict, Optional from dbgpt.component import ComponentType, SystemApp from dbgpt.util.module_utils import import_from_checked_string @@ -46,9 +46,12 @@ def start_span( metadata: Dict = None, ) -> Span: trace_id = ( - self._new_uuid() if parent_span_id is None else parent_span_id.split(":")[0] + self._new_random_trace_id() + if parent_span_id is None + else parent_span_id.split(":")[0] ) - span_id = f"{trace_id}:{self._new_uuid()}" + span_id = f"{trace_id}:{self._new_random_span_id()}" + span = Span( trace_id, span_id, @@ -164,6 +167,33 @@ def _get_current_span_type(self) -> Optional[SpanType]: current_span = self.get_current_span() return current_span.span_type if current_span else None + def _parse_span_id(self, body: Any) -> Optional[str]: + from .base import _parse_span_id + + return _parse_span_id(body) + + def wrapper_async_stream( + self, + generator: AsyncIterator[Any], + operation_name: str, + parent_span_id: str = None, + span_type: SpanType = None, + metadata: Dict = None, + ) -> AsyncIterator[Any]: + """Wrap an async generator with a span""" + + parent_span_id = parent_span_id or self.get_current_span_id() + + async def wrapper(): + span = self.start_span(operation_name, parent_span_id, span_type, metadata) + try: + async for item in generator: + yield item + finally: + span.end() + + return wrapper() + root_tracer: TracerManager = TracerManager() @@ -206,10 +236,14 @@ def _parse_operation_name(func, *args): def initialize_tracer( tracer_filename: str, - root_operation_name: str = "DB-GPT-Web-Entry", + root_operation_name: str = "DB-GPT-Webserver", system_app: Optional[SystemApp] = None, tracer_storage_cls: Optional[str] = None, create_system_app: bool = False, + enable_open_telemetry: bool = False, + otlp_endpoint: Optional[str] = None, + otlp_insecure: Optional[bool] = None, + otlp_timeout: Optional[int] = None, ): """Initialize the tracer with the given filename and system app.""" from dbgpt.util.tracer.span_storage import FileSpanStorage, SpanStorageContainer @@ -227,6 +261,17 @@ def initialize_tracer( storage_container = SpanStorageContainer(system_app) storage_container.append_storage(FileSpanStorage(tracer_filename)) + if enable_open_telemetry: + from dbgpt.util.tracer.opentelemetry import OpenTelemetrySpanStorage + + storage_container.append_storage( + OpenTelemetrySpanStorage( + service_name=root_operation_name, + otlp_endpoint=otlp_endpoint, + otlp_insecure=otlp_insecure, + otlp_timeout=otlp_timeout, + ) + ) if tracer_storage_cls: logger.info(f"Begin parse storage class {tracer_storage_cls}") diff --git a/dbgpt/util/tracer/tracer_middleware.py b/dbgpt/util/tracer/tracer_middleware.py index 6e0d35222..33bcfb41f 100644 --- a/dbgpt/util/tracer/tracer_middleware.py +++ b/dbgpt/util/tracer/tracer_middleware.py @@ -1,4 +1,4 @@ -import uuid +import logging from contextvars import ContextVar from starlette.middleware.base import BaseHTTPMiddleware @@ -7,7 +7,11 @@ from dbgpt.util.tracer import Tracer, TracerContext -_DEFAULT_EXCLUDE_PATHS = ["/api/controller/heartbeat"] +from .base import 
+
+_DEFAULT_EXCLUDE_PATHS = ["/api/controller/heartbeat", "/api/health"]
+
+logger = logging.getLogger(__name__)
 
 
 class TraceIDMiddleware(BaseHTTPMiddleware):
@@ -33,11 +37,12 @@ async def dispatch(self, request: Request, call_next):
         ):
             return await call_next(request)
 
-        span_id = request.headers.get("DBGPT_TRACER_SPAN_ID")
-        # if not span_id:
-        #     span_id = str(uuid.uuid4())
-        #     self.trace_context_var.set(TracerContext(span_id=span_id))
-
+        # Read the upstream span ID from the request headers
+        span_id = _parse_span_id(request)
+        logger.debug(
+            f"TraceIDMiddleware: span_id={span_id}, path={request.url.path}, "
+            f"headers={request.headers}"
+        )
         with self.tracer.start_span(
             self.root_operation_name, span_id, metadata={"path": request.url.path}
         ):
diff --git a/docker/compose_examples/observability/docker-compose.yml b/docker/compose_examples/observability/docker-compose.yml
new file mode 100644
index 000000000..d0aa985b4
--- /dev/null
+++ b/docker/compose_examples/observability/docker-compose.yml
@@ -0,0 +1,113 @@
+# An example of using docker-compose to start a cluster with observability enabled.
+# For simplicity, we use chatgpt_proxyllm as the model for the worker, and we build a
+# new docker image named eosphorosai/dbgpt-openai:latest.
+# How to build the image:
+# Run `bash ./docker/base/build_proxy_image.sh` in the root directory of the project.
+# If you want to use another pip index URL, run the command with the `--pip-index-url` option.
+# For example, `bash ./docker/base/build_proxy_image.sh --pip-index-url https://pypi.tuna.tsinghua.edu.cn/simple`
+#
+# How to start the cluster:
+# 1. Run `cd docker/compose_examples/observability`
+# 2. Run `OPENAI_API_KEY="{your api key}" OPENAI_API_BASE="https://api.openai.com/v1" docker compose up -d`
+# Note: Make sure you have set the OPENAI_API_KEY environment variable.
+version: '3.9'
+
+services:
+  jaeger:
+    image: jaegertracing/all-in-one:1.58
+    restart: unless-stopped
+    networks:
+      - dbgptnet
+    ports:
+      # Serve frontend
+      - "16686:16686"
+      # Accept jaeger.thrift over Thrift-compact protocol (used by most SDKs)
+      - "6831:6831"
+      # Accept OpenTelemetry Protocol (OTLP) over HTTP
+      - "4318:4318"
+      # Accept OpenTelemetry Protocol (OTLP) over gRPC
+      - "4317:4317"
+      - "14268:14268"
+    environment:
+      - LOG_LEVEL=debug
+      - SPAN_STORAGE_TYPE=badger
+      - BADGER_EPHEMERAL=false
+      - BADGER_DIRECTORY_VALUE=/badger/data
+      - BADGER_DIRECTORY_KEY=/badger/key
+    volumes:
+      - jaeger-badger:/badger
+    # Run as root so the Badger storage directories are writable
+    user: root
+  controller:
+    image: eosphorosai/dbgpt-openai:latest
+    command: dbgpt start controller
+    restart: unless-stopped
+    environment:
+      - TRACER_TO_OPEN_TELEMETRY=True
+      - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4317
+      - DBGPT_LOG_LEVEL=DEBUG
+    volumes:
+      - ../../../:/app
+    networks:
+      - dbgptnet
+  llm-worker:
+    image: eosphorosai/dbgpt-openai:latest
+    command: dbgpt start worker --model_type proxy --model_name chatgpt_proxyllm --model_path chatgpt_proxyllm --proxy_server_url ${OPENAI_API_BASE}/chat/completions --proxy_api_key ${OPENAI_API_KEY} --controller_addr http://controller:8000
+    environment:
+      # Your real OpenAI model name, e.g. gpt-3.5-turbo or gpt-4o
+      - PROXYLLM_BACKEND=gpt-3.5-turbo
+      - TRACER_TO_OPEN_TELEMETRY=True
+      - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4317
+      - DBGPT_LOG_LEVEL=DEBUG
+    depends_on:
+      - controller
+    volumes:
+      - ../../../:/app
+    restart: unless-stopped
+    networks:
+      - dbgptnet
+    ipc: host
+  embedding-worker:
+    image: eosphorosai/dbgpt-openai:latest
+    command: dbgpt start worker --worker_type text2vec --model_name proxy_http_openapi --model_path proxy_http_openapi --proxy_server_url ${OPENAI_API_BASE}/embeddings --proxy_api_key ${OPENAI_API_KEY} --controller_addr http://controller:8000
+    environment:
+      - proxy_http_openapi_proxy_backend=text-embedding-3-small
+      - TRACER_TO_OPEN_TELEMETRY=True
+      - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4317
+      - DBGPT_LOG_LEVEL=DEBUG
+    depends_on:
+      - controller
+    volumes:
+      - ../../../:/app
+    restart: unless-stopped
+    networks:
+      - dbgptnet
+    ipc: host
+  webserver:
+    image: eosphorosai/dbgpt-openai:latest
+    command: dbgpt start webserver --light --remote_embedding --controller_addr http://controller:8000
+    environment:
+      - LLM_MODEL=chatgpt_proxyllm
+      - EMBEDDING_MODEL=proxy_http_openapi
+      - TRACER_TO_OPEN_TELEMETRY=True
+      - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4317
+    depends_on:
+      - controller
+      - llm-worker
+      - embedding-worker
+    volumes:
+      - ../../../:/app
+      - dbgpt-data:/app/pilot/data
+      - dbgpt-message:/app/pilot/message
+    ports:
+      - 5670:5670/tcp
+    restart: unless-stopped
+    networks:
+      - dbgptnet
+volumes:
+  dbgpt-data:
+  dbgpt-message:
+  jaeger-badger:
+networks:
+  dbgptnet:
+    driver: bridge
+    name: dbgptnet
\ No newline at end of file
diff --git a/docs/docs/application/advanced_tutorial/debugging.md b/docs/docs/application/advanced_tutorial/debugging.md
index 1a4cc1317..263097acd 100644
--- a/docs/docs/application/advanced_tutorial/debugging.md
+++ b/docs/docs/application/advanced_tutorial/debugging.md
@@ -1,7 +1,8 @@
 # Debugging
 DB-GPT provides a series of tools to help developers troubleshoot and solve some problems they may encounter.
-## Trace log
+## View Trace Logs With the Command Line
+
 DB-GPT writes some key system runtime information to trace logs. By default, they are located in `logs/dbgpt*.jsonl`.
diff --git a/docs/docs/application/advanced_tutorial/observability.md b/docs/docs/application/advanced_tutorial/observability.md
new file mode 100644
index 000000000..9c929828a
--- /dev/null
+++ b/docs/docs/application/advanced_tutorial/observability.md
@@ -0,0 +1,247 @@
+# Observability
+
+**Observability** is a measure of how well the internal state of a system can be inferred from
+knowledge of its external outputs. In the context of a software system, observability
+is the ability to understand the internal state of the system by examining its outputs.
+This is important for debugging, monitoring, and maintaining the system.
+
+
+## Observability In DB-GPT
+
+DB-GPT provides observability through the following mechanisms:
+- **Logging**: DB-GPT logs various events and metrics to help you understand the internal state of the system.
+- **Tracing**: DB-GPT provides tracing capabilities to help you understand the flow of requests through the system.
+
+## Logging
+
+You can configure the logging level and storage location for DB-GPT logs. By default,
+logs are stored in the `logs` directory in the DB-GPT root directory. You can change
+the log level and storage location by setting the `DBGPT_LOG_LEVEL` and `DBGPT_LOG_DIR`
+environment variables.
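+
+For example, a minimal sketch (assuming you configure them through the standard `.env`
+file in the DB-GPT root directory; exporting them as shell variables works as well):
+
+```bash
+## Print DEBUG logs and write log files to a custom directory
+DBGPT_LOG_LEVEL=DEBUG
+DBGPT_LOG_DIR=/var/log/dbgpt
+```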
+
+
+## Tracing
+
+DB-GPT has built-in tracing capabilities that allow you to trace the flow of requests
+through the system.
+
+
+## Trace Storage
+
+### Local Storage
+
+By default, DB-GPT stores traces in local files under the DB-GPT logs directory;
+they are located in `logs/dbgpt*.jsonl`.
+
+If you want to know more about locally stored traces and how to work with them, you
+can refer to the [Debugging](./debugging) documentation.
+
+
+### OpenTelemetry Support
+
+DB-GPT also supports [OpenTelemetry](https://opentelemetry.io/) for distributed tracing.
+You can export traces to OpenTelemetry-compatible backends such as Jaeger and Zipkin
+via the OpenTelemetry Protocol (OTLP).
+
+To enable OpenTelemetry support, you need to install the following packages:
+
+```bash
+pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
+```
+
+Then, modify your `.env` file to enable OpenTelemetry tracing:
+
+```bash
+## Whether to enable DB-GPT to send traces to OpenTelemetry
+TRACER_TO_OPEN_TELEMETRY=True
+## For more details, see https://opentelemetry-python.readthedocs.io/en/latest/exporter/otlp/otlp.html
+OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4317
+```
+
+In the above configuration, you can change `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` to point
+to your own OTLP collector or backend; the gRPC endpoint is used by default.
+
+Below, we use Jaeger as an example to show how to trace DB-GPT with OpenTelemetry.
+
+### Jaeger Support
+
+Here is an example of how to use Jaeger to trace DB-GPT with Docker.
+
+Run the Jaeger all-in-one image:
+
+```bash
+docker run --rm --name jaeger \
+    -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
+    -p 6831:6831/udp \
+    -p 6832:6832/udp \
+    -p 5778:5778 \
+    -p 16686:16686 \
+    -p 4317:4317 \
+    -p 4318:4318 \
+    -p 14250:14250 \
+    -p 14268:14268 \
+    -p 14269:14269 \
+    -p 9411:9411 \
+    jaegertracing/all-in-one:1.58
+```
+
+Then, modify your `.env` file to enable OpenTelemetry tracing as shown above:
+
+```bash
+TRACER_TO_OPEN_TELEMETRY=True
+OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4317
+```
+
+Start the DB-GPT server:
+
+```bash
+dbgpt start webserver
+```
+
+Now, you can access the Jaeger UI at `http://localhost:16686` to view the traces.
+
+Here are some example screenshots of the Jaeger UI:
+
+**Search Traces Page**
+
+![Search traces page in Jaeger](/img/application/advanced_tutorial/observability_img1.png)
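+
+If you prefer the command line, Jaeger's HTTP query API can run the same search. This is
+a sketch against Jaeger's internal, unversioned API (served on the same port as the UI),
+so treat the exact route as an assumption; the service name follows `root_operation_name`,
+which is `DB-GPT-Webserver` by default:
+
+```bash
+# Fetch up to 5 recent traces for the DB-GPT webserver service
+curl -s "http://localhost:16686/api/traces?service=DB-GPT-Webserver&limit=5"
+```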
+
+**Show Normal Conversation Trace**
+
+![Normal conversation trace](/img/application/advanced_tutorial/observability_img2.png)
+
+**Show Conversation Detail Tags**
+
+![Conversation detail tags](/img/application/advanced_tutorial/observability_img3.png)
+
+**Show Agent Conversation Trace**
+
+![Agent conversation trace](/img/application/advanced_tutorial/observability_img4.png)
+
+**Show Trace In Cluster**
+
+### Jaeger Support With Docker Compose
+
+If you want to use docker-compose to start DB-GPT and Jaeger together, you can use the
+following `docker-compose.yml` file:
+
+```yaml
+# An example of using docker-compose to start a cluster with observability enabled.
+version: '3.9'
+
+services:
+  jaeger:
+    image: jaegertracing/all-in-one:1.58
+    restart: unless-stopped
+    networks:
+      - dbgptnet
+    ports:
+      # Serve frontend
+      - "16686:16686"
+      # Accept jaeger.thrift over Thrift-compact protocol (used by most SDKs)
+      - "6831:6831"
+      # Accept OpenTelemetry Protocol (OTLP) over HTTP
+      - "4318:4318"
+      # Accept OpenTelemetry Protocol (OTLP) over gRPC
+      - "4317:4317"
+      - "14268:14268"
+    environment:
+      - LOG_LEVEL=debug
+      - SPAN_STORAGE_TYPE=badger
+      - BADGER_EPHEMERAL=false
+      - BADGER_DIRECTORY_VALUE=/badger/data
+      - BADGER_DIRECTORY_KEY=/badger/key
+    volumes:
+      - jaeger-badger:/badger
+    user: root
+  controller:
+    image: eosphorosai/dbgpt-openai:latest
+    command: dbgpt start controller
+    restart: unless-stopped
+    environment:
+      - TRACER_TO_OPEN_TELEMETRY=True
+      - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4317
+      - DBGPT_LOG_LEVEL=DEBUG
+    networks:
+      - dbgptnet
+  llm-worker:
+    image: eosphorosai/dbgpt-openai:latest
+    command: dbgpt start worker --model_type proxy --model_name chatgpt_proxyllm --model_path chatgpt_proxyllm --proxy_server_url ${OPENAI_API_BASE}/chat/completions --proxy_api_key ${OPENAI_API_KEY} --controller_addr http://controller:8000
+    environment:
+      # Your real OpenAI model name, e.g. gpt-3.5-turbo or gpt-4o
+      - PROXYLLM_BACKEND=gpt-3.5-turbo
+      - TRACER_TO_OPEN_TELEMETRY=True
+      - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4317
+      - DBGPT_LOG_LEVEL=DEBUG
+    depends_on:
+      - controller
+    restart: unless-stopped
+    networks:
+      - dbgptnet
+    ipc: host
+  embedding-worker:
+    image: eosphorosai/dbgpt-openai:latest
+    command: dbgpt start worker --worker_type text2vec --model_name proxy_http_openapi --model_path proxy_http_openapi --proxy_server_url ${OPENAI_API_BASE}/embeddings --proxy_api_key ${OPENAI_API_KEY} --controller_addr http://controller:8000
+    environment:
+      - proxy_http_openapi_proxy_backend=text-embedding-3-small
+      - TRACER_TO_OPEN_TELEMETRY=True
+      - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4317
+      - DBGPT_LOG_LEVEL=DEBUG
+    depends_on:
+      - controller
+    restart: unless-stopped
+    networks:
+      - dbgptnet
+    ipc: host
+  webserver:
+    image: eosphorosai/dbgpt-openai:latest
+    command: dbgpt start webserver --light --remote_embedding --controller_addr http://controller:8000
+    environment:
+      - LLM_MODEL=chatgpt_proxyllm
+      - EMBEDDING_MODEL=proxy_http_openapi
+      - TRACER_TO_OPEN_TELEMETRY=True
+      - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4317
+    depends_on:
+      - controller
+      - llm-worker
+      - embedding-worker
+    volumes:
+      - dbgpt-data:/app/pilot/data
+      - dbgpt-message:/app/pilot/message
+    ports:
+      - 5670:5670/tcp
+    restart: unless-stopped
+    networks:
+      - dbgptnet
+volumes:
+  dbgpt-data:
+  dbgpt-message:
+  jaeger-badger:
+networks:
+  dbgptnet:
+    driver: bridge
+    name: dbgptnet
+```
+
+You can start the cluster with the following command:
+
+```bash
+OPENAI_API_KEY="{your api key}" OPENAI_API_BASE="https://api.openai.com/v1" docker compose up -d
+```
+
+Please replace `{your api key}` with your real OpenAI API key, and replace
+`https://api.openai.com/v1` with your API base URL if you use a different endpoint.
+The full compose file lives at `docker/compose_examples/observability/docker-compose.yml`
+in the DB-GPT repository.
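+
+As a quick sanity check (a sketch; the service names are the ones defined in the compose
+file above), you can confirm that all five services are running and watch the webserver logs:
+
+```bash
+# jaeger, controller, llm-worker, embedding-worker and webserver should all be "running"
+docker compose ps
+# Follow the webserver logs to confirm it starts up and reaches the controller
+docker compose logs -f webserver
+```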
+
+After the cluster is started, you can access the Jaeger UI at `http://localhost:16686` to view the traces.
+
+**Show RAG Conversation Trace**
+
+![RAG conversation trace across services](/img/application/advanced_tutorial/observability_img5.png)
+
+In the above screenshot, you can see the trace of cross-service communication between the
+DB-GPT controller, LLM worker, and webserver.
diff --git a/docs/sidebars.js b/docs/sidebars.js
index b2057c5e4..2e26dfe5f 100755
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -345,6 +345,10 @@ const sidebars = {
         type: 'doc',
         id: 'application/advanced_tutorial/api',
       },
+      {
+        type: 'doc',
+        id: 'application/advanced_tutorial/observability',
+      },
       {
         type: 'doc',
         id: 'application/advanced_tutorial/debugging',
diff --git a/docs/static/img/application/advanced_tutorial/observability_img1.png b/docs/static/img/application/advanced_tutorial/observability_img1.png
new file mode 100644
index 000000000..1b3db72f9
Binary files /dev/null and b/docs/static/img/application/advanced_tutorial/observability_img1.png differ
diff --git a/docs/static/img/application/advanced_tutorial/observability_img2.png b/docs/static/img/application/advanced_tutorial/observability_img2.png
new file mode 100644
index 000000000..8c18fcb64
Binary files /dev/null and b/docs/static/img/application/advanced_tutorial/observability_img2.png differ
diff --git a/docs/static/img/application/advanced_tutorial/observability_img3.png b/docs/static/img/application/advanced_tutorial/observability_img3.png
new file mode 100644
index 000000000..ad8f7e034
Binary files /dev/null and b/docs/static/img/application/advanced_tutorial/observability_img3.png differ
diff --git a/docs/static/img/application/advanced_tutorial/observability_img4.png b/docs/static/img/application/advanced_tutorial/observability_img4.png
new file mode 100644
index 000000000..40b48a814
Binary files /dev/null and b/docs/static/img/application/advanced_tutorial/observability_img4.png differ
diff --git a/docs/static/img/application/advanced_tutorial/observability_img5.png b/docs/static/img/application/advanced_tutorial/observability_img5.png
new file mode 100644
index 000000000..3ecdc3950
Binary files /dev/null and b/docs/static/img/application/advanced_tutorial/observability_img5.png differ
diff --git a/setup.py b/setup.py
index 21de06ef8..c6f06071f 100644
--- a/setup.py
+++ b/setup.py
@@ -32,6 +32,7 @@
 )
 BUILD_VERSION_OPENAI = os.getenv("BUILD_VERSION_OPENAI")
 INCLUDE_QUANTIZATION = os.getenv("INCLUDE_QUANTIZATION", "true").lower() == "true"
+INCLUDE_OBSERVABILITY = os.getenv("INCLUDE_OBSERVABILITY", "true").lower() == "true"
 
 
 def parse_requirements(file_name: str) -> List[str]:
@@ -629,6 +630,9 @@ def openai_requires():
     else:
         setup_spec.extras["openai"].append("openai")
 
+    if INCLUDE_OBSERVABILITY:
+        setup_spec.extras["openai"] += setup_spec.extras["observability"]
+
     setup_spec.extras["openai"] += setup_spec.extras["framework"]
     setup_spec.extras["openai"] += setup_spec.extras["rag"]
@@ -654,6 +658,19 @@ def cache_requires():
     setup_spec.extras["cache"] = ["rocksdict"]
 
 
+def observability_requires():
+    """
+    pip install "dbgpt[observability]"
+
+    Send DB-GPT traces to OpenTelemetry-compatible backends.
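+
+    Note: these packages are also folded into the "default" and "openai" extras
+    unless INCLUDE_OBSERVABILITY=false is set before building, e.g.
+    `INCLUDE_OBSERVABILITY=false pip install -e ".[default]"` (a sketch; any
+    pip-compatible install command works).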
+ """ + setup_spec.extras["observability"] = [ + "opentelemetry-api", + "opentelemetry-sdk", + "opentelemetry-exporter-otlp", + ] + + def default_requires(): """ pip install "dbgpt[default]" @@ -672,10 +689,12 @@ def default_requires(): setup_spec.extras["default"] += setup_spec.extras["rag"] setup_spec.extras["default"] += setup_spec.extras["datasource"] setup_spec.extras["default"] += setup_spec.extras["torch"] + setup_spec.extras["default"] += setup_spec.extras["cache"] if INCLUDE_QUANTIZATION: # Add quantization extra to default, default is True setup_spec.extras["default"] += setup_spec.extras["quantization"] - setup_spec.extras["default"] += setup_spec.extras["cache"] + if INCLUDE_OBSERVABILITY: + setup_spec.extras["default"] += setup_spec.extras["observability"] def all_requires(): @@ -699,11 +718,12 @@ def init_install_requires(): all_vector_store_requires() all_datasource_requires() knowledge_requires() -openai_requires() gpt4all_requires() vllm_requires() cache_requires() +observability_requires() +openai_requires() # must be last default_requires() all_requires()