Feat/fix ops trace #5672

Merged · 78 commits · Jun 27, 2024
Commits
cd7c586
feat: add llm ops tracing
ZhouhaoJiang Jun 16, 2024
a40433c
feat: add remove tracing app
ZhouhaoJiang Jun 17, 2024
4eb0c4d
feat: update trace table
ZhouhaoJiang Jun 17, 2024
000e855
feat: change table struct
ZhouhaoJiang Jun 18, 2024
e6def58
feat: change TraceAppConfigApi request type patch
ZhouhaoJiang Jun 18, 2024
6cfeb1a
feat: update default reply when config is none
ZhouhaoJiang Jun 18, 2024
98e1c30
fix: advanced chat trace error
ZhouhaoJiang Jun 19, 2024
1fc0225
feat: add trace ops api check
ZhouhaoJiang Jun 19, 2024
a454813
fix: uuid error
ZhouhaoJiang Jun 19, 2024
07531ab
feat: update workflow trace conversation_id
ZhouhaoJiang Jun 19, 2024
ebea85d
feat: format input and out
ZhouhaoJiang Jun 20, 2024
5eb901a
feat: generate tracing_instance in app_generator
ZhouhaoJiang Jun 21, 2024
5ae066a
feat: update down_revision
ZhouhaoJiang Jun 21, 2024
04af80c
feat: add trace_config_check_error
ZhouhaoJiang Jun 21, 2024
f604b7c
feat: update poetry.lock
ZhouhaoJiang Jun 21, 2024
a0a7c75
feat: format llm workflow trace
ZhouhaoJiang Jun 21, 2024
660d4e5
feat: add ops trace encrypt config decrypt_config obfuscate_config
ZhouhaoJiang Jun 20, 2024
9470169
feat: update the file structure
ZhouhaoJiang Jun 23, 2024
d1c8d69
fix: moderation trace message_id error
ZhouhaoJiang Jun 23, 2024
8d2f08d
feat: adding comments for BaseTraceInstance
ZhouhaoJiang Jun 23, 2024
0d798ac
fix: tracing_provider null error
ZhouhaoJiang Jun 24, 2024
41e2347
fix: completion-messages trace instance error
ZhouhaoJiang Jun 24, 2024
8cb809c
fix: add trace entity
ZhouhaoJiang Jun 24, 2024
41e936d
feat: add langfuse and langsmith trace class
ZhouhaoJiang Jun 24, 2024
c1bc774
fix: message trace start time error
ZhouhaoJiang Jun 24, 2024
7ee9616
feat: update base_trace_instance.py
ZhouhaoJiang Jun 24, 2024
54fc284
fix: workflow_trace llm error
ZhouhaoJiang Jun 24, 2024
0c10f77
fix: api_check_trace error
ZhouhaoJiang Jun 24, 2024
4293361
fix: type definition error
ZhouhaoJiang Jun 24, 2024
c77a738
fix: remove chinese
ZhouhaoJiang Jun 24, 2024
1d652e6
feat: remove trace folder
ZhouhaoJiang Jun 24, 2024
ad7fbc7
fix: the field is indeed wrong
ZhouhaoJiang Jun 24, 2024
d2ffc48
feat: trace manager generated in app generator
ZhouhaoJiang Jun 24, 2024
f815e7e
feat: add workflow_node_executions created time precision
ZhouhaoJiang Jun 24, 2024
ad45808
fix: update trace config error
ZhouhaoJiang Jun 24, 2024
c787fac
fix: remove superfluous tracing instance
ZhouhaoJiang Jun 24, 2024
53c033e
feat: through trace queue manager generate trace_instance
ZhouhaoJiang Jun 24, 2024
afbd778
chore: get rid of useless stuff
ZhouhaoJiang Jun 24, 2024
e6294b1
chore: change ops_trace_service location
ZhouhaoJiang Jun 24, 2024
c885fe0
Merge branch 'refs/heads/main' into feat/cleaning_ops_trace
ZhouhaoJiang Jun 24, 2024
7701aaf
update WorkflowNodeExecution init created_at
takatost Jun 24, 2024
16511dc
Merge remote-tracking branch 'origin/feat/cleaning_ops_trace' into fe…
ZhouhaoJiang Jun 24, 2024
bb2ad52
feat: update poetry.lock
ZhouhaoJiang Jun 24, 2024
3d1b27a
feat: add provider_config_map
ZhouhaoJiang Jun 24, 2024
7bf8faa
fix: trace queue manager error
ZhouhaoJiang Jun 25, 2024
084112e
fix: workflow on_tool_end trace_manager error
ZhouhaoJiang Jun 25, 2024
202872e
chore: change the trace structure
ZhouhaoJiang Jun 25, 2024
27c2446
fix: pydantic model_ warning
ZhouhaoJiang Jun 25, 2024
0971768
fix: conversation none error
ZhouhaoJiang Jun 25, 2024
13a0104
fix: from_account_id type error
ZhouhaoJiang Jun 25, 2024
b5df587
chore: update pydantic warning and tracing_provider_map
ZhouhaoJiang Jun 25, 2024
a760d0d
fix: tracing_provider error
ZhouhaoJiang Jun 25, 2024
0117310
fix: invalid tracing provider None
ZhouhaoJiang Jun 25, 2024
d5b9fbd
fix: trace_manager none error
ZhouhaoJiang Jun 25, 2024
fca9121
fix: conversation generate error
ZhouhaoJiang Jun 25, 2024
9590da7
fix generate conversation name
takatost Jun 25, 2024
f88d915
feat: add BaseTraceInfo field validator
ZhouhaoJiang Jun 25, 2024
1e6cc9d
feat: update MessageTraceInfo
ZhouhaoJiang Jun 25, 2024
a21da4c
feat: update suggested question trace manager
ZhouhaoJiang Jun 25, 2024
3f9de5c
optimize error msg
takatost Jun 25, 2024
81a6f80
fix _invoke_error_mapping of tongyi tts
takatost Jun 25, 2024
568b4d4
fix
takatost Jun 25, 2024
a071529
feat: optimize file_list
ZhouhaoJiang Jun 25, 2024
f426e55
Merge remote-tracking branch 'origin/feat/cleaning_ops_trace' into fe…
ZhouhaoJiang Jun 25, 2024
10cd08e
optimize
takatost Jun 25, 2024
8bf2c38
feat: update workflow trace
ZhouhaoJiang Jun 25, 2024
da93485
feat: update langfuse trace structure
ZhouhaoJiang Jun 26, 2024
91c177b
Resolve merge conflicts
ZhouhaoJiang Jun 26, 2024
890585c
fix: generate name trace info error
ZhouhaoJiang Jun 26, 2024
fad2053
feat: update migrate file
ZhouhaoJiang Jun 26, 2024
969ac35
feat: add ops_trace celery
ZhouhaoJiang Jun 27, 2024
9e200e7
feat: update trace queue manager timer
ZhouhaoJiang Jun 27, 2024
ebc91a5
Resolve merge conflicts and fix linter issues
ZhouhaoJiang Jun 27, 2024
3585bfc
feat: update langfuse tool_trace error
ZhouhaoJiang Jun 27, 2024
27419fa
fix: tool_trace level error
ZhouhaoJiang Jun 27, 2024
b107090
fix: remove logging error and info
ZhouhaoJiang Jun 27, 2024
28f19b2
fix: remove trace manager run start timer
ZhouhaoJiang Jun 27, 2024
0db04aa
feat: add try except in trace manager run
ZhouhaoJiang Jun 27, 2024
2 changes: 1 addition & 1 deletion .devcontainer/post_create_command.sh
@@ -3,7 +3,7 @@
cd web && npm install

echo 'alias start-api="cd /workspaces/dify/api && flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc
-echo 'alias start-worker="cd /workspaces/dify/api && celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail"' >> ~/.bashrc
+echo 'alias start-worker="cd /workspaces/dify/api && celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace"' >> ~/.bashrc
echo 'alias start-web="cd /workspaces/dify/web && npm run dev"' >> ~/.bashrc
echo 'alias start-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify up -d"' >> ~/.bashrc

14 changes: 13 additions & 1 deletion .vscode/launch.json
@@ -37,7 +37,19 @@
"FLASK_DEBUG": "1",
"GEVENT_SUPPORT": "True"
},
-      "args": ["-A", "app.celery", "worker", "-P", "gevent", "-c", "1", "--loglevel", "info", "-Q", "dataset,generation,mail"],
+      "args": [
+        "-A",
+        "app.celery",
+        "worker",
+        "-P",
+        "gevent",
+        "-c",
+        "1",
+        "--loglevel",
+        "info",
+        "-Q",
+        "dataset,generation,mail,ops_trace"
+      ]
},
]
}
2 changes: 1 addition & 1 deletion api/README.md
@@ -66,7 +66,7 @@
10. If you need to debug local async processing, please start the worker service.

```bash
-poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail
+poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace
```

The started celery app handles the async tasks, e.g. dataset importing and documents indexing.
2 changes: 0 additions & 2 deletions api/app.py
@@ -26,7 +26,6 @@
from commands import register_commands

# DO NOT REMOVE BELOW
from events import event_handlers
from extensions import (
ext_celery,
ext_code_based_extension,
@@ -43,7 +42,6 @@
from extensions.ext_database import db
from extensions.ext_login import login_manager
from libs.passport import PassportService
from models import account, dataset, model, source, task, tool, tools, web
from services.account_service import AccountService

# DO NOT REMOVE ABOVE
2 changes: 1 addition & 1 deletion api/core/moderation/input_moderation.py
@@ -57,7 +57,7 @@ def check(
timer=timer
)
)

if not moderation_result.flagged:
return False, inputs, query

12 changes: 11 additions & 1 deletion api/core/ops/entities/trace_entity.py
@@ -94,5 +94,15 @@ class ToolTraceInfo(BaseTraceInfo):


class GenerateNameTraceInfo(BaseTraceInfo):
-    conversation_id: str
+    conversation_id: Optional[str] = None
tenant_id: str

trace_info_info_map = {
'WorkflowTraceInfo': WorkflowTraceInfo,
'MessageTraceInfo': MessageTraceInfo,
'ModerationTraceInfo': ModerationTraceInfo,
'SuggestedQuestionTraceInfo': SuggestedQuestionTraceInfo,
'DatasetRetrievalTraceInfo': DatasetRetrievalTraceInfo,
'ToolTraceInfo': ToolTraceInfo,
'GenerateNameTraceInfo': GenerateNameTraceInfo,
}
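The map above lets downstream consumers (for instance, the celery task receiving a serialized trace payload) look up the concrete trace-info class by its name. A minimal sketch of that rehydration pattern, using hypothetical dataclasses in place of the real Pydantic entities:

```python
from dataclasses import dataclass

# Hypothetical stand-ins for the real trace-info entities.
@dataclass
class WorkflowTraceInfo:
    workflow_id: str

@dataclass
class MessageTraceInfo:
    message_id: str

# Name -> class registry, mirroring trace_info_info_map above.
REGISTRY = {cls.__name__: cls for cls in (WorkflowTraceInfo, MessageTraceInfo)}

def rehydrate(trace_info_type: str, payload: dict):
    """Rebuild a typed trace-info object from a serialized task payload."""
    cls = REGISTRY[trace_info_type]
    return cls(**payload)
```

Keyed on `type(obj).__name__`, the registry round-trips cleanly with a payload that records the class name alongside the dumped fields.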
31 changes: 29 additions & 2 deletions api/core/ops/langfuse_trace/langfuse_trace.py
@@ -147,6 +147,7 @@ def workflow_trace(self, trace_info: WorkflowTraceInfo):
# add span
if trace_info.message_id:
span_data = LangfuseSpan(
id=node_execution_id,
name=f"{node_name}_{node_execution_id}",
input=inputs,
output=outputs,
@@ -160,6 +161,7 @@ def workflow_trace(self, trace_info: WorkflowTraceInfo)
)
else:
span_data = LangfuseSpan(
id=node_execution_id,
name=f"{node_name}_{node_execution_id}",
input=inputs,
output=outputs,
@@ -173,6 +175,30 @@

self.add_span(langfuse_span_data=span_data)

process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
if process_data and process_data.get("model_mode") == "chat":
total_token = metadata.get("total_tokens", 0)
# add generation
generation_usage = GenerationUsage(
totalTokens=total_token,
)

node_generation_data = LangfuseGeneration(
name=f"generation_{node_execution_id}",
trace_id=trace_id,
parent_observation_id=node_execution_id,
start_time=created_at,
end_time=finished_at,
input=inputs,
output=outputs,
metadata=metadata,
level=LevelEnum.DEFAULT if status == 'succeeded' else LevelEnum.ERROR,
status_message=trace_info.error if trace_info.error else "",
usage=generation_usage,
)

self.add_generation(langfuse_generation_data=node_generation_data)

def message_trace(
self, trace_info: MessageTraceInfo, **kwargs
):
@@ -186,7 +212,7 @@ def message_trace(
if message_data.from_end_user_id:
end_user_data: EndUser = db.session.query(EndUser).filter(
EndUser.id == message_data.from_end_user_id
-        ).first().session_id
+        ).first()
+        user_id = end_user_data.session_id

trace_data = LangfuseTrace(
@@ -220,6 +246,7 @@
output=trace_info.answer_tokens,
total=trace_info.total_tokens,
unit=UnitEnum.TOKENS,
totalCost=message_data.total_price,
)

langfuse_generation_data = LangfuseGeneration(
@@ -303,7 +330,7 @@ def tool_trace(self, trace_info: ToolTraceInfo):
start_time=trace_info.start_time,
end_time=trace_info.end_time,
metadata=trace_info.metadata,
-            level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR,
+            level=LevelEnum.DEFAULT if trace_info.error == "" or trace_info.error is None else LevelEnum.ERROR,
status_message=trace_info.error,
)

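The widened check on `trace_info.error` guards against both the empty string and `None`. Since both are falsy in Python, an equivalent (hypothetical) helper can collapse the two cases into one truthiness test:

```python
from typing import Optional

def span_level(error: Optional[str]) -> str:
    # "" and None are both falsy, so `if error` covers the two
    # "no error" cases the diff above checks explicitly.
    return "ERROR" if error else "DEFAULT"
```

`span_level` is an illustrative simplification, not part of the Langfuse SDK; the explicit `== "" or ... is None` form in the diff keeps the intent visible at the call site.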
115 changes: 78 additions & 37 deletions api/core/ops/ops_trace_manager.py
@@ -1,16 +1,17 @@
import json
import logging
import os
import queue
import threading
import time
from datetime import timedelta
from enum import Enum
from typing import Any, Optional, Union
from uuid import UUID

-from flask import Flask, current_app
+from flask import current_app

from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import (
LangfuseConfig,
LangSmithConfig,
@@ -31,6 +32,7 @@
from extensions.ext_database import db
from models.model import App, AppModelConfig, Conversation, Message, MessageAgentThought, MessageFile, TraceAppConfig
from models.workflow import WorkflowAppLog, WorkflowRun
from tasks.ops_trace_task import process_trace_tasks

provider_config_map = {
TracingProviderEnum.LANGFUSE.value: {
@@ -105,7 +107,7 @@ def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_c
return config_class(**new_config).model_dump()

@classmethod
-    def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config:dict):
+    def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
"""
Decrypt tracing config
:param tracing_provider: tracing provider
@@ -295,11 +297,9 @@ def __init__(
self.kwargs = kwargs
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")

-    def execute(self, trace_instance: BaseTraceInstance):
+    def execute(self):
         method_name, trace_info = self.preprocess()
-        if trace_instance:
-            method = trace_instance.trace
-            method(trace_info)
+        return trace_info

def preprocess(self):
if self.trace_type == TraceTaskName.CONVERSATION_TRACE:
@@ -372,7 +372,7 @@ def workflow_trace(self, workflow_run: WorkflowRun, conversation_id):
}

workflow_trace_info = WorkflowTraceInfo(
-            workflow_data=workflow_run,
+            workflow_data=workflow_run.to_dict(),
conversation_id=conversation_id,
workflow_id=workflow_id,
tenant_id=tenant_id,
@@ -427,7 +427,8 @@ def message_trace(self, message_id):
message_tokens = message_data.message_tokens

message_trace_info = MessageTraceInfo(
-            message_data=message_data,
+            message_id=message_id,
+            message_data=message_data.to_dict(),
conversation_model=conversation_mode,
message_tokens=message_tokens,
answer_tokens=message_data.answer_tokens,
@@ -469,7 +470,7 @@ def moderation_trace(self, message_id, timer, **kwargs):
moderation_trace_info = ModerationTraceInfo(
message_id=workflow_app_log_id if workflow_app_log_id else message_id,
inputs=inputs,
-            message_data=message_data,
+            message_data=message_data.to_dict(),
flagged=moderation_result.flagged,
action=moderation_result.action,
preset_response=moderation_result.preset_response,
@@ -508,7 +509,7 @@ def suggested_question_trace(self, message_id, timer, **kwargs):

suggested_question_trace_info = SuggestedQuestionTraceInfo(
message_id=workflow_app_log_id if workflow_app_log_id else message_id,
-            message_data=message_data,
+            message_data=message_data.to_dict(),
inputs=message_data.message,
outputs=message_data.answer,
start_time=timer.get("start"),
@@ -550,11 +551,11 @@ def dataset_retrieval_trace(self, message_id, timer, **kwargs):
dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
message_id=message_id,
inputs=message_data.query if message_data.query else message_data.inputs,
-            documents=documents,
+            documents=[doc.model_dump() for doc in documents],
start_time=timer.get("start"),
end_time=timer.get("end"),
metadata=metadata,
-            message_data=message_data,
+            message_data=message_data.to_dict(),
)

return dataset_retrieval_trace_info
@@ -613,7 +614,7 @@ def tool_trace(self, message_id, timer, **kwargs):

tool_trace_info = ToolTraceInfo(
message_id=message_id,
-            message_data=message_data,
+            message_data=message_data.to_dict(),
tool_name=tool_name,
start_time=timer.get("start") if timer else created_time,
end_time=timer.get("end") if timer else end_time,
@@ -657,31 +658,71 @@ def generate_name_trace(self, conversation_id, timer, **kwargs):
return generate_name_trace_info


trace_manager_timer = None
trace_manager_queue = queue.Queue()
trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 1))
trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))


class TraceQueueManager:
     def __init__(self, app_id=None, conversation_id=None, message_id=None):
-        tracing_instance = OpsTraceManager.get_ops_trace_instance(app_id, conversation_id, message_id)
-        self.queue = queue.Queue()
-        self.is_running = True
-        self.thread = threading.Thread(
-            target=self.process_queue, kwargs={
-                'flask_app': current_app._get_current_object(),
-                'trace_instance': tracing_instance
-            }
-        )
-        self.thread.start()
-
-    def stop(self):
-        self.is_running = False
-
-    def process_queue(self, flask_app: Flask, trace_instance: BaseTraceInstance):
-        with flask_app.app_context():
-            while self.is_running:
-                try:
-                    task = self.queue.get(timeout=60)
-                    task.execute(trace_instance)
-                    self.queue.task_done()
-                except queue.Empty:
-                    self.stop()
+        global trace_manager_timer
+        self.app_id = app_id
+        self.conversation_id = conversation_id
+        self.message_id = message_id
+        self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id, conversation_id, message_id)
+        self.flask_app = current_app._get_current_object()
+        if trace_manager_timer is None:
+            self.start_timer()

     def add_trace_task(self, trace_task: TraceTask):
-        self.queue.put(trace_task)
+        global trace_manager_timer
+        global trace_manager_queue
+        try:
+            if self.trace_instance:
+                trace_manager_queue.put(trace_task)
+        except Exception as e:
+            logging.debug(f"Error adding trace task: {e}")
+        finally:
+            self.start_timer()

def collect_tasks(self):
global trace_manager_queue
tasks = []
while len(tasks) < trace_manager_batch_size and not trace_manager_queue.empty():
task = trace_manager_queue.get_nowait()
tasks.append(task)
trace_manager_queue.task_done()
return tasks

def run(self):
try:
tasks = self.collect_tasks()
if tasks:
self.send_to_celery(tasks)
except Exception as e:
logging.debug(f"Error processing trace tasks: {e}")

def start_timer(self):
global trace_manager_timer
if trace_manager_timer is None or not trace_manager_timer.is_alive():
trace_manager_timer = threading.Timer(
trace_manager_interval, self.run
)
trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
trace_manager_timer.daemon = False
trace_manager_timer.start()

def send_to_celery(self, tasks: list[TraceTask]):
with self.flask_app.app_context():
for task in tasks:
trace_info = task.execute()
task_data = {
"app_id": self.app_id,
"conversation_id": self.conversation_id,
"message_id": self.message_id,
"trace_info_type": type(trace_info).__name__,
"trace_info": trace_info.model_dump() if trace_info else {},
}
process_trace_tasks.delay(task_data)
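The new manager replaces the per-instance worker thread with a module-level queue that a re-arming `threading.Timer` drains in batches. A self-contained sketch of that collect-and-flush pattern (names and the interval are illustrative, not the Dify API):

```python
import queue
import threading

task_queue: queue.Queue = queue.Queue()
BATCH_SIZE = 100
INTERVAL_SECONDS = 1.0

def collect_tasks(batch_size: int = BATCH_SIZE) -> list:
    # Drain up to batch_size items without blocking, like
    # TraceQueueManager.collect_tasks in the diff above.
    tasks = []
    while len(tasks) < batch_size and not task_queue.empty():
        tasks.append(task_queue.get_nowait())
        task_queue.task_done()
    return tasks

def run(flush) -> threading.Timer:
    # Flush whatever has accumulated, then re-arm: threading.Timer
    # fires only once, so each run must schedule the next one itself.
    tasks = collect_tasks()
    if tasks:
        flush(tasks)
    timer = threading.Timer(INTERVAL_SECONDS, run, args=(flush,))
    timer.daemon = True
    timer.start()
    return timer
```

In the PR, the flush step hands each batch to a celery task (`process_trace_tasks.delay`), which keeps slow exporter calls off the request path.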
4 changes: 2 additions & 2 deletions api/core/rag/retrieval/dataset_retrieval.py
@@ -12,7 +12,7 @@
from core.model_runtime.entities.message_entities import PromptMessageTool
from core.model_runtime.entities.model_entities import ModelFeature, ModelType
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
-from core.ops.ops_trace_manager import TraceTask, TraceTaskName
+from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName
from core.ops.utils import measure_time
from core.rag.datasource.retrieval_service import RetrievalService
from core.rag.models.document import Document
@@ -357,7 +357,7 @@ def _on_retrival_end(
db.session.commit()

# get tracing instance
-        trace_manager = self.application_generate_entity.trace_manager if self.application_generate_entity else None
+        trace_manager: TraceQueueManager = self.application_generate_entity.trace_manager if self.application_generate_entity else None
if trace_manager:
trace_manager.add_trace_task(
TraceTask(
@@ -94,7 +94,7 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:
memory = self._fetch_memory(node_data.memory, variable_pool, model_instance)

if set(model_schema.features or []) & {ModelFeature.TOOL_CALL, ModelFeature.MULTI_TOOL_CALL} \
and node_data.reasoning_mode == 'function_call':
and node_data.reasoning_mode == 'function_call':
# use function call
prompt_messages, prompt_message_tools = self._generate_function_call_prompt(
node_data, query, variable_pool, model_config, memory
2 changes: 1 addition & 1 deletion api/docker/entrypoint.sh
@@ -9,7 +9,7 @@ fi

if [[ "${MODE}" == "worker" ]]; then
celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} -c ${CELERY_WORKER_AMOUNT:-1} --loglevel INFO \
-        -Q ${CELERY_QUEUES:-dataset,generation,mail}
+        -Q ${CELERY_QUEUES:-dataset,generation,mail,ops_trace}
elif [[ "${MODE}" == "beat" ]]; then
celery -A app.celery beat --loglevel INFO
else