From 045644dd9504ccb8c6b859a9bbce841d918c0adb Mon Sep 17 00:00:00 2001 From: Ciprian Jichici Date: Tue, 24 Dec 2024 15:56:36 +0200 Subject: [PATCH] Improve the use of OpenTelemetry spans --- .../CoreAPI/Controllers/CompletionsController.cs | 4 ++-- src/python/AgentHubAPI/app/routers/manage.py | 2 +- src/python/DataSourceHubAPI/app/routers/manage.py | 2 +- .../GatekeeperIntegrationAPI/app/routers/analyze.py | 2 +- .../GatekeeperIntegrationAPI/app/routers/manage.py | 2 +- src/python/LangChainAPI/app/routers/completions.py | 10 +++++----- src/python/LangChainAPI/app/routers/manage.py | 4 +++- src/python/PromptHubAPI/app/routers/manage.py | 2 +- .../agents/langchain_knowledge_management_agent.py | 2 +- 9 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/dotnet/CoreAPI/Controllers/CompletionsController.cs b/src/dotnet/CoreAPI/Controllers/CompletionsController.cs index 9f7df3f926..1b4caade8f 100644 --- a/src/dotnet/CoreAPI/Controllers/CompletionsController.cs +++ b/src/dotnet/CoreAPI/Controllers/CompletionsController.cs @@ -75,7 +75,7 @@ public async Task GetCompletion(string instanceId, [FromBody] Com { using var telemetryActivity = TelemetryActivitySources.CoreAPIActivitySource.StartActivity( TelemetryActivityNames.CoreAPI_Completions_GetCompletion, - ActivityKind.Consumer, + ActivityKind.Server, parentContext: default, tags: new Dictionary { @@ -102,7 +102,7 @@ public async Task> StartCompletionOperation(s { using var telemetryActivity = TelemetryActivitySources.CoreAPIActivitySource.StartActivity( TelemetryActivityNames.CoreAPI_AsyncCompletions_StartCompletionOperation, - ActivityKind.Consumer, + ActivityKind.Server, parentContext: default, tags: new Dictionary { diff --git a/src/python/AgentHubAPI/app/routers/manage.py b/src/python/AgentHubAPI/app/routers/manage.py index b08b609e6c..5556b3736c 100644 --- a/src/python/AgentHubAPI/app/routers/manage.py +++ b/src/python/AgentHubAPI/app/routers/manage.py @@ -33,7 +33,7 @@ async def refresh_cache(name: str): The name of the cache object to refresh. "config", for example. """ - with tracer.start_span('refresh_cache') as span: + with tracer.start_as_current_span('refresh_cache') as span: span.set_attribute('cache_name', name) span.add_event(f'{API_NAME} {name} cache refresh requested.') diff --git a/src/python/DataSourceHubAPI/app/routers/manage.py b/src/python/DataSourceHubAPI/app/routers/manage.py index b08b609e6c..5556b3736c 100644 --- a/src/python/DataSourceHubAPI/app/routers/manage.py +++ b/src/python/DataSourceHubAPI/app/routers/manage.py @@ -33,7 +33,7 @@ async def refresh_cache(name: str): The name of the cache object to refresh. "config", for example. """ - with tracer.start_span('refresh_cache') as span: + with tracer.start_as_current_span('refresh_cache') as span: span.set_attribute('cache_name', name) span.add_event(f'{API_NAME} {name} cache refresh requested.') diff --git a/src/python/GatekeeperIntegrationAPI/app/routers/analyze.py b/src/python/GatekeeperIntegrationAPI/app/routers/analyze.py index ba49413629..8bdc717426 100644 --- a/src/python/GatekeeperIntegrationAPI/app/routers/analyze.py +++ b/src/python/GatekeeperIntegrationAPI/app/routers/analyze.py @@ -34,7 +34,7 @@ async def analyze(request: AnalyzeRequest) -> AnalyzeResponse: If the request includes anonymize=True, the original content will be returned anonymized. """ - #with tracer.start_span('analyze') as span: + #with tracer.start_as_current_span('analyze') as span: try: analyzer = Analyzer(request) return analyzer.analyze() diff --git a/src/python/GatekeeperIntegrationAPI/app/routers/manage.py b/src/python/GatekeeperIntegrationAPI/app/routers/manage.py index aacf2b24be..221a9b497d 100644 --- a/src/python/GatekeeperIntegrationAPI/app/routers/manage.py +++ b/src/python/GatekeeperIntegrationAPI/app/routers/manage.py @@ -33,7 +33,7 @@ async def refresh_cache(name: str): The name of the cache object to refresh. "config", for example. """ - #with tracer.start_span('refresh_cache') as span: + #with tracer.start_as_current_span('refresh_cache') as span: # span.set_attribute('cache_name', name) # span.add_event(f'{API_NAME} {name} cache refresh requested.') diff --git a/src/python/LangChainAPI/app/routers/completions.py b/src/python/LangChainAPI/app/routers/completions.py index ddcda0c5bf..1cbc29dfeb 100644 --- a/src/python/LangChainAPI/app/routers/completions.py +++ b/src/python/LangChainAPI/app/routers/completions.py @@ -82,7 +82,7 @@ async def submit_completion_request( CompletionOperation Object containing the operation ID and status. """ - with tracer.start_span('langchainapi_submit_completion_request', kind=SpanKind.CONSUMER) as span: + with tracer.start_as_current_span('langchainapi_submit_completion_request', kind=SpanKind.SERVER) as span: try: # Get the operation_id from the completion request. operation_id = completion_request.operation_id @@ -129,7 +129,7 @@ async def create_completion_response( """ Generates the completion response for the specified completion request. """ - with tracer.start_span(f'langchainapi_create_completion_response', kind=SpanKind.CONSUMER) as span: + with tracer.start_as_current_span('langchainapi_create_completion_response', kind=SpanKind.SERVER) as span: try: span.set_attribute('operation_id', operation_id) span.set_attribute('instance_id', instance_id) @@ -212,7 +212,7 @@ async def get_operation_status( instance_id: str, operation_id: str ) -> LongRunningOperation: - with tracer.start_span(f'langchainapi_get_operation_status', kind=SpanKind.CONSUMER) as span: + with tracer.start_as_current_span('langchainapi_get_operation_status', kind=SpanKind.SERVER) as span: # Create an operations manager to get the operation status. operations_manager = OperationsManager(raw_request.app.extra['config']) @@ -247,7 +247,7 @@ async def get_operation_result( instance_id: str, operation_id: str ) -> CompletionResponse: - with tracer.start_span(f'langchainapi_get_operation_result', kind=SpanKind.CONSUMER) as span: + with tracer.start_as_current_span('langchainapi_get_operation_result', kind=SpanKind.SERVER) as span: # Create an operations manager to get the operation result. operations_manager = OperationsManager(raw_request.app.extra['config']) @@ -280,7 +280,7 @@ async def get_operation_logs( instance_id: str, operation_id: str ) -> List[LongRunningOperationLogEntry]: - with tracer.start_span(f'langchainapi_get_operation_log', kind=SpanKind.CONSUMER) as span: + with tracer.start_as_current_span('langchainapi_get_operation_log', kind=SpanKind.SERVER) as span: # Create an operations manager to get the operation log. operations_manager = OperationsManager(raw_request.app.extra['config']) diff --git a/src/python/LangChainAPI/app/routers/manage.py b/src/python/LangChainAPI/app/routers/manage.py index 93de03b7a4..719a211a3d 100644 --- a/src/python/LangChainAPI/app/routers/manage.py +++ b/src/python/LangChainAPI/app/routers/manage.py @@ -3,6 +3,8 @@ """ import time from fastapi import APIRouter, Depends, HTTPException +from opentelemetry.trace import SpanKind + from foundationallm.telemetry import Telemetry from app.dependencies import ( API_NAME, @@ -33,7 +35,7 @@ async def refresh_cache(instance_id: str, name: str): The name of the cache object to refresh. "config", for example. """ - with tracer.start_span('refresh_cache') as span: + with tracer.start_as_current_span('refresh_cache', kind=SpanKind.SERVER) as span: span.set_attribute('instance_id', instance_id) span.set_attribute('cache_name', name) span.add_event(f'{API_NAME} {name} cache refresh requested.') diff --git a/src/python/PromptHubAPI/app/routers/manage.py b/src/python/PromptHubAPI/app/routers/manage.py index b08b609e6c..5556b3736c 100644 --- a/src/python/PromptHubAPI/app/routers/manage.py +++ b/src/python/PromptHubAPI/app/routers/manage.py @@ -33,7 +33,7 @@ async def refresh_cache(name: str): The name of the cache object to refresh. "config", for example. """ - with tracer.start_span('refresh_cache') as span: + with tracer.start_as_current_span('refresh_cache') as span: span.set_attribute('cache_name', name) span.add_event(f'{API_NAME} {name} cache refresh requested.') diff --git a/src/python/PythonSDK/foundationallm/langchain/agents/langchain_knowledge_management_agent.py b/src/python/PythonSDK/foundationallm/langchain/agents/langchain_knowledge_management_agent.py index a869608dd9..e71b912303 100644 --- a/src/python/PythonSDK/foundationallm/langchain/agents/langchain_knowledge_management_agent.py +++ b/src/python/PythonSDK/foundationallm/langchain/agents/langchain_knowledge_management_agent.py @@ -539,7 +539,7 @@ async def invoke_async(self, request: KnowledgeManagementCompletionRequest) -> C else: messages = [] - with self.tracer.start_span(f'langchain_invoke_external_workflow', kind=SpanKind.CONSUMER) as span: + with self.tracer.start_as_current_span('langchain_invoke_external_workflow', kind=SpanKind.SERVER) as span: response = await workflow.invoke_async( operation_id=request.operation_id, user_prompt=parsed_user_prompt,