feat: add prometheus metrics #11612

Open · wants to merge 1 commit into base: main
4 changes: 3 additions & 1 deletion api/.env.example
@@ -427,4 +427,6 @@ CREATE_TIDB_SERVICE_JOB_ENABLED=false
# Maximum number of submitted thread count in a ThreadPool for parallel node execution
MAX_SUBMIT_COUNT=100
# Lockout duration in seconds
LOGIN_LOCKOUT_DURATION=86400
LOGIN_LOCKOUT_DURATION=86400

PROMETHEUS_MULTIPROC_DIR=/tmp/prometheus_multiproc_dir
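
> Note: `PROMETHEUS_MULTIPROC_DIR` is the environment variable prometheus_client reads when running in multiprocess mode (needed when the API runs under a pre-fork server such as Gunicorn). The hunk above only adds the variable; the sketch below is an illustrative example of how a scrape endpoint typically consumes that directory — the route name and Flask wiring are assumptions, not part of this PR.

```python
# Illustrative sketch only -- not part of this diff. Shows how a /metrics
# endpoint usually aggregates per-process metric files when
# PROMETHEUS_MULTIPROC_DIR is set (prometheus_client multiprocess mode).
from flask import Flask
from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, generate_latest, multiprocess

app = Flask(__name__)


@app.route("/metrics")
def metrics():
    # Merge the per-process metric files written into PROMETHEUS_MULTIPROC_DIR
    # into a single scrape response.
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    return generate_latest(registry), 200, {"Content-Type": CONTENT_TYPE_LATEST}
```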
16 changes: 15 additions & 1 deletion api/configs/feature/__init__.py
@@ -23,7 +23,8 @@ class SecurityConfig(BaseSettings):
SECRET_KEY: str = Field(
description="Secret key for secure session cookie signing."
"Make sure you are changing this key for your deployment with a strong key."
"Generate a strong key using `openssl rand -base64 42` or set via the `SECRET_KEY` environment variable.",
"Generate a strong key using `openssl rand -base64 42` "
"or set via the `SECRET_KEY` environment variable.",
default="",
)

@@ -767,6 +768,18 @@ class LoginConfig(BaseSettings):
)


class PrometheusConfig(BaseSettings):
HISTOGRAM_BUCKETS_1MIN: list[float] = Field(
description="The buckets of Prometheus histogram under 1 minute",
default=[0.1, 0.2, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 40, 50, 60],
)

HISTOGRAM_BUCKETS_5MIN: list[float] = Field(
description="The buckets of Prometheus histogram under 5 minute",
default=[0.1, 0.2, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 40, 50, 60, 120, 180, 300],
)


class FeatureConfig(
# place the configs in alphabet order
AppExecutionConfig,
@@ -794,6 +807,7 @@ class FeatureConfig(
WorkflowNodeExecutionConfig,
WorkspaceConfig,
LoginConfig,
PrometheusConfig,
# hosted services config
HostedServiceConfig,
CeleryBeatConfig,
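
> Since `PrometheusConfig` is a pydantic-settings `BaseSettings` class like the rest of `dify_config`, the bucket lists should be overridable from the environment as JSON arrays without code changes. A hedged sketch follows; the exact environment variable names assume the default field-name mapping.

```python
# Illustrative only: overriding the histogram buckets via the environment,
# assuming pydantic-settings' default field-name mapping and JSON list parsing:
#
#   export HISTOGRAM_BUCKETS_1MIN='[0.05, 0.1, 0.5, 1, 5, 10, 30, 60]'

from configs import dify_config  # same import the PR uses elsewhere

print(dify_config.HISTOGRAM_BUCKETS_1MIN)  # falls back to the defaults above if unset
```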
11 changes: 9 additions & 2 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -398,7 +398,9 @@ def _process_stream_response(
conversation_id=self._conversation.id,
trace_manager=trace_manager,
)

self._workflow_time_it(
is_success=True, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)
yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
@@ -421,6 +423,9 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)
self._workflow_time_it(
is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)

yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
@@ -445,7 +450,9 @@ def _process_stream_response(
trace_manager=trace_manager,
exceptions_count=event.exceptions_count,
)

self._workflow_time_it(
is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)
yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
6 changes: 6 additions & 0 deletions api/core/app/apps/workflow/generate_task_pipeline.py
@@ -357,6 +357,9 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)
self._workflow_time_it(
is_success=True, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)

# save workflow app log
self._save_workflow_app_log(workflow_run)
@@ -381,6 +384,9 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)
self._workflow_time_it(
is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)

# save workflow app log
self._save_workflow_app_log(workflow_run)
36 changes: 36 additions & 0 deletions api/core/app/task_pipeline/__init__.py
@@ -0,0 +1,36 @@
from prometheus_client import Counter, Histogram

from configs import dify_config

app_request = Counter(
name="app_request",
documentation="The total count of APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_request_failed = Counter(
name="app_request_failed",
documentation="The failed count of APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_request_latency = Histogram(
name="app_request_latency",
documentation="The latency of APP requests",
unit="seconds",
labelnames=["app_id", "tenant_id", "username"],
buckets=dify_config.HISTOGRAM_BUCKETS_5MIN,
)
app_input_tokens = Counter(
name="app_input_tokens",
documentation="The input tokens cost by APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_output_tokens = Counter(
name="app_output_tokens",
documentation="The output tokens cost by APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_total_tokens = Counter(
name="app_total_tokens",
documentation="The total tokens cost by APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
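
> For reference, a minimal usage sketch of the metrics defined in this new module; the label values below are placeholders, and the real call sites are in the pipeline changes further down.

```python
# Illustrative usage of the module-level metrics above; label values are placeholders.
from core.app.task_pipeline import app_request, app_request_latency, app_total_tokens

labels = {"app_id": "app-123", "tenant_id": "tenant-456", "username": "alice"}

app_request.labels(**labels).inc()                 # one request observed
app_request_latency.labels(**labels).observe(1.7)  # 1.7 s end-to-end latency
app_total_tokens.labels(**labels).inc(256)         # 256 tokens consumed
```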
51 changes: 51 additions & 0 deletions api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
@@ -39,6 +39,14 @@
MessageEndStreamResponse,
StreamResponse,
)
from core.app.task_pipeline import (
app_input_tokens,
app_output_tokens,
app_request,
app_request_failed,
app_request_latency,
app_total_tokens,
)
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.app.task_pipeline.message_cycle_manage import MessageCycleManage
from core.model_manager import ModelInstance
@@ -251,6 +259,47 @@ def _wrapper_process_stream_response(
if publisher:
yield MessageAudioEndStreamResponse(audio="", task_id=task_id)

def _chat_time_it(self, is_success: bool) -> None:
"""
Record chat / completion / agent run metrics.
"""
app_id = self._app_config.app_id
tenant_id = self._app_config.tenant_id
username = self._conversation.from_account_name
app_request.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc()

if not is_success:
app_request_failed.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc()
return
app_request_latency.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).observe(self._message.provider_response_latency)
app_input_tokens.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc(self._message.message_tokens)
app_output_tokens.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc(self._message.answer_tokens)
app_total_tokens.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc(self._message.message_tokens + self._message.answer_tokens)

def _process_stream_response(
self, publisher: AppGeneratorTTSPublisher, trace_manager: Optional[TraceQueueManager] = None
) -> Generator[StreamResponse, None, None]:
@@ -265,6 +314,7 @@ def _process_stream_response(

if isinstance(event, QueueErrorEvent):
err = self._handle_error(event, self._message)
self._chat_time_it(is_success=False)
yield self._error_to_stream_response(err)
break
elif isinstance(event, QueueStopEvent | QueueMessageEndEvent):
@@ -283,6 +333,7 @@ def _process_stream_response(

# Save message
self._save_message(trace_manager)
self._chat_time_it(is_success=True)

yield self._message_end_to_stream_response()
elif isinstance(event, QueueRetrieverResourcesEvent):
36 changes: 36 additions & 0 deletions api/core/app/task_pipeline/workflow_cycle_manage.py
@@ -35,13 +35,22 @@
WorkflowStartStreamResponse,
WorkflowTaskState,
)
from core.app.task_pipeline import (
app_input_tokens,
app_output_tokens,
app_request,
app_request_failed,
app_request_latency,
app_total_tokens,
)
from core.file import FILE_MODEL_IDENTITY, File
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.tools.tool_manager import ToolManager
from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine import GraphRuntimeState
from core.workflow.nodes import NodeType
from core.workflow.nodes.tool.entities import ToolNodeData
from core.workflow.workflow_entry import WorkflowEntry
@@ -119,6 +128,33 @@ def _handle_workflow_run_start(self) -> WorkflowRun:

return workflow_run

def _workflow_time_it(
self, is_success: bool, graph_runtime_state: GraphRuntimeState, workflow_run: WorkflowRun
) -> None:
"""
Record advanced-chat / workflow run metrics.
"""
app_id = workflow_run.app_id
tenant_id = workflow_run.tenant_id
username = self._user.name
app_request.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc()

if not is_success:
app_request_failed.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc()
return
app_request_latency.labels(app_id=app_id, tenant_id=tenant_id, username=username).observe(
workflow_run.elapsed_time
)
app_input_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc(
graph_runtime_state.llm_usage.prompt_tokens
)
app_output_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc(
graph_runtime_state.llm_usage.completion_tokens
)
app_total_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc(
graph_runtime_state.llm_usage.total_tokens
)

def _handle_workflow_run_success(
self,
workflow_run: WorkflowRun,
49 changes: 47 additions & 2 deletions api/core/model_manager.py
@@ -2,6 +2,8 @@
from collections.abc import Callable, Generator, Iterable, Sequence
from typing import IO, Any, Optional, Union, cast

from prometheus_client import Counter, Histogram

from configs import dify_config
from core.entities.embedding_type import EmbeddingInputType
from core.entities.provider_configuration import ProviderConfiguration, ProviderModelBundle
Expand All @@ -26,6 +28,25 @@

logger = logging.getLogger(__name__)

model_request_total_counter = Counter(
name="model_request_total_counter",
documentation="The total count of model requests",
labelnames=["model_type", "provider", "model", "method"],
)
model_request_failed_counter = Counter(
name="model_request_failed_counter",
documentation="The failed count of model requests",
labelnames=["model_type", "provider", "model", "method"],
)
model_request_latency = Histogram(
name="model_request_latency",
documentation="The latency of model requests. For the LLM model, it just indicate "
"the TTFT (a.k.a. Time To First Token).",
unit="seconds",
labelnames=["model_type", "provider", "model", "method"],
buckets=dify_config.HISTOGRAM_BUCKETS_1MIN,
)


class ModelInstance:
"""
@@ -298,6 +319,30 @@ def invoke_tts(self, content_text: str, tenant_id: str, voice: str, user: Option
voice=voice,
)

def _invoke_with_timeit(self, function: Callable[..., Any], *args, **kwargs):
with model_request_latency.labels(
model_type=self.model_type_instance.model_type.value,
provider=self.provider,
model=self.model,
method=function.__name__ if hasattr(function, "__name__") else "unknown",
).time():
model_request_total_counter.labels(
model_type=self.model_type_instance.model_type.value,
provider=self.provider,
model=self.model,
method=function.__name__ if hasattr(function, "__name__") else "unknown",
).inc()
try:
return function(*args, **kwargs)
except Exception as e:
model_request_failed_counter.labels(
model_type=self.model_type_instance.model_type.value,
provider=self.provider,
model=self.model,
method=function.__name__ if hasattr(function, "__name__") else "unknown",
).inc()
raise e

def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs):
"""
Round-robin invoke
Expand All @@ -307,7 +352,7 @@ def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs):
:return:
"""
if not self.load_balancing_manager:
return function(*args, **kwargs)
return self._invoke_with_timeit(function, *args, **kwargs)

last_exception = None
while True:
@@ -321,7 +366,7 @@ def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs):
try:
if "credentials" in kwargs:
del kwargs["credentials"]
return function(*args, **kwargs, credentials=lb_config.credentials)
return self._invoke_with_timeit(function, *args, **kwargs, credentials=lb_config.credentials)
except InvokeRateLimitError as e:
# expire in 60 seconds
self.load_balancing_manager.cooldown(lb_config, expire=60)
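
> `_invoke_with_timeit` relies on `Histogram.labels(...).time()`, a context manager that observes the wall-clock duration of the block when it exits; because the `try/except` sits inside it, failed calls are timed as well. Below is a standalone sketch of the same pattern under assumed metric names — it is illustrative, not code from this PR.

```python
# Standalone sketch of the timing pattern used in _invoke_with_timeit.
# Metric names are illustrative; Histogram.time() records the elapsed
# wall-clock time when the `with` block exits.
import time

from prometheus_client import Counter, Histogram

demo_latency = Histogram("demo_request_latency", "Latency of demo calls", unit="seconds")
demo_failures = Counter("demo_request_failed", "Failed demo calls")


def call_with_metrics(fn, *args, **kwargs):
    with demo_latency.time():       # timer starts here, observation recorded on exit
        try:
            return fn(*args, **kwargs)
        except Exception:
            demo_failures.inc()     # failures are counted, and still timed, as in the diff
            raise


call_with_metrics(time.sleep, 0.1)  # records roughly 0.1 s in the histogram
```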