mirror of
https://github.com/langgenius/dify.git
synced 2026-02-10 04:54:30 +08:00
**Problem:**
The last 3 commits, which introduced the gateway implementation, left the
telemetry system with unnecessary abstraction layers and several bad practices:
- TelemetryFacade class wrapper around emit() function
- String literals instead of SignalType enum
- Dictionary mapping enum → string instead of enum → enum
- Unnecessary ENTERPRISE_TELEMETRY_GATEWAY_ENABLED feature flag
- Duplicate guard checks scattered across files
- Non-thread-safe TelemetryGateway singleton pattern
- Missing guard in ops_trace_task.py causing RuntimeError spam
**Solution:**
1. Deleted TelemetryFacade - replaced with thin emit() function in core/telemetry/__init__.py
2. Added SignalType enum ('trace' | 'metric_log') to enterprise/telemetry/contracts.py
3. Replaced CASE_TO_TRACE_TASK_NAME dict with CASE_TO_TRACE_TASK: dict[TelemetryCase, TraceTaskName]
4. Deleted is_gateway_enabled() and _emit_legacy() - using existing ENTERPRISE_ENABLED + ENTERPRISE_TELEMETRY_ENABLED instead
5. Extracted _should_drop_ee_only_event() helper to eliminate duplicate checks
6. Moved TelemetryGateway singleton to ext_enterprise_telemetry.py:
- Init once in init_app() for thread-safety
- Access via get_gateway() function
7. Re-added guard to ops_trace_task.py to prevent RuntimeError when EE=OFF but CE tracing enabled
8. Updated 11 caller files to import 'emit as telemetry_emit' instead of 'TelemetryFacade'
**Result:**
- 322 net lines deleted (533 removed, 211 added)
- All 91 tests pass
- Thread-safe singleton pattern
- Cleaner API surface: from TelemetryFacade.emit() to telemetry_emit()
- Proper enum usage throughout
- No RuntimeError spam in EE=OFF + CE=ON scenario
78 lines
3.0 KiB
Python
78 lines
3.0 KiB
Python
from __future__ import annotations
|
|
|
|
from collections.abc import Mapping
|
|
from typing import Any
|
|
|
|
from core.telemetry import TelemetryContext, TelemetryEvent, TraceTaskName
|
|
from core.telemetry import emit as telemetry_emit
|
|
from core.workflow.enums import WorkflowNodeExecutionMetadataKey
|
|
from models.workflow import WorkflowNodeExecutionModel
|
|
|
|
|
|
def enqueue_draft_node_execution_trace(
    *,
    execution: WorkflowNodeExecutionModel,
    outputs: Mapping[str, Any] | None,
    workflow_execution_id: str | None,
    user_id: str,
) -> None:
    """Emit a ``DRAFT_NODE_EXECUTION_TRACE`` telemetry event for one node execution.

    The node execution is flattened with ``_build_node_execution_data`` and
    handed to ``telemetry_emit`` under the ``"node_execution_data"`` payload key.

    Args:
        execution: Node execution record; its ``tenant_id`` and ``app_id``
            populate the telemetry context.
        outputs: Optional node outputs, forwarded unchanged to
            ``_build_node_execution_data``.
        workflow_execution_id: Optional workflow execution identifier,
            forwarded unchanged to ``_build_node_execution_data``.
        user_id: Identifier recorded as ``user_id`` in the telemetry context.
    """
    # Build the payload first so a failure here happens before any telemetry
    # objects are constructed.
    node_execution_data = _build_node_execution_data(
        execution=execution,
        outputs=outputs,
        workflow_execution_id=workflow_execution_id,
    )

    trace_context = TelemetryContext(
        tenant_id=execution.tenant_id,
        user_id=user_id,
        app_id=execution.app_id,
    )
    trace_event = TelemetryEvent(
        name=TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
        context=trace_context,
        payload={"node_execution_data": node_execution_data},
    )
    telemetry_emit(trace_event)
|
|
|
|
|
|
def _build_node_execution_data(
    *,
    execution: WorkflowNodeExecutionModel,
    outputs: Mapping[str, Any] | None,
    workflow_execution_id: str | None,
) -> dict[str, Any]:
    """Flatten a node execution and selected metadata into a plain dict.

    Args:
        execution: Node execution record whose attributes and
            ``execution_metadata_dict`` are read.
        outputs: Outputs to report as ``"node_outputs"``. When ``None``,
            ``execution.outputs_dict`` is used instead.
        workflow_execution_id: Preferred value for ``"workflow_execution_id"``.
            When falsy, ``execution.workflow_run_id`` is used, and if that is
            falsy too, ``execution.id``.

    Returns:
        A dict with the execution's identifying fields, status and timing,
        token/price/tool/iteration/loop/parallel metadata, and the node's
        inputs, outputs and process data.
    """
    meta = execution.execution_metadata_dict
    meta_key = WorkflowNodeExecutionMetadataKey

    if outputs is None:
        reported_outputs = execution.outputs_dict
    else:
        reported_outputs = outputs

    resolved_execution_id = workflow_execution_id or execution.workflow_run_id or execution.id

    # Only a dict-shaped TOOL_INFO entry can carry a tool name; anything else
    # (missing, None, other types) yields None.
    tool_info = meta.get(meta_key.TOOL_INFO)
    tool_name = tool_info.get("tool_name") if isinstance(tool_info, dict) else None

    return {
        # Identity of the workflow, tenant, app and node.
        "workflow_id": execution.workflow_id,
        "workflow_execution_id": resolved_execution_id,
        "tenant_id": execution.tenant_id,
        "app_id": execution.app_id,
        "node_execution_id": execution.id,
        "node_id": execution.node_id,
        "node_type": execution.node_type,
        "title": execution.title,
        # Outcome and timing.
        "status": execution.status,
        "error": execution.error,
        "elapsed_time": execution.elapsed_time,
        "index": execution.index,
        "predecessor_node_id": execution.predecessor_node_id,
        "created_at": execution.created_at,
        "finished_at": execution.finished_at,
        # Values read from the execution metadata; tokens and price default
        # to zero when the key is absent.
        "total_tokens": meta.get(meta_key.TOTAL_TOKENS, 0),
        "total_price": meta.get(meta_key.TOTAL_PRICE, 0.0),
        "currency": meta.get(meta_key.CURRENCY),
        "tool_name": tool_name,
        "iteration_id": meta.get(meta_key.ITERATION_ID),
        "iteration_index": meta.get(meta_key.ITERATION_INDEX),
        "loop_id": meta.get(meta_key.LOOP_ID),
        "loop_index": meta.get(meta_key.LOOP_INDEX),
        "parallel_id": meta.get(meta_key.PARALLEL_ID),
        # Node I/O.
        "node_inputs": execution.inputs_dict,
        "node_outputs": reported_outputs,
        "process_data": execution.process_data_dict,
    }
|