Skip to content

Commit

Permalink
Merge branch 'refs/heads/feat/cleaning_ops_trace' into deploy/dev
Browse files Browse the repository at this point in the history
* refs/heads/feat/cleaning_ops_trace:
  fix: type definition error
  • Loading branch information
ZhouhaoJiang committed Jun 24, 2024
2 parents d94380c + 4293361 commit 1a33d88
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 145 deletions.
3 changes: 1 addition & 2 deletions api/core/app/apps/workflow/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,9 @@ def _handle_response(self, application_generate_entity: WorkflowAppGenerateEntit
user=user,
stream=stream
)
app_id = application_generate_entity.app_config.app_id

try:
return generate_task_pipeline.process(app_id, workflow)
return generate_task_pipeline.process()
except ValueError as e:
if e.args[0] == "I/O operation on closed file.": # ignore this error
raise GenerateTaskStoppedException()
Expand Down
6 changes: 1 addition & 5 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,7 @@ def __init__(self, application_generate_entity: WorkflowAppGenerateEntity,
self._stream_generate_nodes = self._get_stream_generate_nodes()
self._iteration_nested_relations = self._get_iteration_nested_relations(self._workflow.graph_dict)

def process(
self,
app_id: Optional[str] = None,
workflow: Optional[Workflow] = None,
) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
"""
Process generate task pipeline.
:return:
Expand Down
83 changes: 4 additions & 79 deletions api/core/ops/base_trace_instance.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from abc import ABC, abstractmethod

from core.ops.entities.trace_entity import BaseTraceInfo


class BaseTraceInstance(ABC):
"""
Expand All @@ -15,86 +17,9 @@ def __init__(self):
...

@abstractmethod
def trace(self, **kwargs):
def trace(self, trace_info: BaseTraceInfo):
"""
Abstract method to trace activities.
Subclasses must implement specific tracing logic for activities.
"""
return kwargs

@abstractmethod
def message_trace(self, **kwargs):
"""
Abstract method to trace messaging activities.
Subclasses must implement specific tracing logic for messages.
"""
return kwargs

@abstractmethod
def moderation_trace(self, **kwargs):
"""
Abstract method to trace moderation activities.
Subclasses must implement specific tracing logic for content moderation.
"""
return kwargs

@abstractmethod
def suggested_question_trace(self, **kwargs):
"""
Abstract method to trace suggested questions in a conversation or system.
Subclasses must implement specific tracing logic for tracking suggested questions.
"""
return kwargs

@abstractmethod
def dataset_retrieval_trace(self, **kwargs):
"""
Abstract method to trace data retrieval activities.
Subclasses must implement specific tracing logic for data retrieval operations.
"""
return kwargs

@abstractmethod
def tool_trace(self, **kwargs):
"""
Abstract method to trace the usage of tools within the system.
Subclasses must implement specific tracing logic for tool interactions.
"""
return kwargs

@abstractmethod
def generate_name_trace(self, **kwargs):
"""
Abstract method to trace the generation of names or identifiers within the system.
Subclasses must implement specific tracing logic for name generation activities.
"""
return kwargs

@abstractmethod
def api_check(self, **kwargs):
"""
Abstract method to trace API check activities.
Subclasses must implement specific tracing logic for API check operations.
"""
return kwargs

@abstractmethod
def obfuscate_config(self, **kwargs):
"""
Obfuscate configuration data.
"""
return kwargs

@abstractmethod
def encrypt_config(self, **kwargs):
"""
Encrypt configuration data.
"""
return kwargs

@abstractmethod
def decryption_config(self, **kwargs):
"""
Decrypt configuration data.
"""
return kwargs
...
77 changes: 25 additions & 52 deletions api/core/ops/entities/trace_entity.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
from datetime import datetime
from typing import Any, Union
from typing import Any, Optional, Union

from pydantic import BaseModel


class WorkflowTraceInfo(BaseModel):
class BaseTraceInfo(BaseModel):
message_id: str
message_data: Any
inputs: Union[str, dict[str, Any], list, None]
outputs: Union[str, dict[str, Any], list, None]
start_time: datetime
end_time: datetime
metadata: dict[str, Any]


class WorkflowTraceInfo(BaseTraceInfo):
workflow_data: Any
conversation_id: Union[str, None]
conversation_id: Optional[str] = None
workflow_id: str
tenant_id: str
workflow_run_id: str
Expand All @@ -15,100 +25,63 @@ class WorkflowTraceInfo(BaseModel):
workflow_run_inputs: dict[str, Any]
workflow_run_outputs: dict[str, Any]
workflow_run_version: str
error: Union[str, None]
error: Optional[str] = None
total_tokens: int
file_list: list[str]
query: str
metadata: dict[str, Any]


class MessageTraceInfo(BaseModel):
message_data: Any
class MessageTraceInfo(BaseTraceInfo):
conversation_model: str
message_tokens: int
answer_tokens: int
total_tokens: int
error: str
inputs: Union[str, dict[str, Any], list, None]
outputs: Union[str, dict[str, Any], list, None]
error: Optional[str] = None
file_list: list[str]
start_at: datetime
end_time: datetime
metadata: dict[str, Any]
message_file_data: Any
conversation_mode: str


class ModerationTraceInfo(BaseModel):
message_id: str
inputs: dict[str, Any]
message_data: Any
class ModerationTraceInfo(BaseTraceInfo):
flagged: bool
action: str
preset_response: str
query: str
start_time: datetime
end_time: datetime
metadata: dict[str, Any]


#
class SuggestedQuestionTraceInfo(BaseModel):
message_id: str
message_data: Any
inputs: Union[str, dict[str, Any], list, None]
outputs: Union[str, dict[str, Any], list, None]
start_time: datetime
end_time: datetime
metadata: dict[str, Any]
class SuggestedQuestionTraceInfo(BaseTraceInfo):
total_tokens: int
status: Union[str, None]
error: Union[str, None]
status: Optional[str] = None
error: Optional[str] = None
from_account_id: str
agent_based: bool
from_source: str
model_provider: str
model_id: str
suggested_question: list[str]
level: str
status_message: Union[str, None]
status_message: Optional[str] = None


class DatasetRetrievalTraceInfo(BaseModel):
message_id: str
inputs: Union[str, dict[str, Any], list, None]
class DatasetRetrievalTraceInfo(BaseTraceInfo):
documents: Any
start_time: datetime
end_time: datetime
metadata: dict[str, Any]
message_data: Any


class ToolTraceInfo(BaseModel):
message_id: str
message_data: Any
class ToolTraceInfo(BaseTraceInfo):
tool_name: str
start_time: datetime
end_time: datetime
tool_inputs: dict[str, Any]
tool_outputs: str
metadata: dict[str, Any]
message_file_data: Any
error: Union[str, None]
inputs: Union[str, dict[str, Any], list, None]
outputs: Union[str, dict[str, Any], list, None]
error: Optional[str] = None
tool_config: dict[str, Any]
time_cost: Union[int, float]
tool_parameters: dict[str, Any]
file_url: Union[str, None, list]


class GenerateNameTraceInfo(BaseModel):
class GenerateNameTraceInfo(BaseTraceInfo):
conversation_id: str
inputs: Union[str, dict[str, Any], list, None]
outputs: Union[str, dict[str, Any], list, None]
start_time: datetime
end_time: datetime
metadata: dict[str, Any]
tenant_id: str

9 changes: 5 additions & 4 deletions api/core/ops/langfuse_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
UnitEnum,
)
from core.ops.entities.trace_entity import (
BaseTraceInfo,
DatasetRetrievalTraceInfo,
GenerateNameTraceInfo,
MessageTraceInfo,
Expand Down Expand Up @@ -49,7 +50,7 @@ def __init__(
)
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")

def trace(self, trace_info, **kwargs):
def trace(self, trace_info: BaseTraceInfo):
if isinstance(trace_info, WorkflowTraceInfo):
self.workflow_trace(trace_info)
if isinstance(trace_info, MessageTraceInfo):
Expand Down Expand Up @@ -293,7 +294,7 @@ def add_trace(self, langfuse_trace_data: Optional[LangfuseTrace] = None):
self.langfuse_client.trace(**format_trace_data)
logger.debug("LangFuse Trace created successfully")
except Exception as e:
raise f"LangFuse Failed to create trace: {str(e)}"
raise ValueError(f"LangFuse Failed to create trace: {str(e)}")

def add_span(self, langfuse_span_data: Optional[LangfuseSpan] = None):
format_span_data = (
Expand All @@ -303,7 +304,7 @@ def add_span(self, langfuse_span_data: Optional[LangfuseSpan] = None):
self.langfuse_client.span(**format_span_data)
logger.debug("LangFuse Span created successfully")
except Exception as e:
raise f"LangFuse Failed to create span: {str(e)}"
raise ValueError(f"LangFuse Failed to create span: {str(e)}")

def update_span(self, span, langfuse_span_data: Optional[LangfuseSpan] = None):
format_span_data = (
Expand All @@ -324,7 +325,7 @@ def add_generation(
self.langfuse_client.generation(**format_generation_data)
logger.debug("LangFuse Generation created successfully")
except Exception as e:
raise f"LangFuse Failed to create generation: {str(e)}"
raise ValueError(f"LangFuse Failed to create generation: {str(e)}")

def update_generation(
self, generation, langfuse_generation_data: Optional[LangfuseGeneration] = None
Expand Down
7 changes: 4 additions & 3 deletions api/core/ops/langsmith_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.langsmith_trace_entity import LangSmithRunModel, LangSmithRunType, LangSmithRunUpdateModel
from core.ops.entities.trace_entity import (
BaseTraceInfo,
DatasetRetrievalTraceInfo,
GenerateNameTraceInfo,
MessageTraceInfo,
Expand Down Expand Up @@ -42,7 +43,7 @@ def __init__(
)
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")

def trace(self, trace_info, **kwargs):
def trace(self, trace_info: BaseTraceInfo):
if isinstance(trace_info, WorkflowTraceInfo):
self.workflow_trace(trace_info)
if isinstance(trace_info, MessageTraceInfo):
Expand Down Expand Up @@ -304,7 +305,7 @@ def add_run(self, run_data: LangSmithRunModel):
self.langsmith_client.create_run(**data)
logger.debug("LangSmith Run created successfully.")
except Exception as e:
raise f"LangSmith Failed to create run: {str(e)}"
raise ValueError(f"LangSmith Failed to create run: {str(e)}")

def update_run(self, update_run_data: LangSmithRunUpdateModel):
data = update_run_data.model_dump()
Expand All @@ -313,7 +314,7 @@ def update_run(self, update_run_data: LangSmithRunUpdateModel):
self.langsmith_client.update_run(**data)
logger.debug("LangSmith Run updated successfully.")
except Exception as e:
raise f"LangSmith Failed to update run: {str(e)}"
raise ValueError(f"LangSmith Failed to update run: {str(e)}")

def api_check(self):
try:
Expand Down

0 comments on commit 1a33d88

Please sign in to comment.