fix: llm generation variable

Novice 2025-12-12 11:08:49 +08:00
parent abb2b860f2
commit 9ce48b4dc4
4 changed files with 251 additions and 101 deletions


@@ -141,6 +141,9 @@ class StreamEventBuffer:
def record_tool_call(self, tool_call_id: str, tool_name: str, tool_arguments: str) -> None:
"""Record a tool call event."""
if not tool_call_id:
return
# Flush any pending reasoning first
if self._last_event_type == "thought":
self._flush_current_reasoning()
@@ -168,6 +171,8 @@ class StreamEventBuffer:
def record_tool_result(self, tool_call_id: str, result: str) -> None:
"""Record a tool result event (update existing tool call)."""
if not tool_call_id:
return
if tool_call_id in self._tool_call_id_map:
idx = self._tool_call_id_map[tool_call_id]
self.tool_calls[idx]["result"] = result
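The two `if not tool_call_id: return` guards added above make the buffer ignore malformed events, and recording a tool call first flushes any buffered reasoning. A minimal, self-contained sketch of that pattern (class and field names follow the diff; everything not shown in the hunks is an assumption):

# Minimal sketch of the buffer-and-flush pattern; only an approximation
# of the real StreamEventBuffer, which carries more state.
class MiniEventBuffer:
    def __init__(self) -> None:
        self.tool_calls: list[dict] = []
        self.reasonings: list[str] = []
        self._pending_reasoning: list[str] = []
        self._last_event_type: str | None = None
        self._tool_call_id_map: dict[str, int] = {}

    def record_thought(self, chunk: str) -> None:
        self._pending_reasoning.append(chunk)
        self._last_event_type = "thought"

    def _flush_current_reasoning(self) -> None:
        if self._pending_reasoning:
            self.reasonings.append("".join(self._pending_reasoning))
            self._pending_reasoning.clear()

    def record_tool_call(self, tool_call_id: str, tool_name: str, tool_arguments: str) -> None:
        if not tool_call_id:  # guard: drop calls without an id
            return
        if self._last_event_type == "thought":
            self._flush_current_reasoning()
        self._tool_call_id_map[tool_call_id] = len(self.tool_calls)
        self.tool_calls.append(
            {"id": tool_call_id, "name": tool_name, "arguments": tool_arguments, "result": ""}
        )
        self._last_event_type = "tool_call"

    def record_tool_result(self, tool_call_id: str, result: str) -> None:
        if not tool_call_id:  # guard: nothing to update
            return
        if tool_call_id in self._tool_call_id_map:
            self.tool_calls[self._tool_call_id_map[tool_call_id]]["result"] = result

buf = MiniEventBuffer()
buf.record_thought("planning...")
buf.record_tool_call("call_1", "search", '{"q": "x"}')
buf.record_tool_result("call_1", "found")
assert buf.reasonings == ["planning..."] and buf.tool_calls[0]["result"] == "found"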


@@ -3,7 +3,9 @@ from typing import Any, Literal
from pydantic import BaseModel, Field, field_validator
from core.file import File
from core.model_runtime.entities import ImagePromptMessageContent, LLMMode
from core.model_runtime.entities.llm_entities import LLMUsage
from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate, MemoryConfig
from core.tools.entities.tool_entities import ToolProviderType
from core.workflow.nodes.base import BaseNodeData
@@ -17,6 +19,22 @@ class ModelConfig(BaseModel):
completion_params: dict[str, Any] = Field(default_factory=dict)
class LLMGenerationData(BaseModel):
"""Generation data from LLM invocation with tools.
For multi-turn tool calls like: thought1 -> text1 -> tool_call1 -> thought2 -> text2 -> tool_call2
- reasoning_contents: [thought1, thought2, ...] - one element per turn
- tool_calls: [{id, name, arguments, result}, ...] - all tool calls with results
"""
text: str = Field(..., description="Accumulated text content from all turns")
reasoning_contents: list[str] = Field(default_factory=list, description="Reasoning content per turn")
tool_calls: list[dict[str, Any]] = Field(default_factory=list, description="Tool calls with results")
usage: LLMUsage = Field(..., description="LLM usage statistics")
finish_reason: str | None = Field(None, description="Finish reason from LLM")
files: list[File] = Field(default_factory=list, description="Generated files")
class ContextConfig(BaseModel):
enabled: bool
variable_selector: list[str] | None = None
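For illustration, here is how a two-turn result matching the docstring's example (thought1 -> tool_call1 -> thought2 -> tool_call2) could be materialized. All values are made up, and LLMUsage.empty_usage() stands in for real usage stats; run with this module's imports in scope:

# Hypothetical two-turn generation result.
data = LLMGenerationData(
    text="text1text2",
    reasoning_contents=["thought1", "thought2"],  # one element per turn
    tool_calls=[
        {"id": "call_1", "name": "search", "arguments": '{"q": "weather"}', "result": "sunny"},
        {"id": "call_2", "name": "notify", "arguments": '{"msg": "sunny"}', "result": "ok"},
    ],
    usage=LLMUsage.empty_usage(),
    finish_reason="stop",
)
print(data.model_dump_json(indent=2))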


@@ -83,6 +83,7 @@ from core.workflow.runtime import VariablePool
from . import llm_utils
from .entities import (
LLMGenerationData,
LLMNodeChatModelMessage,
LLMNodeCompletionModelPromptTemplate,
LLMNodeData,
@@ -148,10 +149,84 @@ class LLMNode(Node[LLMNodeData]):
def version(cls) -> str:
return "1"
def _stream_llm_events(
self,
generator: Generator[NodeEventBase | LLMStructuredOutput, None, LLMGenerationData | None],
*,
model_instance: ModelInstance,
) -> Generator[
NodeEventBase,
None,
tuple[
str,
str,
LLMUsage,
str | None,
LLMStructuredOutput | None,
LLMGenerationData | None,
],
]:
"""
Stream events and capture generator return value in one place.
Uses generator delegation so _run stays concise while still emitting events.
"""
clean_text = ""
reasoning_content = ""
usage = LLMUsage.empty_usage()
finish_reason: str | None = None
structured_output: LLMStructuredOutput | None = None
generation_data: LLMGenerationData | None = None
completed = False
while True:
try:
event = next(generator)
except StopIteration as exc:
if isinstance(exc.value, LLMGenerationData):
generation_data = exc.value
break
if completed:
# After completion we still drain to reach StopIteration.value
continue
match event:
case StreamChunkEvent() | ThoughtChunkEvent():
yield event
case ModelInvokeCompletedEvent(
text=text,
usage=usage_event,
finish_reason=finish_reason_event,
reasoning_content=reasoning_event,
structured_output=structured_raw,
):
clean_text = text
usage = usage_event
finish_reason = finish_reason_event
reasoning_content = reasoning_event or ""
if self.node_data.reasoning_format != "tagged":
clean_text, _ = LLMNode._split_reasoning(clean_text, self.node_data.reasoning_format)
structured_output = (
LLMStructuredOutput(structured_output=structured_raw) if structured_raw else None
)
llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage)
completed = True
case LLMStructuredOutput():
structured_output = event
case _:
continue
return clean_text, reasoning_content, usage, finish_reason, structured_output, generation_data
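What `_stream_llm_events` leans on here is standard PEP 380 behavior: a generator's `return value` travels in StopIteration.value, and `yield from` both re-yields events and hands that value back to the delegating generator. A standalone sketch with illustrative names:

from typing import Generator

def producer() -> Generator[str, None, dict]:
    yield "chunk-1"
    yield "chunk-2"
    return {"finish_reason": "stop"}  # becomes StopIteration.value

def manual_drain() -> Generator[str, None, dict]:
    # Equivalent of the next()/StopIteration loop above.
    gen = producer()
    while True:
        try:
            yield next(gen)
        except StopIteration as exc:
            return exc.value

def delegating() -> Generator[str, None, dict]:
    result = yield from producer()  # yield from captures the return value
    return result

chunks = []
gen = delegating()
try:
    while True:
        chunks.append(next(gen))
except StopIteration as exc:
    assert exc.value == {"finish_reason": "stop"}
    assert chunks == ["chunk-1", "chunk-2"]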
def _run(self) -> Generator:
node_inputs: dict[str, Any] = {}
process_data: dict[str, Any] = {}
result_text = ""
clean_text = ""
usage = LLMUsage.empty_usage()
finish_reason = None
@@ -240,10 +315,13 @@ class LLMNode(Node[LLMNodeData]):
tenant_id=self.tenant_id,
)
# Variables for outputs
generation_data: LLMGenerationData | None = None
structured_output: LLMStructuredOutput | None = None
# Check if tools are configured
if self.tool_call_enabled:
# Use tool-enabled invocation (Agent V2 style)
# This generator handles all events including final events
generator = self._invoke_llm_with_tools(
model_instance=model_instance,
prompt_messages=prompt_messages,
@@ -253,10 +331,6 @@
node_inputs=node_inputs,
process_data=process_data,
)
# Forward all events and return early since _invoke_llm_with_tools
# already sends final event and StreamCompletedEvent
yield from generator
return
else:
# Use traditional LLM invocation
generator = LLMNode.invoke_llm(
@@ -274,39 +348,23 @@
reasoning_format=self._node_data.reasoning_format,
)
structured_output: LLMStructuredOutput | None = None
(
clean_text,
reasoning_content,
usage,
finish_reason,
structured_output,
generation_data,
) = yield from self._stream_llm_events(generator, model_instance=model_instance)
for event in generator:
if isinstance(event, (StreamChunkEvent, ThoughtChunkEvent)):
yield event
elif isinstance(event, ModelInvokeCompletedEvent):
# Raw text
result_text = event.text
usage = event.usage
finish_reason = event.finish_reason
reasoning_content = event.reasoning_content or ""
# For downstream nodes, determine clean text based on reasoning_format
if self.node_data.reasoning_format == "tagged":
# Keep <think> tags for backward compatibility
clean_text = result_text
else:
# Extract clean text from <think> tags
clean_text, _ = LLMNode._split_reasoning(result_text, self.node_data.reasoning_format)
# Process structured output if available from the event.
structured_output = (
LLMStructuredOutput(structured_output=event.structured_output)
if event.structured_output
else None
)
# deduct quota
llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage)
break
elif isinstance(event, LLMStructuredOutput):
structured_output = event
# Extract variables from generation_data if available
if generation_data:
clean_text = generation_data.text
reasoning_content = ""
usage = generation_data.usage
finish_reason = generation_data.finish_reason
# Unified process_data building
process_data = {
"model_mode": model_config.mode,
"prompts": PromptMessageUtil.prompt_messages_to_prompt_for_saving(
@@ -318,38 +376,56 @@
"model_name": model_config.model,
}
# Unified outputs building
outputs = {
    "text": clean_text,
    "reasoning_content": reasoning_content,
    "usage": jsonable_encoder(usage),
    "finish_reason": finish_reason,
}
# Build generation field
if generation_data:
    # Use generation_data from tool invocation (supports multi-turn)
    generation = {
        "content": generation_data.text,
        "reasoning_content": generation_data.reasoning_contents,  # [thought1, thought2, ...]
        "tool_calls": generation_data.tool_calls,
    }
    files_to_output = generation_data.files
else:
    # Traditional LLM invocation
    generation = {
        "content": clean_text,
        "reasoning_content": [reasoning_content] if reasoning_content else [],
        "tool_calls": [],
    }
    files_to_output = self._file_outputs
outputs["generation"] = generation
if files_to_output:
    outputs["files"] = ArrayFileSegment(value=files_to_output)
if structured_output:
    outputs["structured_output"] = structured_output.structured_output
# Send final chunk event to indicate streaming is complete
yield StreamChunkEvent(
selector=[self._node_id, "text"],
chunk="",
is_final=True,
)
yield StreamChunkEvent(
selector=[self._node_id, "generation", "content"],
chunk="",
is_final=True,
)
yield ThoughtChunkEvent(
selector=[self._node_id, "generation", "thought"],
chunk="",
is_final=True,
)
if not self.tool_call_enabled:
# For tool calls, final events are already sent in _process_tool_outputs
yield StreamChunkEvent(
selector=[self._node_id, "text"],
chunk="",
is_final=True,
)
yield StreamChunkEvent(
selector=[self._node_id, "generation", "content"],
chunk="",
is_final=True,
)
yield ThoughtChunkEvent(
selector=[self._node_id, "generation", "thought"],
chunk="",
is_final=True,
)
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
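The empty chunks with is_final=True act as end-of-stream markers, one per selector. A minimal consumer sketch (Chunk is a stand-in for the real StreamChunkEvent/ThoughtChunkEvent classes, which carry more fields):

from dataclasses import dataclass

@dataclass
class Chunk:  # stand-in for the real event classes
    selector: tuple[str, ...]
    chunk: str
    is_final: bool

def collect(events: list[Chunk]) -> dict[tuple[str, ...], str]:
    open_streams: dict[tuple[str, ...], list[str]] = {}
    closed: dict[tuple[str, ...], str] = {}
    for ev in events:
        if ev.is_final:
            # Final marker: the accumulated text for this selector is complete.
            closed[ev.selector] = "".join(open_streams.pop(ev.selector, []))
        else:
            open_streams.setdefault(ev.selector, []).append(ev.chunk)
    return closed

events = [
    Chunk(("node1", "text"), "Hel", False),
    Chunk(("node1", "text"), "lo", False),
    Chunk(("node1", "text"), "", True),
]
assert collect(events) == {("node1", "text"): "Hello"}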
@@ -1313,8 +1389,11 @@ class LLMNode(Node[LLMNodeData]):
variable_pool: VariablePool,
node_inputs: dict[str, Any],
process_data: dict[str, Any],
) -> Generator[NodeEventBase, None, None]:
"""Invoke LLM with tools support (from Agent V2)."""
) -> Generator[NodeEventBase, None, LLMGenerationData]:
"""Invoke LLM with tools support (from Agent V2).
Returns LLMGenerationData with text, reasoning_contents, tool_calls, usage, finish_reason, files
"""
# Get model features to determine strategy
model_features = self._get_model_features(model_instance)
@@ -1342,8 +1421,9 @@
stream=True,
)
# Process outputs
yield from self._process_tool_outputs(outputs, strategy, node_inputs, process_data)
# Process outputs and return generation result
result = yield from self._process_tool_outputs(outputs, strategy, node_inputs, process_data)
return result
def _get_model_features(self, model_instance: ModelInstance) -> list[ModelFeature]:
"""Get model schema to determine features."""
@@ -1440,8 +1520,11 @@
strategy: Any,
node_inputs: dict[str, Any],
process_data: dict[str, Any],
) -> Generator[NodeEventBase, None, None]:
"""Process strategy outputs and convert to node events."""
) -> Generator[NodeEventBase, None, LLMGenerationData]:
"""Process strategy outputs and convert to node events.
Returns LLMGenerationData with text, reasoning_contents, tool_calls, usage, finish_reason, files
"""
text = ""
files: list[File] = []
usage = LLMUsage.empty_usage()
@@ -1450,7 +1533,9 @@
agent_result: AgentResult | None = None
think_parser = llm_utils.ThinkTagStreamParser()
reasoning_chunks: list[str] = []
# Track reasoning per turn: each tool_call completion marks end of a turn
current_turn_reasoning: list[str] = [] # Buffer for current turn's thought chunks
reasoning_per_turn: list[str] = [] # Final list: one element per turn
# Process each output from strategy
try:
@@ -1532,6 +1617,11 @@
is_final=False,
)
# End of current turn: save accumulated thought as one element
if current_turn_reasoning:
reasoning_per_turn.append("".join(current_turn_reasoning))
current_turn_reasoning.clear()
elif isinstance(output, LLMResultChunk):
# Handle LLM result chunks - only process text content
message = output.delta.message
@@ -1549,7 +1639,7 @@
continue
if kind == "thought":
reasoning_chunks.append(segment)
current_turn_reasoning.append(segment)
yield ThoughtChunkEvent(
selector=[self._node_id, "generation", "thought"],
chunk=segment,
@@ -1594,7 +1684,7 @@
if not segment:
continue
if kind == "thought":
reasoning_chunks.append(segment)
current_turn_reasoning.append(segment)
yield ThoughtChunkEvent(
selector=[self._node_id, "generation", "thought"],
chunk=segment,
@@ -1613,6 +1703,10 @@
is_final=False,
)
# Save the last turn's thought if any
if current_turn_reasoning:
reasoning_per_turn.append("".join(current_turn_reasoning))
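The turn bookkeeping above reduces to: buffer thought chunks, flush the buffer into reasoning_per_turn whenever a tool call completes, then flush once more at end of stream. A self-contained sketch with a toy event stream (the event tuples are assumptions, not the real output types):

# Toy event stream: ("thought", chunk) and ("tool_call_end", None) events.
events = [
    ("thought", "I should "), ("thought", "search."), ("tool_call_end", None),
    ("thought", "Now summarize."), ("tool_call_end", None),
    ("thought", "Done."),  # trailing thought with no tool call after it
]

current_turn: list[str] = []
reasoning_per_turn: list[str] = []
for kind, chunk in events:
    if kind == "thought":
        current_turn.append(chunk)
    elif kind == "tool_call_end" and current_turn:
        # End of turn: collapse buffered chunks into one element.
        reasoning_per_turn.append("".join(current_turn))
        current_turn.clear()
if current_turn:  # save the last turn's thought, mirroring the code above
    reasoning_per_turn.append("".join(current_turn))

assert reasoning_per_turn == ["I should search.", "Now summarize.", "Done."]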
# Send final events for all streams
yield StreamChunkEvent(
selector=[self._node_id, "text"],
@@ -1653,45 +1747,32 @@
is_final=True,
)
# Build tool_calls from agent_logs (with results)
tool_calls_for_generation = []
for log in agent_logs:
    if log.label == "Tool Call":
        tool_call_id = log.data.get("tool_call_id")
        if not tool_call_id or log.status == AgentLog.LogStatus.START.value:
            continue
        tool_args = log.data.get("tool_args") or {}
        tool_calls_for_generation.append(
            {
                "id": tool_call_id,
                "name": log.data.get("tool_name", ""),
                "arguments": json.dumps(tool_args) if tool_args else "",
                # Prefer output, fall back to error text if present
                "result": log.data.get("output") or log.data.get("error") or "",
            }
        )
# Return generation data for caller
return LLMGenerationData(
    text=text,
    reasoning_contents=reasoning_per_turn,  # Multi-turn: [thought1, thought2, ...]
    tool_calls=tool_calls_for_generation,
    usage=usage,
    finish_reason=finish_reason,
    files=files,
)
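The agent-log transformation above can be exercised in isolation. In this sketch AgentLog is mocked with SimpleNamespace and the START status check is approximated with a plain string, so treat it as an illustration rather than the real types:

import json
from types import SimpleNamespace

# Mocked finished tool-call log; the real AgentLog carries more fields.
log = SimpleNamespace(
    label="Tool Call",
    status="success",  # anything but the START status
    data={
        "tool_call_id": "call_1",
        "tool_name": "search",
        "tool_args": {"q": "dify"},
        "output": "",             # empty output...
        "error": "rate limited",  # ...falls back to the error text
    },
)

tool_calls = []
# The real code skips logs whose status equals AgentLog.LogStatus.START.value.
if log.label == "Tool Call" and log.data.get("tool_call_id") and log.status != "start":
    args = log.data.get("tool_args") or {}
    tool_calls.append({
        "id": log.data["tool_call_id"],
        "name": log.data.get("tool_name", ""),
        "arguments": json.dumps(args) if args else "",
        "result": log.data.get("output") or log.data.get("error") or "",
    })

assert tool_calls[0]["result"] == "rate limited"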
def _accumulate_usage(self, total_usage: LLMUsage, delta_usage: LLMUsage) -> None:


@@ -0,0 +1,46 @@
"""add llm generation detail table.
Revision ID: 85c8b4a64f53
Revises: 7bb281b7a422
Create Date: 2025-12-10 16:17:46.597669
"""
from alembic import op
import models as models
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '85c8b4a64f53'
down_revision = '7bb281b7a422'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('llm_generation_details',
sa.Column('id', models.types.StringUUID(), nullable=False),
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('message_id', models.types.StringUUID(), nullable=True),
sa.Column('workflow_run_id', models.types.StringUUID(), nullable=True),
sa.Column('node_id', sa.String(length=255), nullable=True),
sa.Column('reasoning_content', models.types.LongText(), nullable=True),
sa.Column('tool_calls', models.types.LongText(), nullable=True),
sa.Column('sequence', models.types.LongText(), nullable=True),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.CheckConstraint('(message_id IS NOT NULL AND workflow_run_id IS NULL AND node_id IS NULL) OR (message_id IS NULL AND workflow_run_id IS NOT NULL AND node_id IS NOT NULL)', name=op.f('llm_generation_details_ck_llm_generation_detail_assoc_mode_check')),
sa.PrimaryKeyConstraint('id', name='llm_generation_detail_pkey'),
sa.UniqueConstraint('message_id', name=op.f('llm_generation_details_message_id_key'))
)
with op.batch_alter_table('llm_generation_details', schema=None) as batch_op:
batch_op.create_index('idx_llm_generation_detail_message', ['message_id'], unique=False)
batch_op.create_index('idx_llm_generation_detail_workflow', ['workflow_run_id', 'node_id'], unique=False)
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('llm_generation_details')
# ### end Alembic commands ###
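The CHECK constraint enforces exactly one association mode per row: either message-scoped (message_id set, workflow columns NULL) or workflow-scoped (workflow_run_id and node_id set, message_id NULL), never both. A sketch of the same predicate in Python, handy for pre-validating rows before insert (the function name is made up):

def valid_association(message_id: str | None,
                      workflow_run_id: str | None,
                      node_id: str | None) -> bool:
    # Mirrors the CHECK constraint: message-scoped XOR workflow-node-scoped.
    message_mode = message_id is not None and workflow_run_id is None and node_id is None
    workflow_mode = message_id is None and workflow_run_id is not None and node_id is not None
    return message_mode or workflow_mode

assert valid_association("msg-1", None, None)             # message-scoped row
assert valid_association(None, "run-1", "node-1")         # workflow-scoped row
assert not valid_association("msg-1", "run-1", "node-1")  # both set: rejected
assert not valid_association(None, "run-1", None)         # node_id missing: rejected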