From 9ce48b4dc488e4f93007503c1b53664c08e6d0cf Mon Sep 17 00:00:00 2001 From: Novice Date: Fri, 12 Dec 2025 11:08:49 +0800 Subject: [PATCH] fix: llm generation variable --- .../advanced_chat/generate_task_pipeline.py | 5 + api/core/workflow/nodes/llm/entities.py | 18 ++ api/core/workflow/nodes/llm/node.py | 283 +++++++++++------- ...4a64f53_add_llm_generation_detail_table.py | 46 +++ 4 files changed, 251 insertions(+), 101 deletions(-) create mode 100644 api/migrations/versions/2025_12_10_1617-85c8b4a64f53_add_llm_generation_detail_table.py diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 3b2c55aa2d..7a330bbfb1 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -141,6 +141,9 @@ class StreamEventBuffer: def record_tool_call(self, tool_call_id: str, tool_name: str, tool_arguments: str) -> None: """Record a tool call event.""" + if not tool_call_id: + return + # Flush any pending reasoning first if self._last_event_type == "thought": self._flush_current_reasoning() @@ -168,6 +171,8 @@ class StreamEventBuffer: def record_tool_result(self, tool_call_id: str, result: str) -> None: """Record a tool result event (update existing tool call).""" + if not tool_call_id: + return if tool_call_id in self._tool_call_id_map: idx = self._tool_call_id_map[tool_call_id] self.tool_calls[idx]["result"] = result diff --git a/api/core/workflow/nodes/llm/entities.py b/api/core/workflow/nodes/llm/entities.py index 7da5cd241e..2003820d80 100644 --- a/api/core/workflow/nodes/llm/entities.py +++ b/api/core/workflow/nodes/llm/entities.py @@ -3,7 +3,9 @@ from typing import Any, Literal from pydantic import BaseModel, Field, field_validator +from core.file import File from core.model_runtime.entities import ImagePromptMessageContent, LLMMode +from core.model_runtime.entities.llm_entities import LLMUsage from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate, MemoryConfig from core.tools.entities.tool_entities import ToolProviderType from core.workflow.nodes.base import BaseNodeData @@ -17,6 +19,22 @@ class ModelConfig(BaseModel): completion_params: dict[str, Any] = Field(default_factory=dict) +class LLMGenerationData(BaseModel): + """Generation data from LLM invocation with tools. + + For multi-turn tool calls like: thought1 -> text1 -> tool_call1 -> thought2 -> text2 -> tool_call2 + - reasoning_contents: [thought1, thought2, ...] - one element per turn + - tool_calls: [{id, name, arguments, result}, ...] 
- all tool calls with results + """ + + text: str = Field(..., description="Accumulated text content from all turns") + reasoning_contents: list[str] = Field(default_factory=list, description="Reasoning content per turn") + tool_calls: list[dict[str, Any]] = Field(default_factory=list, description="Tool calls with results") + usage: LLMUsage = Field(..., description="LLM usage statistics") + finish_reason: str | None = Field(None, description="Finish reason from LLM") + files: list[File] = Field(default_factory=list, description="Generated files") + + class ContextConfig(BaseModel): enabled: bool variable_selector: list[str] | None = None diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index 738350f301..fe105c2ddb 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -83,6 +83,7 @@ from core.workflow.runtime import VariablePool from . import llm_utils from .entities import ( + LLMGenerationData, LLMNodeChatModelMessage, LLMNodeCompletionModelPromptTemplate, LLMNodeData, @@ -148,10 +149,84 @@ class LLMNode(Node[LLMNodeData]): def version(cls) -> str: return "1" + def _stream_llm_events( + self, + generator: Generator[NodeEventBase | LLMStructuredOutput, None, LLMGenerationData | None], + *, + model_instance: ModelInstance, + ) -> Generator[ + NodeEventBase, + None, + tuple[ + str, + str, + LLMUsage, + str | None, + LLMStructuredOutput | None, + LLMGenerationData | None, + ], + ]: + """ + Stream events and capture generator return value in one place. + Uses generator delegation so _run stays concise while still emitting events. + """ + clean_text = "" + reasoning_content = "" + usage = LLMUsage.empty_usage() + finish_reason: str | None = None + structured_output: LLMStructuredOutput | None = None + generation_data: LLMGenerationData | None = None + completed = False + + while True: + try: + event = next(generator) + except StopIteration as exc: + if isinstance(exc.value, LLMGenerationData): + generation_data = exc.value + break + + if completed: + # After completion we still drain to reach StopIteration.value + continue + + match event: + case StreamChunkEvent() | ThoughtChunkEvent(): + yield event + + case ModelInvokeCompletedEvent( + text=text, + usage=usage_event, + finish_reason=finish_reason_event, + reasoning_content=reasoning_event, + structured_output=structured_raw, + ): + clean_text = text + usage = usage_event + finish_reason = finish_reason_event + reasoning_content = reasoning_event or "" + + if self.node_data.reasoning_format != "tagged": + clean_text, _ = LLMNode._split_reasoning(clean_text, self.node_data.reasoning_format) + + structured_output = ( + LLMStructuredOutput(structured_output=structured_raw) if structured_raw else None + ) + + llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage) + completed = True + + case LLMStructuredOutput(): + structured_output = event + + case _: + continue + + return clean_text, reasoning_content, usage, finish_reason, structured_output, generation_data + def _run(self) -> Generator: node_inputs: dict[str, Any] = {} process_data: dict[str, Any] = {} - result_text = "" clean_text = "" usage = LLMUsage.empty_usage() finish_reason = None @@ -240,10 +315,13 @@ class LLMNode(Node[LLMNodeData]): tenant_id=self.tenant_id, ) + # Variables for outputs + generation_data: LLMGenerationData | None = None + structured_output: LLMStructuredOutput | None = None + # Check if tools are configured if self.tool_call_enabled: # Use 
tool-enabled invocation (Agent V2 style) - # This generator handles all events including final events generator = self._invoke_llm_with_tools( model_instance=model_instance, prompt_messages=prompt_messages, @@ -253,10 +331,6 @@ class LLMNode(Node[LLMNodeData]): node_inputs=node_inputs, process_data=process_data, ) - # Forward all events and return early since _invoke_llm_with_tools - # already sends final event and StreamCompletedEvent - yield from generator - return else: # Use traditional LLM invocation generator = LLMNode.invoke_llm( @@ -274,39 +348,23 @@ class LLMNode(Node[LLMNodeData]): reasoning_format=self._node_data.reasoning_format, ) - structured_output: LLMStructuredOutput | None = None + ( + clean_text, + reasoning_content, + usage, + finish_reason, + structured_output, + generation_data, + ) = yield from self._stream_llm_events(generator, model_instance=model_instance) - for event in generator: - if isinstance(event, (StreamChunkEvent, ThoughtChunkEvent)): - yield event - elif isinstance(event, ModelInvokeCompletedEvent): - # Raw text - result_text = event.text - usage = event.usage - finish_reason = event.finish_reason - reasoning_content = event.reasoning_content or "" - - # For downstream nodes, determine clean text based on reasoning_format - if self.node_data.reasoning_format == "tagged": - # Keep tags for backward compatibility - clean_text = result_text - else: - # Extract clean text from tags - clean_text, _ = LLMNode._split_reasoning(result_text, self.node_data.reasoning_format) - - # Process structured output if available from the event. - structured_output = ( - LLMStructuredOutput(structured_output=event.structured_output) - if event.structured_output - else None - ) - - # deduct quota - llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage) - break - elif isinstance(event, LLMStructuredOutput): - structured_output = event + # Extract variables from generation_data if available + if generation_data: + clean_text = generation_data.text + reasoning_content = "" + usage = generation_data.usage + finish_reason = generation_data.finish_reason + # Unified process_data building process_data = { "model_mode": model_config.mode, "prompts": PromptMessageUtil.prompt_messages_to_prompt_for_saving( @@ -318,38 +376,56 @@ class LLMNode(Node[LLMNodeData]): "model_name": model_config.model, } + # Unified outputs building outputs = { "text": clean_text, "reasoning_content": reasoning_content, "usage": jsonable_encoder(usage), "finish_reason": finish_reason, - "generation": { + } + + # Build generation field + if generation_data: + # Use generation_data from tool invocation (supports multi-turn) + generation = { + "content": generation_data.text, + "reasoning_content": generation_data.reasoning_contents, # [thought1, thought2, ...] 
+ "tool_calls": generation_data.tool_calls, + } + files_to_output = generation_data.files + else: + # Traditional LLM invocation + generation = { "content": clean_text, "reasoning_content": [reasoning_content] if reasoning_content else [], "tool_calls": [], - }, - } + } + files_to_output = self._file_outputs + + outputs["generation"] = generation + if files_to_output: + outputs["files"] = ArrayFileSegment(value=files_to_output) if structured_output: outputs["structured_output"] = structured_output.structured_output - if self._file_outputs: - outputs["files"] = ArrayFileSegment(value=self._file_outputs) # Send final chunk event to indicate streaming is complete - yield StreamChunkEvent( - selector=[self._node_id, "text"], - chunk="", - is_final=True, - ) - yield StreamChunkEvent( - selector=[self._node_id, "generation", "content"], - chunk="", - is_final=True, - ) - yield ThoughtChunkEvent( - selector=[self._node_id, "generation", "thought"], - chunk="", - is_final=True, - ) + if not self.tool_call_enabled: + # For tool calls, final events are already sent in _process_tool_outputs + yield StreamChunkEvent( + selector=[self._node_id, "text"], + chunk="", + is_final=True, + ) + yield StreamChunkEvent( + selector=[self._node_id, "generation", "content"], + chunk="", + is_final=True, + ) + yield ThoughtChunkEvent( + selector=[self._node_id, "generation", "thought"], + chunk="", + is_final=True, + ) yield StreamCompletedEvent( node_run_result=NodeRunResult( @@ -1313,8 +1389,11 @@ class LLMNode(Node[LLMNodeData]): variable_pool: VariablePool, node_inputs: dict[str, Any], process_data: dict[str, Any], - ) -> Generator[NodeEventBase, None, None]: - """Invoke LLM with tools support (from Agent V2).""" + ) -> Generator[NodeEventBase, None, LLMGenerationData]: + """Invoke LLM with tools support (from Agent V2). + + Returns LLMGenerationData with text, reasoning_contents, tool_calls, usage, finish_reason, files + """ # Get model features to determine strategy model_features = self._get_model_features(model_instance) @@ -1342,8 +1421,9 @@ class LLMNode(Node[LLMNodeData]): stream=True, ) - # Process outputs - yield from self._process_tool_outputs(outputs, strategy, node_inputs, process_data) + # Process outputs and return generation result + result = yield from self._process_tool_outputs(outputs, strategy, node_inputs, process_data) + return result def _get_model_features(self, model_instance: ModelInstance) -> list[ModelFeature]: """Get model schema to determine features.""" @@ -1440,8 +1520,11 @@ class LLMNode(Node[LLMNodeData]): strategy: Any, node_inputs: dict[str, Any], process_data: dict[str, Any], - ) -> Generator[NodeEventBase, None, None]: - """Process strategy outputs and convert to node events.""" + ) -> Generator[NodeEventBase, None, LLMGenerationData]: + """Process strategy outputs and convert to node events. 
+ + Returns LLMGenerationData with text, reasoning_contents, tool_calls, usage, finish_reason, files + """ text = "" files: list[File] = [] usage = LLMUsage.empty_usage() @@ -1450,7 +1533,9 @@ class LLMNode(Node[LLMNodeData]): agent_result: AgentResult | None = None think_parser = llm_utils.ThinkTagStreamParser() - reasoning_chunks: list[str] = [] + # Track reasoning per turn: each tool_call completion marks end of a turn + current_turn_reasoning: list[str] = [] # Buffer for current turn's thought chunks + reasoning_per_turn: list[str] = [] # Final list: one element per turn # Process each output from strategy try: @@ -1532,6 +1617,11 @@ class LLMNode(Node[LLMNodeData]): is_final=False, ) + # End of current turn: save accumulated thought as one element + if current_turn_reasoning: + reasoning_per_turn.append("".join(current_turn_reasoning)) + current_turn_reasoning.clear() + elif isinstance(output, LLMResultChunk): # Handle LLM result chunks - only process text content message = output.delta.message @@ -1549,7 +1639,7 @@ class LLMNode(Node[LLMNodeData]): continue if kind == "thought": - reasoning_chunks.append(segment) + current_turn_reasoning.append(segment) yield ThoughtChunkEvent( selector=[self._node_id, "generation", "thought"], chunk=segment, @@ -1594,7 +1684,7 @@ class LLMNode(Node[LLMNodeData]): if not segment: continue if kind == "thought": - reasoning_chunks.append(segment) + current_turn_reasoning.append(segment) yield ThoughtChunkEvent( selector=[self._node_id, "generation", "thought"], chunk=segment, @@ -1613,6 +1703,10 @@ class LLMNode(Node[LLMNodeData]): is_final=False, ) + # Save the last turn's thought if any + if current_turn_reasoning: + reasoning_per_turn.append("".join(current_turn_reasoning)) + # Send final events for all streams yield StreamChunkEvent( selector=[self._node_id, "text"], @@ -1653,45 +1747,32 @@ class LLMNode(Node[LLMNodeData]): is_final=True, ) - # Build generation field from agent_logs + # Build tool_calls from agent_logs (with results) tool_calls_for_generation = [] for log in agent_logs: - if log.label == "Tool Call": - tool_call_data = { - "id": log.data.get("tool_call_id", ""), - "name": log.data.get("tool_name", ""), - "arguments": json.dumps(log.data.get("tool_args", {})), - "result": log.data.get("output", ""), - } - tool_calls_for_generation.append(tool_call_data) + tool_call_id = log.data.get("tool_call_id") + if not tool_call_id or log.status == AgentLog.LogStatus.START.value: + continue - # Complete with results - yield StreamCompletedEvent( - node_run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.SUCCEEDED, - outputs={ - "text": text, - "files": ArrayFileSegment(value=files), - "usage": jsonable_encoder(usage), - "finish_reason": finish_reason, - "generation": { - "reasoning_content": ["".join(reasoning_chunks)] if reasoning_chunks else [], - "tool_calls": tool_calls_for_generation, - "content": text, - }, - }, - inputs={ - **node_inputs, - "tools": [ - {"provider_id": tool.provider_name, "tool_name": tool.tool_name} - for tool in self._node_data.tools - ] - if self._node_data.tools - else [], - }, - process_data=process_data, - llm_usage=usage, + tool_args = log.data.get("tool_args") or {} + tool_calls_for_generation.append( + { + "id": tool_call_id, + "name": log.data.get("tool_name", ""), + "arguments": json.dumps(tool_args) if tool_args else "", + # Prefer output, fall back to error text if present + "result": log.data.get("output") or log.data.get("error") or "", + } ) + + # Return generation data for caller + return 
LLMGenerationData( + text=text, + reasoning_contents=reasoning_per_turn, # Multi-turn: [thought1, thought2, ...] + tool_calls=tool_calls_for_generation, + usage=usage, + finish_reason=finish_reason, + files=files, ) def _accumulate_usage(self, total_usage: LLMUsage, delta_usage: LLMUsage) -> None: diff --git a/api/migrations/versions/2025_12_10_1617-85c8b4a64f53_add_llm_generation_detail_table.py b/api/migrations/versions/2025_12_10_1617-85c8b4a64f53_add_llm_generation_detail_table.py new file mode 100644 index 0000000000..340cc82bb5 --- /dev/null +++ b/api/migrations/versions/2025_12_10_1617-85c8b4a64f53_add_llm_generation_detail_table.py @@ -0,0 +1,46 @@ +"""add llm generation detail table. + +Revision ID: 85c8b4a64f53 +Revises: 7bb281b7a422 +Create Date: 2025-12-10 16:17:46.597669 + +""" +from alembic import op +import models as models +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '85c8b4a64f53' +down_revision = '7bb281b7a422' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('llm_generation_details', + sa.Column('id', models.types.StringUUID(), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('message_id', models.types.StringUUID(), nullable=True), + sa.Column('workflow_run_id', models.types.StringUUID(), nullable=True), + sa.Column('node_id', sa.String(length=255), nullable=True), + sa.Column('reasoning_content', models.types.LongText(), nullable=True), + sa.Column('tool_calls', models.types.LongText(), nullable=True), + sa.Column('sequence', models.types.LongText(), nullable=True), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.CheckConstraint('(message_id IS NOT NULL AND workflow_run_id IS NULL AND node_id IS NULL) OR (message_id IS NULL AND workflow_run_id IS NOT NULL AND node_id IS NOT NULL)', name=op.f('llm_generation_details_ck_llm_generation_detail_assoc_mode_check')), + sa.PrimaryKeyConstraint('id', name='llm_generation_detail_pkey'), + sa.UniqueConstraint('message_id', name=op.f('llm_generation_details_message_id_key')) + ) + with op.batch_alter_table('llm_generation_details', schema=None) as batch_op: + batch_op.create_index('idx_llm_generation_detail_message', ['message_id'], unique=False) + batch_op.create_index('idx_llm_generation_detail_workflow', ['workflow_run_id', 'node_id'], unique=False) + + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('llm_generation_details') + # ### end Alembic commands ###
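
Illustrative sketch of the `generation` output variable this patch introduces, assuming a
two-turn run (thought1 -> text1 -> tool_call1 -> thought2 -> text2) with a single tool call;
the tool name, arguments, and result values are hypothetical, but the field layout follows
LLMGenerationData and the generation block assembled in _run / _process_tool_outputs above.

    # Hypothetical value of outputs["generation"] for a two-turn tool-call run.
    generation_example = {
        # Accumulated text content from all turns.
        "content": "text1text2",
        # One element per turn, in turn order (reasoning_contents).
        "reasoning_content": ["thought1", "thought2"],
        # Every tool call with its result, built from the agent logs.
        "tool_calls": [
            {
                "id": "call_0",                          # tool_call_id from the provider
                "name": "get_weather",                   # hypothetical tool name
                "arguments": "{\"city\": \"Berlin\"}",   # JSON-encoded arguments
                "result": "Sunny, 21 degrees",           # tool output, or error text on failure
            },
        ],
    }

Downstream nodes would then reference selectors such as [node_id, "generation", "content"] or
[node_id, "generation", "tool_calls"], while the plain "text" output keeps its previous shape.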