From 5bcd3b6fe60546910c684b4647db3efd5d8c1356 Mon Sep 17 00:00:00 2001
From: Novice
Date: Thu, 8 Jan 2026 17:35:42 +0800
Subject: [PATCH] feat: add virtual node executor

---
 .../docs/variable_extraction_design.md             | 1418 ++++++++++++++++++++
 .../event_management/event_handlers.py             |   69 +
 api/core/workflow/graph_events/node.py             |    6 +
 api/core/workflow/nodes/base/__init__.py           |   13 +-
 api/core/workflow/nodes/base/entities.py           |   21 +
 api/core/workflow/nodes/base/node.py               |   43 +
 .../nodes/base/virtual_node_executor.py            |  213 +++
 api/core/workflow/nodes/tool/tool_node.py          |   35 +-
 api/tests/fixtures/pav-test-extraction.yml         |  266 ++++
 .../workflow/entities/test_virtual_node.py         |   77 +
 .../use-workflow-node-finished.ts                  |    5 +-
 .../use-workflow-node-started.ts                   |    5 +
 12 files changed, 2161 insertions(+), 10 deletions(-)
 create mode 100644 api/core/workflow/docs/variable_extraction_design.md
 create mode 100644 api/core/workflow/nodes/base/virtual_node_executor.py
 create mode 100644 api/tests/fixtures/pav-test-extraction.yml
 create mode 100644 api/tests/unit_tests/core/workflow/entities/test_virtual_node.py

diff --git a/api/core/workflow/docs/variable_extraction_design.md b/api/core/workflow/docs/variable_extraction_design.md
new file mode 100644
index 0000000000..8022d94766
--- /dev/null
+++ b/api/core/workflow/docs/variable_extraction_design.md
@@ -0,0 +1,1418 @@
# Variable Extraction Design

Design for extracting parameter values from `list[PromptMessage]`-type variables through an additional LLM call.

---

## 1. Overview

### 1.1 Background

The LLM node currently outputs `context`, a `list[dict]` that stores the prompt messages of the current conversation (excluding the system message).

```python
# LLM Node outputs
outputs = {
    "text": "LLM response text",
    "context": [
        {"role": "user", "text": "user input", "files": []},
        {"role": "assistant", "text": "assistant response", "files": []},
    ],
    # ...
}
```

### 1.2 Requirement

Allow other nodes (such as tool nodes) to reference an LLM node's `context` through a dedicated syntax, attach an extraction prompt, and invoke an LLM again to extract the desired parameter value.

**Example use case** (a code sketch of these steps follows at the end of this section):

```
Tool node parameter = "@llm1.context | extract keywords"

Execution flow:
1. Fetch the context (conversation history) of node llm1
2. Combine the context and the extraction prompt into new prompt messages
3. Invoke the LLM to obtain the extraction result
4. Use the result as the tool node's parameter value
```

### 1.3 The Node Group Concept

When a Tool node uses `@llm1.context`, the Tool node becomes a "node group":

```
┌─────────────────────────────────────────────────────┐
│ Tool node group (tool1)                             │
│                                                     │
│  ┌───────────────────────────────────────────────┐  │
│  │ Extraction sub-node (tool1_ext_1)             │  │
│  │ - Has its own node_id                         │  │
│  │ - Has its own logs and streamed output        │  │
│  │ - Outputs are stored in the variable_pool     │  │
│  └───────────────────────────────────────────────┘  │
│                       │                             │
│                       ▼                             │
│  ┌───────────────────────────────────────────────┐  │
│  │ Tool main node (tool1)                        │  │
│  │ - Uses the extraction output as a parameter   │  │
│  │ - Has its own logs and outputs                │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘
```
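To make steps 2-4 of the requirement concrete, here is a minimal, self-contained sketch. The helper name and the plain-dict message shape are illustrative assumptions, not the actual Dify API:

```python
from typing import Any


def build_extraction_messages(
    context: list[dict[str, Any]],
    extraction_prompt: str,
) -> list[dict[str, Any]]:
    """Combine an upstream context with an extraction prompt (step 2)."""
    # Keep the original conversation turns as-is...
    messages = [{"role": m["role"], "text": m["text"]} for m in context]
    # ...then append the extraction instruction as a final user turn.
    messages.append({"role": "user", "text": extraction_prompt})
    return messages


# Steps 3-4 (not shown): the combined messages are sent to the LLM, and the
# response text becomes the tool node's parameter value.
messages = build_extraction_messages(
    [{"role": "user", "text": "Find news about solar power"}],
    "Extract the search keywords, return only the keywords",
)
assert messages[-1]["text"].startswith("Extract")
```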
---

## 2. Analysis of the Existing Call Chain

### 2.1 Graph Engine Execution Flow

```
GraphEngine.run()
    │
    ▼
┌───────────────────────────────────────────────────────────────────┐
│ WorkerPool                                                        │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │ Worker Thread                                               │  │
│  │                                                             │  │
│  │  Worker._execute_node(node)                                 │  │
│  │    │                                                        │  │
│  │    ├─ node.run()                                            │  │
│  │    │    │                                                   │  │
│  │    │    ├─ yield NodeRunStartedEvent                        │  │
│  │    │    ├─ yield NodeRunStreamChunkEvent (multiple times)   │  │
│  │    │    └─ yield NodeRunSucceededEvent                      │  │
│  │    │                                                        │  │
│  │    └─ for event in node.run():                              │  │
│  │         event_queue.put(event) ──────────────────────┐      │  │
│  │                                                      │      │  │
│  └──────────────────────────────────────────────────────│──────┘  │
└─────────────────────────────────────────────────────────│─────────┘
                                                          │
            ┌─────────────────────────────────────────────┘
            │
            ▼
┌───────────────────────────────────────────────────────────────────┐
│ Dispatcher Thread                                                 │
│                                                                   │
│  _dispatcher_loop():                                              │
│    while True:                                                    │
│      event = event_queue.get()                                    │
│      event_handler.dispatch(event)                                │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
            │
            ▼
┌───────────────────────────────────────────────────────────────────┐
│ EventHandler.dispatch(event)                                      │
│                                                                   │
│  ┌─ NodeRunStartedEvent ─────────────────────────────────────┐    │
│  │   → event_collector.collect(event)                        │    │
│  └───────────────────────────────────────────────────────────┘    │
│                                                                   │
│  ┌─ NodeRunStreamChunkEvent ─────────────────────────────────┐    │
│  │   → response_coordinator.intercept_event(event)           │    │
│  │   → event_collector.collect(stream_events)                │    │
│  └───────────────────────────────────────────────────────────┘    │
│                                                                   │
│  ┌─ NodeRunSucceededEvent ───────────────────────────────────┐    │
│  │   → _store_node_outputs(node_id, outputs)                 │    │
│  │     └─ variable_pool.add((node_id, var_name), value)      │    │
│  │   → response_coordinator.intercept_event(event)           │    │
│  │   → edge_processor.process_node_success(node_id)          │    │
│  │     └─ ready_queue.put(next_nodes)                        │    │
│  └───────────────────────────────────────────────────────────┘    │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
```

### 2.2 Key Points

1. **Event-driven**: nodes communicate with the engine by yielding events
2. **Variable pool write timing**: outputs are written to the variable_pool while `NodeRunSucceededEvent` is being handled
3. **Event collection**: every event passes through `event_collector.collect()` and is eventually returned to the caller
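Key point 2 is what the virtual-node design below depends on. The following self-contained sketch condenses what the success handler does; the types are simplified stand-ins for the real `event_handlers.py` internals:

```python
from dataclasses import dataclass
from queue import Queue


@dataclass
class SucceededEvent:
    node_id: str
    outputs: dict[str, object]


class MiniVariablePool:
    """Stand-in for the real VariablePool (illustration only)."""

    def __init__(self) -> None:
        self._store: dict[tuple[str, str], object] = {}

    def add(self, selector: tuple[str, str], value: object) -> None:
        self._store[selector] = value


def on_node_succeeded(
    event: SucceededEvent,
    pool: MiniVariablePool,
    downstream: dict[str, list[str]],
    ready_queue: "Queue[str]",
) -> None:
    # Store every output under its (node_id, variable_name) selector so that
    # later nodes can resolve references such as {{#llm1.context#}}.
    for name, value in event.outputs.items():
        pool.add((event.node_id, name), value)
    # Then advance the workflow by enqueuing the successor nodes.
    for node_id in downstream.get(event.node_id, []):
        ready_queue.put(node_id)
```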
---

## 3. Embedded Sub-Node Design

### 3.1 Design Principles

**Core idea**: a virtual node is essentially a complete node (e.g., an LLM node), so it should be defined with a full node configuration rather than having its settings squeezed into some other field.

**Approach**: add a `virtual_nodes` field to the node configuration that lists the sub-nodes the node depends on. Each sub-node is a complete node definition; at runtime the sub-nodes execute first, then the main node.

### 3.2 DSL Design

```yaml
nodes:
  - id: tool1
    type: tool
    data:
      # Virtual sub-node list - complete node definitions
      virtual_nodes:
        - id: ext_1 # Local ID; becomes tool1.ext_1 globally
          type: llm # A full LLM node!
          data:
            title: "Extract keywords"
            model:
              provider: openai
              name: gpt-4o-mini
              mode: chat
            prompt_template:
              - role: user
                text: "{{#llm1.context#}}" # References the upstream context
              - role: user
                text: "Extract the keywords; return only the keywords themselves"

      # The main node's parameters reference the sub-node's output
      tool_parameters:
        query:
          type: variable
          value: [ext_1, text] # References the sub-node output
```

### 3.3 Full Example

```yaml
nodes:
  # Upstream LLM node
  - id: llm1
    type: llm
    data:
      model:
        provider: openai
        name: gpt-4
      prompt_template:
        - role: user
          text: "{{#start.query#}}"

  # Tool node - contains virtual sub-nodes
  - id: tool1
    type: tool
    data:
      # Sub-node list
      virtual_nodes:
        - id: ext_1
          type: llm
          data:
            title: "Extract search keywords"
            model:
              provider: openai
              name: gpt-4o-mini
            prompt_template:
              - role: user
                text: "{{#llm1.context#}}"
              - role: user
                text: "Extract the keywords the user wants to search for from the conversation"

        - id: ext_2
          type: llm
          data:
            title: "Extract search time range"
            model:
              provider: openai
              name: gpt-4o-mini
            prompt_template:
              - role: user
                text: "{{#llm1.context#}}"
              - role: user
                text: "Extract the time range the user wants to search (e.g., the last week)"

      # Main node configuration
      tool_name: google_search
      tool_parameters:
        query:
          type: variable
          value: [ext_1, text] # References sub-node ext_1's output
        time_range:
          type: variable
          value: [ext_2, text] # References sub-node ext_2's output
        limit:
          type: constant
          value: 10
```

### 3.4 Sub-Node ID Rule

A sub-node's local ID is converted into a global ID:

| Local ID | Parent node ID | Global ID     |
|----------|----------------|---------------|
| `ext_1`  | `tool1`        | `tool1.ext_1` |
| `ext_2`  | `tool1`        | `tool1.ext_2` |

Sub-node references use the local ID: `[ext_1, text]`

### 3.5 Entity Definitions

```python
# core/workflow/entities/virtual_node.py

from pydantic import BaseModel
from typing import Any


class VirtualNodeConfig(BaseModel):
    """Configuration for a virtual sub-node"""

    # Local ID within parent node (e.g., "ext_1")
    id: str

    # Node type (e.g., "llm", "code")
    type: str

    # Full node data configuration
    data: dict[str, Any]


# core/workflow/nodes/base/entities.py

class BaseNodeData(BaseModel):
    """Base class for all node data"""

    title: str
    desc: str | None = None
    # ... existing fields ...

    # Virtual sub-nodes
    virtual_nodes: list[VirtualNodeConfig] = []
```

### 3.6 Supported Node Types

The following nodes need to output a `context` variable to support extraction:

| Node type           | NodeType                       | context source                       | Model config location                         |
| ------------------- | ------------------------------ | ------------------------------------ | --------------------------------------------- |
| LLM                 | `NodeType.LLM`                 | `_build_context` already implemented | `node_data.model`                             |
| Agent               | `NodeType.AGENT`               | to be added                          | the model parameter inside `agent_parameters` |
| Question Classify   | `NodeType.QUESTION_CLASSIFIER` | to be added                          | `node_data.model`                             |
| Parameter Extractor | `NodeType.PARAMETER_EXTRACTOR` | to be added                          | `node_data.model`                             |

**context structure** (unified format):

```python
context = [
    {"role": "user", "text": "user input", "files": []},
    {"role": "assistant", "text": "model reply", "files": []},
]
```
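To make the ID rule concrete, here is a tiny sketch of how a local sub-node reference such as `[ext_1, text]` could be mapped onto a variable-pool selector; `resolve_subnode_selector` is a hypothetical helper, not part of this patch:

```python
def resolve_subnode_selector(parent_node_id: str, selector: list[str]) -> list[str]:
    """Turn a local sub-node reference into a global variable-pool selector."""
    local_id, variable_name = selector[0], selector[1]
    # Mirrors VirtualNodeConfig.get_global_id: "{parent_id}.{local_id}"
    return [f"{parent_node_id}.{local_id}", variable_name]


assert resolve_subnode_selector("tool1", ["ext_1", "text"]) == ["tool1.ext_1", "text"]
```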
---

## 4. Execution Flow

### 4.1 Execution Flow with Embedded Sub-Nodes

```
Tool node group execution
  │
  ├─ node.run() is called
  │
  ├─ Step 1: execute the virtual sub-nodes
  │    │
  │    │  Iterate over node_data.virtual_nodes
  │    │
  │    │  ┌─────────────────────────────────────────────────────────┐
  │    │  │ Virtual node (tool1.ext_1)                              │
  │    │  │                                                         │
  │    │  │ yield NodeRunStartedEvent (tool1.ext_1, type=LLM)       │
  │    │  │ yield NodeRunStreamChunkEvent (tool1.ext_1, chunk)      │
  │    │  │ yield NodeRunSucceededEvent (tool1.ext_1, outputs)      │
  │    │  │                                                         │
  │    │  │ → variable_pool.add((tool1.ext_1, "text"), result)      │
  │    │  └─────────────────────────────────────────────────────────┘
  │
  ├─ Tool parameter resolution: {{#tool1.ext_1.text#}} replaces the original @llm1.context
  │
  │  ┌─────────────────────────────────────────────────────────┐
  │  │ Tool main node (tool1)                                  │
  │  │                                                         │
  │  │ yield NodeRunStartedEvent (tool1)                       │
  │  │ yield NodeRunStreamChunkEvent (tool1, tool output)      │
  │  │ yield NodeRunSucceededEvent (tool1, outputs)            │
  │  └─────────────────────────────────────────────────────────┘
  │
  └─ Done
```

**Advantages**:

- Virtual nodes have their own node_id and their own logs
- Virtual node outputs are stored in the variable_pool and can be referenced by other nodes
- The UI can clearly show two separate executions

**Drawbacks**:

- Slightly more complex to implement
- Virtual node ID generation and association must be handled

### 4.2 Recommended Approach: Virtual Nodes

Adopt the virtual-node approach, because:

1. It matches the "node group" concept described in section 1.3
2. Both calls get their own logs and outputs
3. It gives clearer execution boundaries

### 4.3 Where to Execute

At the start of the node's `_run()` method (recommended):

```python
# tool_node.py
def _run(self) -> Generator[NodeEventBase, None, None]:
    # Step 1: preprocessing - run all extractions
    extraction_results = yield from self._process_extractions()

    # Step 2: generate parameters from the extraction results
    parameters = self._generate_parameters(extraction_results)

    # Step 3: invoke the tool
    ...
```

**Advantages**:

- Events can be yielded
- Stays within the node's control
- Clear execution order
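The `yield from self._process_extractions()` line above leans on a Python detail worth spelling out: a generator may `return` a value, and `yield from` both forwards the yielded items (the events) and evaluates to that return value. A self-contained illustration:

```python
from collections.abc import Generator


def extraction() -> Generator[str, None, dict[str, str]]:
    yield "NodeRunStartedEvent(tool1.ext_1)"    # events flow to the engine
    yield "NodeRunSucceededEvent(tool1.ext_1)"
    return {"query": "keyword A, keyword B"}    # value of `yield from`


def tool_run() -> Generator[str, None, None]:
    results = yield from extraction()           # events pass through
    yield f"tool invoked with query={results['query']}"


print(list(tool_run()))
# ['NodeRunStartedEvent(tool1.ext_1)', 'NodeRunSucceededEvent(tool1.ext_1)',
#  'tool invoked with query=keyword A, keyword B']
```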
## 5. Detailed Execution Flow

### 5.1 Full Call Chain

The user-defined Tool node parameter (structured configuration):

```yaml
# Tool node configuration
- id: tool1
  type: tool
  data:
    tool_name: google_search
    inputs:
      # extraction-type input
      - name: query
        type: extraction
        value:
          source_node_id: llm1
          source_variable: context
          extraction_prompt: "Extract the keywords"
          # model omitted; llm1's model config is inherited automatically
```

Execution flow:

```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Worker Thread                                                               │
│                                                                             │
│  Worker._execute_node(tool_node)                                            │
│    │                                                                        │
│    └─ for event in tool_node.run():                                         │
│         event_queue.put(event)                                              │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
        │
        ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ ToolNode.run()                                                              │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │ Step 1: preprocessing - discover and run extractions                  │ │
│  │                                                                       │ │
│  │  yield from self._process_extractions()                               │ │
│  │    │                                                                  │ │
│  │    ├─ Parse parameters, find inputs with type=extraction              │ │
│  │    │                                                                  │ │
│  │    ├─ Create the virtual node ID: "tool1_ext_1"                       │ │
│  │    │                                                                  │ │
│  │    ├─ yield NodeRunStartedEvent(                                      │ │
│  │    │     node_id="tool1_ext_1",                                       │ │
│  │    │     node_type=NodeType.LLM,                                      │ │
│  │    │     node_title="Extraction: Extract the keywords"                │ │
│  │    │   )                                                              │ │
│  │    │                                                                  │ │
│  │    ├─ Fetch llm1.context and build the prompt_messages                │ │
│  │    │                                                                  │ │
│  │    ├─ Invoke the LLM (streaming)                                      │ │
│  │    │   for chunk in llm_invoke():                                     │ │
│  │    │     yield NodeRunStreamChunkEvent(                               │ │
│  │    │       node_id="tool1_ext_1",                                     │ │
│  │    │       selector=["tool1_ext_1", "text"],                          │ │
│  │    │       chunk=chunk                                                │ │
│  │    │     )                                                            │ │
│  │    │                                                                  │ │
│  │    ├─ yield NodeRunSucceededEvent(                                    │ │
│  │    │     node_id="tool1_ext_1",                                       │ │
│  │    │     outputs={"text": "keyword A, keyword B"}                     │ │
│  │    │   )                                                              │ │
│  │    │                                                                  │ │
│  │    └─ returns extraction_results = {"tool1_ext_1": "keyword A, ..."}  │ │
│  │                                                                       │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │ Step 2: main node execution                                           │ │
│  │                                                                       │ │
│  │  yield NodeRunStartedEvent(                                           │ │
│  │    node_id="tool1",                                                   │ │
│  │    node_type=NodeType.TOOL                                            │ │
│  │  )                                                                    │ │
│  │                                                                       │ │
│  │  parameters = _generate_parameters(extraction_results)                │ │
│  │  # param = "keyword A, keyword B"                                     │ │
│  │                                                                       │ │
│  │  tool.invoke(parameters)                                              │ │
│  │  for chunk in tool_output:                                            │ │
│  │    yield NodeRunStreamChunkEvent(                                     │ │
│  │      node_id="tool1",                                                 │ │
│  │      selector=["tool1", "text"],                                      │ │
│  │      chunk=chunk                                                      │ │
│  │    )                                                                  │ │
│  │                                                                       │ │
│  │  yield NodeRunSucceededEvent(                                         │ │
│  │    node_id="tool1",                                                   │ │
│  │    outputs={"text": "tool output..."}                                 │ │
│  │  )                                                                    │ │
│  │                                                                       │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
        │
        ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Dispatcher Thread                                                           │
│                                                                             │
│  Event sequence received:                                                   │
│                                                                             │
│  1. NodeRunStartedEvent(node_id="tool1_ext_1")                              │
│     → event_collector.collect()                                             │
│                                                                             │
│  2. NodeRunStreamChunkEvent(node_id="tool1_ext_1", chunk="keyword")         │
│     → response_coordinator → event_collector.collect()                      │
│                                                                             │
│  3. NodeRunSucceededEvent(node_id="tool1_ext_1", outputs={...})             │
│     → _store_node_outputs("tool1_ext_1", outputs)                           │
│       └─ variable_pool.add(("tool1_ext_1", "text"), "keyword A, keyword B") │
│     → event_collector.collect()                                             │
│     Note: the edge_processor is NOT triggered - this is a virtual node      │
│                                                                             │
│  4. NodeRunStartedEvent(node_id="tool1")                                    │
│     → event_collector.collect()                                             │
│                                                                             │
│  5. NodeRunStreamChunkEvent(node_id="tool1", chunk="tool output")           │
│     → response_coordinator → event_collector.collect()                      │
│                                                                             │
│  6. NodeRunSucceededEvent(node_id="tool1", outputs={...})                   │
│     → _store_node_outputs("tool1", outputs)                                 │
│     → edge_processor.process_node_success("tool1")                          │
│       └─ ready_queue.put(next_nodes)                                        │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

### 5.2 Key Question: Event Handling for Virtual Nodes

Events from virtual nodes (such as `tool1_ext_1`) need special handling:

```python
# The EventHandler must distinguish virtual nodes from real nodes
def _(self, event: NodeRunSucceededEvent) -> None:
    # Store outputs in the variable_pool (virtual nodes need this too)
    self._store_node_outputs(event.node_id, event.node_run_result.outputs)

    # Check whether this is a virtual node (by node_id format: contains _ext_)
    if self._is_virtual_node(event.node_id):
        # Virtual nodes do not trigger edge processing; only collect the event
        self._event_collector.collect(event)
        return

    # Real node: trigger edge processing to advance the workflow
    ready_nodes = self._edge_processor.process_node_success(event.node_id)
    ...

def _is_virtual_node(self, node_id: str) -> bool:
    """Check if node_id represents a virtual extraction node."""
    return "_ext_" in node_id
```

### 5.3 Virtual Node ID Naming Rule

```python
def _generate_extraction_node_id(
    parent_node_id: str,
    extraction_index: int,
) -> str:
    """
    Generate unique ID for extraction virtual node.

    Format: {parent_node_id}_ext_{index}
    Example: tool1_ext_1, tool1_ext_2
    """
    return f"{parent_node_id}_ext_{extraction_index}"
```

Note that the `{parent}_ext_{index}` format in this section reflects the earlier extraction-input design; the implementation shipped in this patch instead uses the dot format `{parent_node_id}.{local_id}` from section 3.4 (see `VirtualNodeConfig.get_global_id` and `_is_virtual_node` in the diff below).
+ """ + + def __init__( + self, + *, + variable_pool: VariablePool, + graph_config: Mapping[str, Any], + graph_init_params: GraphInitParams, + graph_runtime_state: GraphRuntimeState, + parent_node_id: str, + parent_retry_config: RetryConfig | None = None, + ): + # Store graph context for creating LLMNode instances + self._graph_init_params = graph_init_params + self._graph_runtime_state = graph_runtime_state + # ... + + def _execute_single_extraction( + self, + spec: VariableExtractionSpec, + ext_node_id: str, + ) -> Generator[GraphNodeEventBase, None, str]: + """ + Execute a single extraction by instantiating and running a real LLMNode. + """ + # Create LLMNode instance with minimal config + llm_node = self._create_llm_node( + ext_node_id=ext_node_id, + context=context, + extraction_prompt=spec.extraction_prompt, + model_config=model_config, + spec=spec, + ) + + # Run the node and collect events - FULLY REUSES LLMNode's logic! + for event in llm_node.run(): + # Mark events as virtual + event = self._mark_event_as_virtual(event, spec) + yield event + + if isinstance(event, NodeRunSucceededEvent): + result_text = event.node_run_result.outputs.get("text", "") + elif isinstance(event, NodeRunFailedEvent): + raise LLMInvocationError(Exception(event.error)) + + return result_text + + def _create_llm_node(self, ...) -> LLMNode: + """ + Create a real LLMNode instance for extraction. + Constructs minimal required configuration. + """ + # Build prompt template from context + extraction prompt + prompt_template = [...] # LLMNodeChatModelMessage list + + # Create LLMNode with full graph context + llm_node = LLMNode( + id=ext_node_id, + config=node_config, + graph_init_params=self._graph_init_params, + graph_runtime_state=self._graph_runtime_state, + ) + return llm_node + + def _execute_with_retry(self, spec, ext_node_id) -> Generator[...]: + """ + Retry by re-instantiating and re-running the LLMNode. + """ + for attempt in range(retry_config.max_retries + 1): + try: + return (yield from self._execute_single_extraction(spec, ext_node_id)) + except Exception as e: + if attempt < retry_config.max_retries: + yield NodeRunRetryEvent(...) + time.sleep(retry_config.retry_interval_seconds) + continue + raise +``` + +--- + +## 6. 事件设计 + +### 6.1 复用现有事件类型 + +采用虚拟节点方案后,**不需要新增事件类型**。虚拟节点直接使用现有的: + +- `NodeRunStartedEvent` +- `NodeRunStreamChunkEvent` +- `NodeRunSucceededEvent` +- `NodeRunFailedEvent` + +**区分虚拟节点的方式**:在 `NodeRunStartedEvent` 中添加可选字段: + +```python +# core/workflow/graph_events/node.py + +class NodeRunStartedEvent(GraphNodeEventBase): + node_title: str + predecessor_node_id: str | None = None + agent_strategy: AgentNodeStrategyInit | None = None + start_at: datetime = Field(..., description="node start time") + + # Existing fields for ToolNode + provider_type: str = "" + provider_id: str = "" + + # NEW: Virtual node fields for extraction + is_virtual: bool = False + parent_node_id: str | None = None + extraction_source: str | None = None # e.g., "llm1.context" + extraction_prompt: str | None = None +``` + +**字段说明**: + +| 字段 | 类型 | 说明 | +| ------------------- | ------------- | ------------------------------ | +| `is_virtual` | `bool` | 是否为虚拟节点,默认 `False` | +| `parent_node_id` | `str \| None` | 父节点 ID,如 `"tool1"` | +| `extraction_source` | `str \| None` | 提取来源,如 `"llm1.context"` | +| `extraction_prompt` | `str \| None` | 提取 prompt,如 `"提取关键词"` | + +### 6.2 事件序列示例 + +前端收到的事件序列: + +``` +1. 
### 6.2 Example Event Sequence

The event sequence the frontend receives:

```
1. NodeRunStartedEvent
   - node_id: "tool1_ext_1"
   - node_type: NodeType.LLM
   - node_title: "Extraction: Extract the keywords"
   - is_virtual: true
   - parent_node_id: "tool1"
   - extraction_source: "llm1.context"
   - extraction_prompt: "Extract the keywords"

2. NodeRunStreamChunkEvent
   - node_id: "tool1_ext_1"
   - selector: ["tool1_ext_1", "text"]
   - chunk: "keyword"

3. NodeRunSucceededEvent
   - node_id: "tool1_ext_1"
   - outputs: {"text": "keyword A, keyword B"}

4. NodeRunStartedEvent
   - node_id: "tool1"
   - node_type: NodeType.TOOL
   - node_title: "Search Tool"
   - is_virtual: false

5. NodeRunStreamChunkEvent
   - node_id: "tool1"
   - selector: ["tool1", "text"]
   - chunk: "search result..."

6. NodeRunSucceededEvent
   - node_id: "tool1"
   - outputs: {"text": "..."}
```

### 6.3 Frontend Display Suggestions

Based on the `is_virtual` and `parent_node_id` fields, the frontend can:

1. **Nested display**: render the virtual node's output inside the parent node
2. **Separate display**: render it as an independent node, with a UI hint marking the relationship
3. **Collapsed display**: collapse virtual nodes by default, expandable for details

---

## 7. Logging and Records

### 7.1 NodeRunResult for Virtual Nodes

A virtual node has its own `NodeRunResult`, with the same structure as a normal LLM node:

```python
NodeRunResult(
    status=WorkflowNodeExecutionStatus.SUCCEEDED,
    inputs={
        "context_source": "llm1.context",
        "extraction_prompt": "Extract the keywords",
    },
    process_data={
        "source": "llm1.context",
        "prompt": "Extract the keywords",
        "model_mode": "chat",
        "prompts": [
            {"role": "user", "text": "original user input"},
            {"role": "assistant", "text": "original assistant reply"},
            {"role": "user", "text": "Extract the keywords"},
        ],
        "usage": {
            "prompt_tokens": 100,
            "completion_tokens": 20,
            "total_tokens": 120,
        },
    },
    outputs={
        "text": "keyword A, keyword B",
    },
    metadata={
        WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 120,
        WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 0.0001,
        WorkflowNodeExecutionMetadataKey.CURRENCY: "USD",
    },
    llm_usage=LLMUsage(
        prompt_tokens=100,
        completion_tokens=20,
        total_tokens=120,
    ),
)
```

### 7.2 The Parent Node's process_data

The parent node (e.g., ToolNode) can record its associated virtual nodes in `process_data`:

```python
process_data = {
    # ... existing fields
    "extraction_nodes": ["tool1_ext_1", "tool1_ext_2"],
}
```

### 7.3 Database Records

Virtual node execution records are saved to the `workflow_node_executions` table:

| Field       | Value                                     |
| ----------- | ----------------------------------------- |
| `node_id`   | `"tool1_ext_1"`                           |
| `node_type` | `"llm"`                                   |
| `title`     | `"Extraction: Extract the keywords..."`   |
| `inputs`    | `{"context_source": "llm1.context", ...}` |
| `outputs`   | `{"text": "keyword A, keyword B"}`        |
| `status`    | `"succeeded"`                             |

The frontend can recognize virtual nodes by the `_ext_` marker in `node_id` and associate them with the parent node.

---
## 8. Integration Examples

### 8.1 ToolNode Integration

```python
# core/workflow/nodes/tool/tool_node.py

from core.workflow.nodes.base.extraction_executor import ExtractionExecutor


class ToolNode(Node[ToolNodeData]):

    def _run(self) -> Generator[NodeEventBase, None, None]:
        # Step 1: create the ExtractionExecutor (passing the parent node's retry_config)
        extraction_executor = ExtractionExecutor(
            variable_pool=self.graph_runtime_state.variable_pool,
            graph_config=self.graph_config,
            graph_init_params=self._graph_init_params,
            graph_runtime_state=self.graph_runtime_state,
            parent_node_id=self._node_id,
            parent_retry_config=self.retry_config,  # inherit the parent node's retry config
        )

        # Step 2: find all extraction-type inputs
        specs = extraction_executor.find_extractions(self.node_data.model_dump())

        # Step 3: run the extractions (yields virtual node events, including retry events)
        extraction_results: dict[str, str] = {}
        if specs:
            try:
                extraction_results = yield from extraction_executor.process_extractions(specs)
            except ExtractionError as e:
                # The ExtractionExecutor has already yielded a NodeRunFailedEvent;
                # decide how to proceed based on the parent node's error_strategy
                if self.error_strategy == ErrorStrategy.DEFAULT_VALUE:
                    extraction_results = self._get_default_extraction_values(specs)
                else:
                    raise

        # Step 4: generate parameters (extraction results fill the corresponding inputs)
        parameters = self._generate_parameters_with_extractions(
            tool_parameters=tool_parameters,
            extraction_results=extraction_results,
        )

        # Step 5: continue the normal tool invocation flow...
        ...

    def _generate_parameters_with_extractions(
        self,
        *,
        tool_parameters: Sequence[ToolParameter],
        extraction_results: dict[str, str],  # input_name -> extracted_value
    ) -> dict[str, Any]:
        """Generate parameters, using extraction results for extraction-type inputs."""
        result: dict[str, Any] = {}

        for parameter_name, tool_input in self.node_data.tool_parameters.items():
            # Check if this input is an extraction type (result already in extraction_results)
            if parameter_name in extraction_results:
                result[parameter_name] = extraction_results[parameter_name]

            elif tool_input.type in {"mixed", "constant"}:
                template = str(tool_input.value)
                resolved = self.graph_runtime_state.variable_pool.convert_template(template).text
                result[parameter_name] = resolved

            elif tool_input.type == "variable":
                variable = self.graph_runtime_state.variable_pool.get(tool_input.value)
                result[parameter_name] = variable.value if variable else None

        return result

    def _get_default_extraction_values(
        self,
        specs: list[VariableExtractionSpec],
    ) -> dict[str, str]:
        """Return default values for failed extractions."""
        return {spec.input_name: "" for spec in specs}
```

### 8.2 Base-Class Integration (Optional)

If several node types need extraction support, it can be handled once in the base class:

```python
# core/workflow/nodes/base/node.py

class Node(Generic[NodeDataT]):

    def run(self) -> Generator[GraphNodeEventBase, None, None]:
        # Step 1: preprocess extractions (if any)
        extraction_results = yield from self._preprocess_extractions()

        # Step 2: normal execution
        execution_id = self.ensure_execution_id()
        # ...existing logic...

    def _preprocess_extractions(self) -> Generator[GraphNodeEventBase, None, dict[str, str]]:
        """
        Override in subclasses that support extraction.
        The default implementation yields nothing and returns an empty dict.
        """
        return {}
        # The unreachable yield below makes this function a generator, so that
        # `yield from self._preprocess_extractions()` evaluates to {} instead
        # of iterating over a plain dict.
        yield
```
+ """ + return {} + + def _supports_extraction(self) -> bool: + """Override to return True if node supports extraction.""" + return False +``` + +### 8.3 为其他节点添加 context 输出 + +以下节点需要在 outputs 中添加 `context`: + +```python +# core/workflow/nodes/question_classifier/question_classifier_node.py + +def _run(self) -> NodeRunResult: + # ...existing logic... + + outputs = { + "class_name": result.class_name, + # NEW: Add context for extraction support + "context": self._build_context(prompt_messages, result.text), + } + + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + outputs=outputs, + ) +``` + +```python +# core/workflow/nodes/parameter_extractor/parameter_extractor_node.py + +def _run(self) -> NodeRunResult: + # ...existing logic... + + outputs = { + **extracted_parameters, + # NEW: Add context for extraction support + "context": self._build_context(prompt_messages, assistant_response), + } +``` + +**注意**:`_build_context` 方法可以从 `LLMNode` 中提取为公共函数,或者直接复用: + +```python +# core/workflow/nodes/llm/llm_utils.py + +def build_context( + prompt_messages: Sequence[PromptMessage], + assistant_response: str, + model_mode: str, +) -> list[dict[str, Any]]: + """ + Build context from prompt messages and assistant response. + Excludes system messages and includes the current LLM response. + """ + context_messages = [m for m in prompt_messages if m.role != PromptMessageRole.SYSTEM] + context_messages.append(AssistantPromptMessage(content=assistant_response)) + return PromptMessageUtil.prompt_messages_to_prompt_for_saving( + model_mode=model_mode, prompt_messages=context_messages + ) +``` + +--- + +## 9. 配置选项 + +### 9.1 模型配置策略 + +提取调用使用的模型,按优先级: + +| 优先级 | 来源 | 说明 | +| ------ | ---------- | ------------------------------------ | +| 1 | 显式指定 | `extraction.value.model` 配置 | +| 2 | 源节点配置 | 继承 `source_node_id` 节点的模型配置 | + +### 9.2 ExtractionModelConfig 使用 + +```python +# 在 ExtractionExecutor 中获取模型配置 + +def _get_model_config(self, spec: VariableExtractionSpec) -> dict: + # 如果显式指定了 model,使用它 + if spec.model: + return { + "provider": spec.model.provider, + "name": spec.model.name, + "mode": spec.model.mode.value, + "completion_params": spec.model.completion_params, + } + + # 否则继承源节点的模型配置 + source_model_config = self._get_source_node_model_config(spec.source_node_id) + if source_model_config is None: + raise ModelConfigNotFoundError(spec.source_node_id, spec.source_variable) + + return source_model_config +``` + +### 9.3 模型配置示例 + +**场景 1:继承源节点配置(推荐)** + +```yaml +# 节点配置 +inputs: + - name: query + type: extraction + value: + source_node_id: llm1 + source_variable: context + extraction_prompt: "提取关键词" + # 不指定 model,自动继承 llm1 的模型配置 + +# llm1 节点配置 +data: + model: + provider: openai + name: gpt-4 + mode: chat + completion_params: + temperature: 0.7 +# 结果:使用 openai/gpt-4 +``` + +**场景 2:显式指定模型** + +```yaml +# 节点配置 +inputs: + - name: query + type: extraction + value: + source_node_id: llm1 + source_variable: context + extraction_prompt: "提取关键词" + model: + provider: openai + name: gpt-4o-mini + mode: chat + completion_params: + temperature: 0.3 +# 结果:使用 openai/gpt-4o-mini(忽略源节点配置) +``` + +--- + +## 10. 
---

## 10. Error Handling and Retry Mechanism

### 10.1 Design Considerations

**Important**: the retry mechanism for virtual (extraction) nodes **cannot** directly reuse the existing node-level retry mechanism.

Why:

- When a Worker takes a node from the `ready_queue`, it fetches the node instance via `graph.nodes[node_id]`
- Virtual nodes are not in `graph.nodes`
- `ErrorHandler._handle_retry()` therefore cannot find a virtual node to retry

So **the ExtractionExecutor has to implement retry internally**.

### 10.2 Error Types

```python
# core/workflow/nodes/base/extraction_errors.py

class ExtractionError(Exception):
    """Base exception for extraction operations"""
    pass


class VariableNotFoundError(ExtractionError):
    """Source variable not found in variable pool"""

    def __init__(self, selector: list[str]):
        self.selector = selector
        super().__init__(f"Variable {'.'.join(selector)} not found in variable pool")


class InvalidVariableTypeError(ExtractionError):
    """Source variable is not a valid context type (list[dict])"""

    def __init__(self, selector: list[str], actual_type: type):
        self.selector = selector
        self.actual_type = actual_type
        super().__init__(
            f"Variable {'.'.join(selector)} is not a list type, got {actual_type.__name__}"
        )


class SourceNodeNotFoundError(ExtractionError):
    """Source node not found in graph config"""

    def __init__(self, node_id: str):
        self.node_id = node_id
        super().__init__(f"Source node {node_id} not found in graph config")


class LLMInvocationError(ExtractionError):
    """LLM invocation failed during extraction"""

    def __init__(self, original_error: Exception):
        self.original_error = original_error
        super().__init__(f"LLM invocation failed: {original_error}")
```

### 10.3 Internal Retry Mechanism

Virtual node retries are handled inside the `ExtractionExecutor`, inheriting the parent node's `retry_config`:

```python
# The ExtractionExecutor's retry implementation

def _execute_single_extraction_with_retry(
    self,
    spec: VariableExtractionSpec,
    ext_node_id: str,
) -> Generator[..., None, tuple[str, LLMUsage]]:
    """
    Execute extraction with internal retry support.

    Retry config is inherited from parent node.
    """
    retry_config = self._parent_retry_config
    last_error: Exception | None = None

    for attempt in range(retry_config.max_retries + 1):
        try:
            return (yield from self._execute_single_extraction(spec, ext_node_id))
        except LLMInvocationError as e:
            last_error = e

            if attempt < retry_config.max_retries:
                # Yield retry event for frontend display
                yield NodeRunRetryEvent(
                    id=str(uuid4()),
                    node_id=ext_node_id,
                    node_type=NodeType.LLM,
                    node_title=f"Extraction: {spec.extraction_prompt[:30]}...",
                    start_at=self._start_time,
                    error=str(e),
                    retry_index=attempt + 1,
                )

                # Wait for retry interval
                time.sleep(retry_config.retry_interval_seconds)
                continue

            # Max retries exceeded, raise
            raise

    # Should not reach here, but for type safety
    raise last_error or LLMInvocationError(Exception("Unknown error"))
```

### 10.4 Error Propagation

```python
# Error-handling example in ToolNode

def _run(self) -> Generator[NodeEventBase, None, None]:
    try:
        # Run extractions (retries are handled internally)
        extraction_results = yield from extraction_executor.process_extractions(specs)
    except ExtractionError as e:
        # The virtual node has already yielded a NodeRunFailedEvent; the exception
        # propagates to the parent node, whose error_strategy decides what happens next
        if self.error_strategy == ErrorStrategy.DEFAULT_VALUE:
            extraction_results = self._get_default_extraction_values(specs)
        else:
            raise  # abort execution

    # Continue execution...
```
### 10.5 Why Node-Level Retry Cannot Be Reused

The node-level retry flow:

```
Worker executes node
  → fails → NodeRunFailedEvent
  → Dispatcher → EventHandler
  → ErrorHandler._handle_retry()
  → looks up graph.nodes[node_id]  ← the virtual node does not exist here!
  → re-enqueues into ready_queue
```

Virtual nodes are not in `graph.nodes`, so they never enter this flow. Retry therefore has to happen inside the ExtractionExecutor.

---

## 11. Design Decisions

### 11.1 Model Configuration

**Decision: use structured configuration, with an optional explicit model**

**Configuration**:

```yaml
# Inherit the source node's model (recommended)
- name: query
  type: extraction
  value:
    source_node_id: llm1
    source_variable: context
    extraction_prompt: "Extract the keywords"

# Explicitly specify a model
- name: summary
  type: extraction
  value:
    source_node_id: agent1
    source_variable: context
    extraction_prompt: "Summarize the conversation"
    model:
      provider: openai
      name: gpt-4o-mini
```

**Priority**:

1. If `extraction.value.model` is present, use the specified model
2. Otherwise, inherit the source node's model configuration

**Model config fields**:

| Field               | Description          | Source                                       |
| ------------------- | -------------------- | -------------------------------------------- |
| `provider`          | model provider       | explicit, or the source node's config        |
| `name`              | model name           | explicit, or the source node's config        |
| `mode`              | LLM mode             | defaults to `chat`, or the source node's config |
| `completion_params` | inference parameters | explicit, or the source node's config        |

### 11.2 Token Billing

**Decision: A - virtual nodes are billed independently**

A virtual node has its own `NodeRunResult`; its token usage is recorded in the virtual node's `metadata`.

### 11.3 Type of the context Variable

**Decision: C - no new type for now**

`context` currently uses the `list[dict]` format (`ArrayAnySegment`). Implement it this way first; a dedicated `PromptMessagesSegment` type can be considered later if needed.

### 11.4 Scope of Support

**Decision: A - support every node that uses an LLM**

This covers:

- LLM node
- Agent node
- Question Classify node
- Parameter Extractor node

All of these need to output a `context` variable.

### 11.5 Retry Mechanism

**Decision: A - implement retry internally**

Virtual nodes implement retry inside the `ExtractionExecutor` rather than reusing the node-level retry flow.

**Rationale**:

- Node-level retry requires the node to be in `graph.nodes`, which virtual nodes are not
- `ErrorHandler._handle_retry()` cannot find virtual nodes

**Implementation**:

- Inherit the parent node's `retry_config` (max_retries, retry_interval_seconds)
- Implement the retry loop in `ExtractionExecutor._execute_with_retry()`
- Yield a `NodeRunRetryEvent` on each retry for frontend display

### 11.6 Reusing LLMNode Logic

**Decision: use LLMNode static methods**

The ExtractionExecutor reuses the `LLMNode.invoke_llm()` and `LLMNode.handle_invoke_result()` static methods:

**Benefits**:

- Full streaming support
- Full token statistics (`LLMUsage`)
- File handling (multimodal)
- A return format identical to a real LLM node

**The NodeRunResult contains**:

- `outputs`: `{"text": "..."}`
- `llm_usage`: an `LLMUsage` object
- `metadata`: token billing info (TOTAL_TOKENS, TOTAL_PRICE, CURRENCY)

---
## 12. Implementation Plan

### Phase 1: Infrastructure

| Task | File                                             | Description                                              |
| ---- | ------------------------------------------------ | -------------------------------------------------------- |
| 1.1  | `core/workflow/entities/variable_extraction.py`  | define `VariableExtractionSpec` and `ExtractionModelConfig` |
| 1.2  | `core/workflow/graph_events/node.py`             | add the virtual-node fields to `NodeRunStartedEvent`     |
| 1.3  | `core/workflow/nodes/llm/llm_utils.py`           | extract `build_context` into a shared helper             |

### Phase 2: Core Executor

| Task | File                                                             | Description                                                         |
| ---- | ---------------------------------------------------------------- | ------------------------------------------------------------------- |
| 2.1  | `core/workflow/nodes/base/extraction_errors.py`                  | define the error types                                              |
| 2.2  | `core/workflow/nodes/base/extraction_executor.py`                | implement `ExtractionExecutor`                                      |
| 2.3  | `core/workflow/graph_engine/event_management/event_handlers.py`  | add the `_is_virtual_node` check; virtual nodes skip the edge_processor |

### Phase 3: Node context Outputs

| Task | File                                        | Description              |
| ---- | ------------------------------------------- | ------------------------ |
| 3.1  | `core/workflow/nodes/agent/agent_node.py`   | add the `context` output |
| 3.2  | `core/workflow/nodes/question_classifier/`  | add the `context` output |
| 3.3  | `core/workflow/nodes/parameter_extractor/`  | add the `context` output |

### Phase 4: Node Integration

| Task | File                                       | Description                    |
| ---- | ------------------------------------------ | ------------------------------ |
| 4.1  | `core/workflow/nodes/tool/tool_node.py`    | integrate `ExtractionExecutor` |
| 4.2  | `core/workflow/nodes/agent/agent_node.py`  | integrate `ExtractionExecutor` |
| 4.3  | other nodes                                | integrate as needed            |

### Phase 5: Testing

| Task | Description                               |
| ---- | ----------------------------------------- |
| 5.1  | unit test: structured config parsing      |
| 5.2  | unit test: ExtractionExecutor             |
| 5.3  | integration test: ToolNode with extraction |
| 5.4  | integration test: multiple-extraction scenarios |

---
## 13. Appendix

### 13.1 Relevant Code Locations

| Module        | Path                                                             | Description                           |
| ------------- | ---------------------------------------------------------------- | ------------------------------------- |
| LLM Node      | `core/workflow/nodes/llm/node.py`                                | the `_build_context` method (line 600) |
| Tool Node     | `core/workflow/nodes/tool/tool_node.py`                          | the `_generate_parameters` method     |
| Agent Node    | `core/workflow/nodes/agent/agent_node.py`                        | needs a context output                |
| Variable Pool | `core/workflow/runtime/variable_pool.py`                         | variable storage and template resolution |
| Graph Events  | `core/workflow/graph_events/node.py`                             | node event definitions                |
| Event Handler | `core/workflow/graph_engine/event_management/event_handlers.py`  | event handling and variable storage   |
| Worker        | `core/workflow/graph_engine/worker.py`                           | node execution and the event queue    |

### 13.2 Reference Implementations

| Feature                  | Reference code           | Location                                                |
| ------------------------ | ------------------------ | ------------------------------------------------------- |
| Template parsing         | `VariableTemplateParser` | `core/workflow/nodes/base/variable_template_parser.py`  |
| History message handling | `TokenBufferMemory`      | `core/memory/token_buffer_memory.py`                    |
| Streaming LLM calls      | `LLMNode.invoke_llm`     | `core/workflow/nodes/llm/node.py` line 386              |
| Event dispatch           | `Node._dispatch`         | `core/workflow/nodes/base/node.py` line 559             |

### 13.3 New Files

Implementing this feature adds the following files:

```
core/workflow/
├── entities/
│   └── variable_extraction.py       # NEW: VariableExtractionSpec definition
└── nodes/
    └── base/
        ├── extraction_errors.py     # NEW: error type definitions
        └── extraction_executor.py   # NEW: ExtractionExecutor implementation
```

### 13.4 Modified Files

| File                                                             | Change                                                   |
| ---------------------------------------------------------------- | -------------------------------------------------------- |
| `core/workflow/graph_events/node.py`                             | add `is_virtual`, `parent_node_id`, and related fields   |
| `core/workflow/graph_engine/event_management/event_handlers.py`  | add the `_is_virtual_node` check                         |
| `core/workflow/nodes/llm/llm_utils.py`                           | extract the shared `build_context` helper                |
| `core/workflow/nodes/tool/tool_node.py`                          | integrate the ExtractionExecutor                         |
| `core/workflow/nodes/agent/agent_node.py`                        | add the context output + integrate the ExtractionExecutor |
| `core/workflow/nodes/question_classifier/*.py`                   | add the context output                                   |
| `core/workflow/nodes/parameter_extractor/*.py`                   | add the context output                                   |

diff --git a/api/core/workflow/graph_engine/event_management/event_handlers.py b/api/core/workflow/graph_engine/event_management/event_handlers.py
index 5b0f56e59d..7a10b0f291 100644
--- a/api/core/workflow/graph_engine/event_management/event_handlers.py
+++ b/api/core/workflow/graph_engine/event_management/event_handlers.py
@@ -125,6 +125,11 @@ class EventHandler:
         Args:
             event: The node started event
         """
+        # Check if this is a virtual node (extraction node)
+        if self._is_virtual_node(event.node_id):
+            self._handle_virtual_node_started(event)
+            return
+
         # Track execution in domain model
         node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
         is_initial_attempt = node_execution.retry_count == 0
@@ -164,6 +169,11 @@ class EventHandler:
         Args:
             event: The node succeeded event
         """
+        # Check if this is a virtual node (extraction node)
+        if self._is_virtual_node(event.node_id):
+            self._handle_virtual_node_success(event)
+            return
+
         # Update domain model
         node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
         node_execution.mark_taken()
@@ -226,6 +236,11 @@ class EventHandler:
         Args:
             event: The node failed event
         """
+        # Check if this is a virtual node (extraction node)
+        if self._is_virtual_node(event.node_id):
+            self._handle_virtual_node_failed(event)
+            return
+
         # Update domain model
         node_execution = 
self._graph_execution.get_or_create_node_execution(event.node_id) node_execution.mark_failed(event.error) @@ -345,3 +360,57 @@ class EventHandler: self._graph_runtime_state.set_output("answer", value) else: self._graph_runtime_state.set_output(key, value) + + def _is_virtual_node(self, node_id: str) -> bool: + """ + Check if node_id represents a virtual sub-node. + + Virtual nodes have IDs in the format: {parent_node_id}.{local_id} + We check if the part before '.' exists in graph nodes. + """ + if "." in node_id: + parent_id = node_id.rsplit(".", 1)[0] + return parent_id in self._graph.nodes + return False + + def _handle_virtual_node_started(self, event: NodeRunStartedEvent) -> None: + """ + Handle virtual node started event. + + Virtual nodes don't need full execution tracking, just collect the event. + """ + # Track in response coordinator for stream ordering + self._response_coordinator.track_node_execution(event.node_id, event.id) + + # Collect the event + self._event_collector.collect(event) + + def _handle_virtual_node_success(self, event: NodeRunSucceededEvent) -> None: + """ + Handle virtual node success event. + + Virtual nodes (extraction nodes) need special handling: + - Store outputs in variable pool (for reference by other nodes) + - Accumulate token usage + - Collect the event for logging + - Do NOT process edges or enqueue next nodes (parent node handles that) + """ + self._accumulate_node_usage(event.node_run_result.llm_usage) + + # Store outputs in variable pool + self._store_node_outputs(event.node_id, event.node_run_result.outputs) + + # Collect the event + self._event_collector.collect(event) + + def _handle_virtual_node_failed(self, event: NodeRunFailedEvent) -> None: + """ + Handle virtual node failed event. + + Virtual nodes (extraction nodes) failures are collected for logging, + but the parent node is responsible for handling the error. 
+ """ + self._accumulate_node_usage(event.node_run_result.llm_usage) + + # Collect the event for logging + self._event_collector.collect(event) diff --git a/api/core/workflow/graph_events/node.py b/api/core/workflow/graph_events/node.py index f225798d41..52345ece82 100644 --- a/api/core/workflow/graph_events/node.py +++ b/api/core/workflow/graph_events/node.py @@ -20,6 +20,12 @@ class NodeRunStartedEvent(GraphNodeEventBase): provider_type: str = "" provider_id: str = "" + # Virtual node fields for extraction + is_virtual: bool = False + parent_node_id: str | None = None + extraction_source: str | None = None # e.g., "llm1.context" + extraction_prompt: str | None = None + class NodeRunStreamChunkEvent(GraphNodeEventBase): # Spec-compliant fields diff --git a/api/core/workflow/nodes/base/__init__.py b/api/core/workflow/nodes/base/__init__.py index f83df0e323..e6cde91bea 100644 --- a/api/core/workflow/nodes/base/__init__.py +++ b/api/core/workflow/nodes/base/__init__.py @@ -1,5 +1,13 @@ -from .entities import BaseIterationNodeData, BaseIterationState, BaseLoopNodeData, BaseLoopState, BaseNodeData +from .entities import ( + BaseIterationNodeData, + BaseIterationState, + BaseLoopNodeData, + BaseLoopState, + BaseNodeData, + VirtualNodeConfig, +) from .usage_tracking_mixin import LLMUsageTrackingMixin +from .virtual_node_executor import VirtualNodeExecutionError, VirtualNodeExecutor __all__ = [ "BaseIterationNodeData", @@ -8,4 +16,7 @@ __all__ = [ "BaseLoopState", "BaseNodeData", "LLMUsageTrackingMixin", + "VirtualNodeConfig", + "VirtualNodeExecutionError", + "VirtualNodeExecutor", ] diff --git a/api/core/workflow/nodes/base/entities.py b/api/core/workflow/nodes/base/entities.py index e5a20c8e91..41469d6ee8 100644 --- a/api/core/workflow/nodes/base/entities.py +++ b/api/core/workflow/nodes/base/entities.py @@ -167,6 +167,24 @@ class DefaultValue(BaseModel): return self +class VirtualNodeConfig(BaseModel): + """Configuration for a virtual sub-node embedded within a parent node.""" + + # Local ID within parent node (e.g., "ext_1") + # Will be converted to global ID: "{parent_id}.{id}" + id: str + + # Node type (e.g., "llm", "code", "tool") + type: str + + # Full node data configuration + data: dict[str, Any] = {} + + def get_global_id(self, parent_node_id: str) -> str: + """Get the global node ID by combining parent ID and local ID.""" + return f"{parent_node_id}.{self.id}" + + class BaseNodeData(ABC, BaseModel): title: str desc: str | None = None @@ -175,6 +193,9 @@ class BaseNodeData(ABC, BaseModel): default_value: list[DefaultValue] | None = None retry_config: RetryConfig = RetryConfig() + # Virtual sub-nodes that execute before the main node + virtual_nodes: list[VirtualNodeConfig] = [] + @property def default_value_dict(self) -> dict[str, Any]: if self.default_value: diff --git a/api/core/workflow/nodes/base/node.py b/api/core/workflow/nodes/base/node.py index 55c8db40ea..d49910c9fb 100644 --- a/api/core/workflow/nodes/base/node.py +++ b/api/core/workflow/nodes/base/node.py @@ -229,6 +229,7 @@ class Node(Generic[NodeDataT]): self._node_id = node_id self._node_execution_id: str = "" self._start_at = naive_utc_now() + self._virtual_node_outputs: dict[str, Any] = {} # Outputs from virtual sub-nodes raw_node_data = config.get("data") or {} if not isinstance(raw_node_data, Mapping): @@ -270,10 +271,52 @@ class Node(Generic[NodeDataT]): """Check if execution should be stopped.""" return self.graph_runtime_state.stop_event.is_set() + def _execute_virtual_nodes(self) -> Generator[GraphNodeEventBase, 
None, dict[str, Any]]: + """ + Execute all virtual sub-nodes defined in node configuration. + + Virtual nodes are complete node definitions that execute before the main node. + Each virtual node: + - Has its own global ID: "{parent_id}.{local_id}" + - Generates standard node events + - Stores outputs in the variable pool (via event handling) + - Supports retry via parent node's retry config + + Returns: + dict mapping local_id -> outputs dict + """ + from .virtual_node_executor import VirtualNodeExecutor + + virtual_nodes = self.node_data.virtual_nodes + if not virtual_nodes: + return {} + + executor = VirtualNodeExecutor( + graph_init_params=self._graph_init_params, + graph_runtime_state=self.graph_runtime_state, + parent_node_id=self._node_id, + parent_retry_config=self.retry_config, + ) + + return (yield from executor.execute_virtual_nodes(virtual_nodes)) + + @property + def virtual_node_outputs(self) -> dict[str, Any]: + """ + Get the outputs from virtual sub-nodes. + + Returns: + dict mapping local_id -> outputs dict + """ + return self._virtual_node_outputs + def run(self) -> Generator[GraphNodeEventBase, None, None]: execution_id = self.ensure_execution_id() self._start_at = naive_utc_now() + # Step 1: Execute virtual sub-nodes before main node execution + self._virtual_node_outputs = yield from self._execute_virtual_nodes() + # Create and push start event with required fields start_event = NodeRunStartedEvent( id=execution_id, diff --git a/api/core/workflow/nodes/base/virtual_node_executor.py b/api/core/workflow/nodes/base/virtual_node_executor.py new file mode 100644 index 0000000000..3f3b8f1f99 --- /dev/null +++ b/api/core/workflow/nodes/base/virtual_node_executor.py @@ -0,0 +1,213 @@ +""" +Virtual Node Executor for running embedded sub-nodes within a parent node. + +This module handles the execution of virtual nodes defined in a parent node's +`virtual_nodes` configuration. Virtual nodes are complete node definitions +that execute before the parent node. + +Example configuration: + virtual_nodes: + - id: ext_1 + type: llm + data: + model: {...} + prompt_template: [...] +""" + +import time +from collections.abc import Generator +from typing import TYPE_CHECKING, Any +from uuid import uuid4 + +from core.workflow.enums import NodeType +from core.workflow.graph_events import ( + GraphNodeEventBase, + NodeRunFailedEvent, + NodeRunRetryEvent, + NodeRunStartedEvent, + NodeRunSucceededEvent, +) +from libs.datetime_utils import naive_utc_now + +from .entities import RetryConfig, VirtualNodeConfig + +if TYPE_CHECKING: + from core.workflow.entities import GraphInitParams + from core.workflow.runtime import GraphRuntimeState + + +class VirtualNodeExecutionError(Exception): + """Error during virtual node execution""" + + def __init__(self, node_id: str, original_error: Exception): + self.node_id = node_id + self.original_error = original_error + super().__init__(f"Virtual node {node_id} execution failed: {original_error}") + + +class VirtualNodeExecutor: + """ + Executes virtual sub-nodes embedded within a parent node. + + Virtual nodes are complete node definitions that execute before the parent node. 
+ Each virtual node: + - Has its own global ID: "{parent_id}.{local_id}" + - Generates standard node events + - Stores outputs in the variable pool + - Supports retry via parent node's retry config + """ + + def __init__( + self, + *, + graph_init_params: "GraphInitParams", + graph_runtime_state: "GraphRuntimeState", + parent_node_id: str, + parent_retry_config: RetryConfig | None = None, + ): + self._graph_init_params = graph_init_params + self._graph_runtime_state = graph_runtime_state + self._parent_node_id = parent_node_id + self._parent_retry_config = parent_retry_config or RetryConfig() + + def execute_virtual_nodes( + self, + virtual_nodes: list[VirtualNodeConfig], + ) -> Generator[GraphNodeEventBase, None, dict[str, Any]]: + """ + Execute all virtual nodes in order. + + Args: + virtual_nodes: List of virtual node configurations + + Yields: + Node events from each virtual node execution + + Returns: + dict mapping local_id -> outputs dict + """ + results: dict[str, Any] = {} + + for vnode_config in virtual_nodes: + global_id = vnode_config.get_global_id(self._parent_node_id) + + # Execute with retry + outputs = yield from self._execute_with_retry(vnode_config, global_id) + results[vnode_config.id] = outputs + + return results + + def _execute_with_retry( + self, + vnode_config: VirtualNodeConfig, + global_id: str, + ) -> Generator[GraphNodeEventBase, None, dict[str, Any]]: + """ + Execute virtual node with retry support. + """ + retry_config = self._parent_retry_config + last_error: Exception | None = None + + for attempt in range(retry_config.max_retries + 1): + try: + return (yield from self._execute_single_node(vnode_config, global_id)) + except Exception as e: + last_error = e + + if attempt < retry_config.max_retries: + # Yield retry event + yield NodeRunRetryEvent( + id=str(uuid4()), + node_id=global_id, + node_type=self._get_node_type(vnode_config.type), + node_title=vnode_config.data.get("title", f"Virtual: {vnode_config.id}"), + start_at=naive_utc_now(), + error=str(e), + retry_index=attempt + 1, + ) + + time.sleep(retry_config.retry_interval_seconds) + continue + + raise VirtualNodeExecutionError(global_id, e) from e + + raise last_error or VirtualNodeExecutionError(global_id, Exception("Unknown error")) + + def _execute_single_node( + self, + vnode_config: VirtualNodeConfig, + global_id: str, + ) -> Generator[GraphNodeEventBase, None, dict[str, Any]]: + """ + Execute a single virtual node by instantiating and running it. 
+ """ + from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING + + # Build node config + node_config: dict[str, Any] = { + "id": global_id, + "data": { + **vnode_config.data, + "title": vnode_config.data.get("title", f"Virtual: {vnode_config.id}"), + }, + } + + # Get the node class for this type + node_type = self._get_node_type(vnode_config.type) + node_mapping = NODE_TYPE_CLASSES_MAPPING.get(node_type) + if not node_mapping: + raise ValueError(f"No class mapping found for node type: {node_type}") + + node_version = str(vnode_config.data.get("version", "1")) + node_cls = node_mapping.get(node_version) or node_mapping.get(LATEST_VERSION) + if not node_cls: + raise ValueError(f"No class found for node type: {node_type}") + + # Instantiate the node + node = node_cls( + id=global_id, + config=node_config, + graph_init_params=self._graph_init_params, + graph_runtime_state=self._graph_runtime_state, + ) + + # Run and collect events + outputs: dict[str, Any] = {} + + for event in node.run(): + # Mark event as coming from virtual node + self._mark_event_as_virtual(event, vnode_config) + yield event + + if isinstance(event, NodeRunSucceededEvent): + outputs = event.node_run_result.outputs or {} + elif isinstance(event, NodeRunFailedEvent): + raise Exception(event.error or "Virtual node execution failed") + + return outputs + + def _mark_event_as_virtual( + self, + event: GraphNodeEventBase, + vnode_config: VirtualNodeConfig, + ) -> None: + """Mark event as coming from a virtual node.""" + if isinstance(event, NodeRunStartedEvent): + event.is_virtual = True + event.parent_node_id = self._parent_node_id + + def _get_node_type(self, type_str: str) -> NodeType: + """Convert type string to NodeType enum.""" + type_mapping = { + "llm": NodeType.LLM, + "code": NodeType.CODE, + "tool": NodeType.TOOL, + "if-else": NodeType.IF_ELSE, + "question-classifier": NodeType.QUESTION_CLASSIFIER, + "parameter-extractor": NodeType.PARAMETER_EXTRACTOR, + "template-transform": NodeType.TEMPLATE_TRANSFORM, + "variable-assigner": NodeType.VARIABLE_ASSIGNER, + "http-request": NodeType.HTTP_REQUEST, + "knowledge-retrieval": NodeType.KNOWLEDGE_RETRIEVAL, + } + return type_mapping.get(type_str, NodeType.LLM) diff --git a/api/core/workflow/nodes/tool/tool_node.py b/api/core/workflow/nodes/tool/tool_node.py index 2e7ec757b4..0ba58a9560 100644 --- a/api/core/workflow/nodes/tool/tool_node.py +++ b/api/core/workflow/nodes/tool/tool_node.py @@ -89,18 +89,20 @@ class ToolNode(Node[ToolNodeData]): ) return - # get parameters + # get parameters (use virtual_node_outputs from base class) tool_parameters = tool_runtime.get_merged_runtime_parameters() or [] parameters = self._generate_parameters( tool_parameters=tool_parameters, variable_pool=self.graph_runtime_state.variable_pool, node_data=self.node_data, + virtual_node_outputs=self.virtual_node_outputs, ) parameters_for_log = self._generate_parameters( tool_parameters=tool_parameters, variable_pool=self.graph_runtime_state.variable_pool, node_data=self.node_data, for_log=True, + virtual_node_outputs=self.virtual_node_outputs, ) # get conversation id conversation_id = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID]) @@ -176,6 +178,7 @@ class ToolNode(Node[ToolNodeData]): variable_pool: "VariablePool", node_data: ToolNodeData, for_log: bool = False, + virtual_node_outputs: dict[str, Any] | None = None, ) -> dict[str, Any]: """ Generate parameters based on the given tool parameters, variable pool, and node data. 
@@ -184,12 +187,17 @@ class ToolNode(Node[ToolNodeData]): tool_parameters (Sequence[ToolParameter]): The list of tool parameters. variable_pool (VariablePool): The variable pool containing the variables. node_data (ToolNodeData): The data associated with the tool node. + for_log (bool): Whether to generate parameters for logging. + virtual_node_outputs (dict[str, Any] | None): Outputs from virtual sub-nodes. + Maps local_id -> outputs dict. Virtual node outputs are also in variable_pool + with global IDs like "{parent_id}.{local_id}". Returns: Mapping[str, Any]: A dictionary containing the generated parameters. """ tool_parameters_dictionary = {parameter.name: parameter for parameter in tool_parameters} + virtual_node_outputs = virtual_node_outputs or {} result: dict[str, Any] = {} for parameter_name in node_data.tool_parameters: @@ -199,14 +207,25 @@ class ToolNode(Node[ToolNodeData]): continue tool_input = node_data.tool_parameters[parameter_name] if tool_input.type == "variable": - variable = variable_pool.get(tool_input.value) - if variable is None: - if parameter.required: - raise ToolParameterError(f"Variable {tool_input.value} does not exist") - continue - parameter_value = variable.value + # Check if this references a virtual node output (local ID like [ext_1, text]) + selector = tool_input.value + if len(selector) >= 2 and selector[0] in virtual_node_outputs: + # Reference to virtual node output + local_id = selector[0] + var_name = selector[1] + outputs = virtual_node_outputs.get(local_id, {}) + parameter_value = outputs.get(var_name) + else: + # Normal variable reference + variable = variable_pool.get(selector) + if variable is None: + if parameter.required: + raise ToolParameterError(f"Variable {selector} does not exist") + continue + parameter_value = variable.value elif tool_input.type in {"mixed", "constant"}: - segment_group = variable_pool.convert_template(str(tool_input.value)) + template = str(tool_input.value) + segment_group = variable_pool.convert_template(template) parameter_value = segment_group.log if for_log else segment_group.text else: raise ToolParameterError(f"Unknown tool input type '{tool_input.type}'") diff --git a/api/tests/fixtures/pav-test-extraction.yml b/api/tests/fixtures/pav-test-extraction.yml new file mode 100644 index 0000000000..d1b9d55add --- /dev/null +++ b/api/tests/fixtures/pav-test-extraction.yml @@ -0,0 +1,266 @@ +app: + description: Test for variable extraction feature + icon: 🤖 + icon_background: '#FFEAD5' + mode: advanced-chat + name: pav-test-extraction + use_icon_as_answer_icon: false +dependencies: +- current_identifier: null + type: marketplace + value: + marketplace_plugin_unique_identifier: langgenius/google:0.0.8@3efcf55ffeef9d0f77715e0afb23534952ae0cb385c051d0637e86d71199d1a6 + version: null +- current_identifier: null + type: marketplace + value: + marketplace_plugin_unique_identifier: langgenius/tongyi:0.1.16@d8bffbe45418f0c117fb3393e5e40e61faee98f9a2183f062e5a280e74b15d21 + version: null +kind: app +version: 0.5.0 +workflow: + conversation_variables: [] + environment_variables: [] + features: + file_upload: + allowed_file_extensions: + - .JPG + - .JPEG + - .PNG + - .GIF + - .WEBP + - .SVG + allowed_file_types: + - image + allowed_file_upload_methods: + - local_file + - remote_url + enabled: false + image: + enabled: false + number_limits: 3 + transfer_methods: + - local_file + - remote_url + number_limits: 3 + opening_statement: 你好!我是一个搜索助手,请告诉我你想搜索什么内容。 + retriever_resource: + enabled: true + sensitive_word_avoidance: + 
enabled: false + speech_to_text: + enabled: false + suggested_questions: [] + suggested_questions_after_answer: + enabled: false + text_to_speech: + enabled: false + language: '' + voice: '' + graph: + edges: + - data: + sourceType: start + targetType: llm + id: 1767773675796-llm + source: '1767773675796' + sourceHandle: source + target: llm + targetHandle: target + type: custom + - data: + isInIteration: false + isInLoop: false + sourceType: llm + targetType: tool + id: llm-source-1767773709491-target + source: llm + sourceHandle: source + target: '1767773709491' + targetHandle: target + type: custom + zIndex: 0 + - data: + isInIteration: false + isInLoop: false + sourceType: tool + targetType: answer + id: tool-source-answer-target + source: '1767773709491' + sourceHandle: source + target: answer + targetHandle: target + type: custom + zIndex: 0 + nodes: + - data: + selected: false + title: User Input + type: start + variables: [] + height: 73 + id: '1767773675796' + position: + x: 80 + y: 282 + positionAbsolute: + x: 80 + y: 282 + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + context: + enabled: false + variable_selector: [] + memory: + query_prompt_template: '' + role_prefix: + assistant: '' + user: '' + window: + enabled: true + size: 10 + model: + completion_params: + temperature: 0.7 + mode: chat + name: qwen-max + provider: langgenius/tongyi/tongyi + prompt_template: + - id: 11d06d15-914a-4915-a5b1-0e35ab4fba51 + role: system + text: '你是一个智能搜索助手。用户会告诉你他们想搜索的内容。 + + 请与用户进行对话,了解他们的搜索需求。 + + 当用户明确表达了想要搜索的内容后,你可以回复"好的,我来帮你搜索"。 + + ' + selected: false + title: LLM + type: llm + vision: + enabled: false + height: 88 + id: llm + position: + x: 380 + y: 282 + positionAbsolute: + x: 380 + y: 282 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + is_team_authorization: true + paramSchemas: + - auto_generate: null + default: null + form: llm + human_description: + en_US: used for searching + ja_JP: used for searching + pt_BR: used for searching + zh_Hans: 用于搜索网页内容 + label: + en_US: Query string + ja_JP: Query string + pt_BR: Query string + zh_Hans: 查询语句 + llm_description: key words for searching + max: null + min: null + name: query + options: [] + placeholder: null + precision: null + required: true + scope: null + template: null + type: string + params: + query: '' + plugin_id: langgenius/google + plugin_unique_identifier: langgenius/google:0.0.8@3efcf55ffeef9d0f77715e0afb23534952ae0cb385c051d0637e86d71199d1a6 + provider_icon: http://localhost:5001/console/api/workspaces/current/plugin/icon?tenant_id=7217e801-f6f5-49ec-8103-d7de97a4b98f&filename=1c5871163478957bac64c3fe33d72d003f767497d921c74b742aad27a8344a74.svg + provider_id: langgenius/google/google + provider_name: langgenius/google/google + provider_type: builtin + selected: false + title: GoogleSearch + tool_configurations: {} + tool_description: A tool for performing a Google SERP search and extracting + snippets and webpages.Input should be a search query. 
+        tool_label: GoogleSearch
+        tool_name: google_search
+        tool_node_version: '2'
+        tool_parameters:
+          query:
+            type: variable
+            value:
+            - ext_1
+            - text
+        type: tool
+        virtual_nodes:
+        - data:
+            model:
+              completion_params:
+                temperature: 0.7
+              mode: chat
+              name: qwen-max
+              provider: langgenius/tongyi/tongyi
+            context:
+              enabled: false
+            prompt_template:
+            - role: user
+              text: '{{#llm.context#}}'
+            - role: user
+              text: 请从对话历史中提取用户想要搜索的关键词,只返回关键词本身,不要返回其他内容
+            title: 提取搜索关键词
+          id: ext_1
+          type: llm
+      height: 52
+      id: '1767773709491'
+      position:
+        x: 682
+        y: 282
+      positionAbsolute:
+        x: 682
+        y: 282
+      selected: false
+      sourcePosition: right
+      targetPosition: left
+      type: custom
+      width: 242
+    - data:
+        answer: '搜索结果:
+
+          {{#1767773709491.text#}}
+
+          '
+        selected: false
+        title: Answer
+        type: answer
+      height: 103
+      id: answer
+      position:
+        x: 984
+        y: 282
+      positionAbsolute:
+        x: 984
+        y: 282
+      selected: true
+      sourcePosition: right
+      targetPosition: left
+      type: custom
+      width: 242
+    viewport:
+      x: 151
+      y: 141.5
+      zoom: 1
+  rag_pipeline_variables: []
diff --git a/api/tests/unit_tests/core/workflow/entities/test_virtual_node.py b/api/tests/unit_tests/core/workflow/entities/test_virtual_node.py
new file mode 100644
index 0000000000..ffffccfa1b
--- /dev/null
+++ b/api/tests/unit_tests/core/workflow/entities/test_virtual_node.py
@@ -0,0 +1,77 @@
+"""
+Unit tests for virtual node configuration.
+"""
+
+from core.workflow.nodes.base.entities import VirtualNodeConfig
+
+
+class TestVirtualNodeConfig:
+    """Tests for VirtualNodeConfig entity."""
+
+    def test_create_basic_config(self):
+        """Test creating a basic virtual node config."""
+        config = VirtualNodeConfig(
+            id="ext_1",
+            type="llm",
+            data={
+                "title": "Extract keywords",
+                "model": {"provider": "openai", "name": "gpt-4o-mini"},
+            },
+        )
+
+        assert config.id == "ext_1"
+        assert config.type == "llm"
+        assert config.data["title"] == "Extract keywords"
+
+    def test_get_global_id(self):
+        """Test generating global ID from parent ID."""
+        config = VirtualNodeConfig(
+            id="ext_1",
+            type="llm",
+            data={},
+        )
+
+        global_id = config.get_global_id("tool1")
+        assert global_id == "tool1.ext_1"
+
+    def test_get_global_id_with_different_parents(self):
+        """Test global ID generation with different parent IDs."""
+        config = VirtualNodeConfig(id="sub_node", type="code", data={})
+
+        assert config.get_global_id("parent1") == "parent1.sub_node"
+        assert config.get_global_id("node_123") == "node_123.sub_node"
+
+    def test_empty_data(self):
+        """Test virtual node config with empty data."""
+        config = VirtualNodeConfig(
+            id="test",
+            type="tool",
+        )
+
+        assert config.id == "test"
+        assert config.type == "tool"
+        assert config.data == {}
+
+    def test_complex_data(self):
+        """Test virtual node config with complex data."""
+        config = VirtualNodeConfig(
+            id="llm_1",
+            type="llm",
+            data={
+                "title": "Generate summary",
+                "model": {
+                    "provider": "openai",
+                    "name": "gpt-4",
+                    "mode": "chat",
+                    "completion_params": {"temperature": 0.7, "max_tokens": 500},
+                },
+                "prompt_template": [
+                    {"role": "user", "text": "{{#llm1.context#}}"},
+                    {"role": "user", "text": "Please summarize the conversation"},
+                ],
+            },
+        )
+
+        assert config.data["model"]["provider"] == "openai"
+        assert len(config.data["prompt_template"]) == 2
+
diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-finished.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-finished.ts
index cf0d9bcef1..c18ab909a9 100644
--- a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-finished.ts
+++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-finished.ts
@@ -37,7 +37,10 @@ export const useWorkflowNodeFinished = () => {
       }))
 
       const newNodes = produce(nodes, (draft) => {
-        const currentNode = draft.find(node => node.id === data.node_id)!
+        const currentNode = draft.find(node => node.id === data.node_id)
+        // Skip if node not found (e.g., virtual extraction nodes)
+        if (!currentNode)
+          return
         currentNode.data._runningStatus = data.status
         if (data.status === NodeRunningStatus.Exception) {
           if (data.execution_metadata?.error_strategy === ErrorHandleTypeEnum.failBranch)
diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts
index 03c7387d38..282e35fbd6 100644
--- a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts
+++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts
@@ -45,6 +45,11 @@ export const useWorkflowNodeStarted = () => {
       } = reactflow
       const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)
       const currentNode = nodes[currentNodeIndex]
+
+      // Skip if node not found (e.g., virtual extraction nodes)
+      if (!currentNode)
+        return
+
       const position = currentNode.position
       const zoom = transform[2]
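
A minimal, runnable sketch of the selector-resolution rule that the `_generate_parameters` hunk above introduces. `ToolParameterError` and the variable pool are stand-ins here (a local exception and a plain dict, not the real classes); only the branching logic mirrors the patch:

```python
from typing import Any


class ToolParameterError(Exception):
    """Stand-in for the real exception raised by the tool node."""


def resolve_variable_parameter(
    selector: list[str],
    virtual_node_outputs: dict[str, dict[str, Any]],
    variable_pool: dict[tuple[str, ...], Any],
    required: bool = True,
) -> Any:
    # A selector whose head matches a virtual sub-node's local ID
    # (e.g. ["ext_1", "text"]) is served from that sub-node's outputs.
    if len(selector) >= 2 and selector[0] in virtual_node_outputs:
        return virtual_node_outputs[selector[0]].get(selector[1])
    # Anything else falls back to a normal variable pool lookup.
    value = variable_pool.get(tuple(selector))
    if value is None and required:
        raise ToolParameterError(f"Variable {selector} does not exist")
    return value


# The fixture wires the tool's query parameter to the selector [ext_1, text],
# so it is served from the extraction sub-node's outputs:
outputs = {"ext_1": {"text": "dify workflow"}}
assert resolve_variable_parameter(["ext_1", "text"], outputs, {}) == "dify workflow"
```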
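
Likewise, a sketch of the `"{parent_id}.{local_id}"` global-ID convention that `VirtualNodeConfig.get_global_id` encodes and `test_virtual_node.py` asserts. The dict-based pool and the `publish_virtual_outputs` helper are illustrative assumptions, not the executor's actual API:

```python
from typing import Any


def get_global_id(parent_id: str, local_id: str) -> str:
    # Mirrors VirtualNodeConfig.get_global_id: ("tool1", "ext_1") -> "tool1.ext_1"
    return f"{parent_id}.{local_id}"


def publish_virtual_outputs(
    pool: dict[tuple[str, str], Any],
    parent_id: str,
    local_id: str,
    outputs: dict[str, Any],
) -> None:
    # Store each output under the global node ID so downstream selectors can
    # reference a sub-node's result like any other node's output.
    global_id = get_global_id(parent_id, local_id)
    for name, value in outputs.items():
        pool[(global_id, name)] = value


pool: dict[tuple[str, str], Any] = {}
publish_virtual_outputs(pool, "1767773709491", "ext_1", {"text": "dify workflow"})
assert pool[("1767773709491.ext_1", "text")] == "dify workflow"
```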