mirror of
https://github.com/langgenius/dify.git
synced 2026-01-14 06:07:33 +08:00
feat: add think start end tag
This commit is contained in:
parent
f3e7fea628
commit
dc8a618b6a
@ -184,6 +184,8 @@ class ChunkType(StrEnum):
|
||||
TOOL_CALL = "tool_call" # Tool call arguments streaming
|
||||
TOOL_RESULT = "tool_result" # Tool execution result
|
||||
THOUGHT = "thought" # Agent thinking process (ReAct)
|
||||
THOUGHT_START = "thought_start" # Agent thought start
|
||||
THOUGHT_END = "thought_end" # Agent thought end
|
||||
|
||||
|
||||
class QueueTextChunkEvent(AppQueueEvent):
|
||||
|
||||
@ -617,6 +617,8 @@ class ChunkType(StrEnum):
|
||||
TOOL_CALL = "tool_call" # Tool call arguments streaming
|
||||
TOOL_RESULT = "tool_result" # Tool execution result
|
||||
THOUGHT = "thought" # Agent thinking process (ReAct)
|
||||
THOUGHT_START = "thought_start" # Agent thought start
|
||||
THOUGHT_END = "thought_end" # Agent thought end
|
||||
|
||||
|
||||
class TextChunkStreamResponse(StreamResponse):
|
||||
|
||||
@ -29,6 +29,8 @@ class ChunkType(StrEnum):
|
||||
TOOL_CALL = "tool_call" # Tool call arguments streaming
|
||||
TOOL_RESULT = "tool_result" # Tool execution result
|
||||
THOUGHT = "thought" # Agent thinking process (ReAct)
|
||||
THOUGHT_START = "thought_start" # Agent thought start
|
||||
THOUGHT_END = "thought_end" # Agent thought end
|
||||
|
||||
|
||||
class NodeRunStreamChunkEvent(GraphNodeEventBase):
|
||||
|
||||
@ -41,6 +41,8 @@ class ChunkType(StrEnum):
|
||||
TOOL_CALL = "tool_call" # Tool call arguments streaming
|
||||
TOOL_RESULT = "tool_result" # Tool execution result
|
||||
THOUGHT = "thought" # Agent thinking process (ReAct)
|
||||
THOUGHT_START = "thought_start" # Agent thought start
|
||||
THOUGHT_END = "thought_end" # Agent thought end
|
||||
|
||||
|
||||
class StreamChunkEvent(NodeEventBase):
|
||||
@ -70,6 +72,18 @@ class ToolResultChunkEvent(StreamChunkEvent):
|
||||
tool_result: ToolResult | None = Field(default=None, description="structured tool result payload")
|
||||
|
||||
|
||||
class ThoughtStartChunkEvent(StreamChunkEvent):
|
||||
"""Agent thought start streaming event - Agent thinking process (ReAct)."""
|
||||
|
||||
chunk_type: ChunkType = Field(default=ChunkType.THOUGHT_START, frozen=True)
|
||||
|
||||
|
||||
class ThoughtEndChunkEvent(StreamChunkEvent):
|
||||
"""Agent thought end streaming event - Agent thinking process (ReAct)."""
|
||||
|
||||
chunk_type: ChunkType = Field(default=ChunkType.THOUGHT_END, frozen=True)
|
||||
|
||||
|
||||
class ThoughtChunkEvent(StreamChunkEvent):
|
||||
"""Agent thought streaming event - Agent thinking process (ReAct)."""
|
||||
|
||||
|
||||
@ -163,6 +163,7 @@ class ThinkTagStreamParser:
|
||||
thought_text = self._buffer[: end_match.start()]
|
||||
if thought_text:
|
||||
parts.append(("thought", thought_text))
|
||||
parts.append(("thought_end", ""))
|
||||
self._buffer = self._buffer[end_match.end() :]
|
||||
self._in_think = False
|
||||
continue
|
||||
@ -180,6 +181,7 @@ class ThinkTagStreamParser:
|
||||
if prefix:
|
||||
parts.append(("text", prefix))
|
||||
self._buffer = self._buffer[start_match.end() :]
|
||||
parts.append(("thought_start", ""))
|
||||
self._in_think = True
|
||||
continue
|
||||
|
||||
@ -195,7 +197,7 @@ class ThinkTagStreamParser:
|
||||
# Extra safeguard: strip any stray tags that slipped through.
|
||||
content = self._START_PATTERN.sub("", content)
|
||||
content = self._END_PATTERN.sub("", content)
|
||||
if content:
|
||||
if content or kind in {"thought_start", "thought_end"}:
|
||||
cleaned_parts.append((kind, content))
|
||||
|
||||
return cleaned_parts
|
||||
@ -210,12 +212,19 @@ class ThinkTagStreamParser:
|
||||
if content.lower().startswith(self._START_PREFIX) or content.lower().startswith(self._END_PREFIX):
|
||||
content = ""
|
||||
self._buffer = ""
|
||||
if not content:
|
||||
if not content and not self._in_think:
|
||||
return []
|
||||
# Strip any complete tags that might still be present.
|
||||
content = self._START_PATTERN.sub("", content)
|
||||
content = self._END_PATTERN.sub("", content)
|
||||
return [(kind, content)] if content else []
|
||||
|
||||
result: list[tuple[str, str]] = []
|
||||
if content:
|
||||
result.append((kind, content))
|
||||
if self._in_think:
|
||||
result.append(("thought_end", ""))
|
||||
self._in_think = False
|
||||
return result
|
||||
|
||||
|
||||
class StreamBuffers(BaseModel):
|
||||
|
||||
@ -80,6 +80,7 @@ from core.workflow.node_events import (
|
||||
ToolCallChunkEvent,
|
||||
ToolResultChunkEvent,
|
||||
)
|
||||
from core.workflow.node_events.node import ThoughtEndChunkEvent, ThoughtStartChunkEvent
|
||||
from core.workflow.nodes.base.entities import VariableSelector
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser
|
||||
@ -565,15 +566,28 @@ class LLMNode(Node[LLMNodeData]):
|
||||
# Generation output: split out thoughts, forward only non-thought content chunks
|
||||
for kind, segment in think_parser.process(text_part):
|
||||
if not segment:
|
||||
continue
|
||||
if kind not in {"thought_start", "thought_end"}:
|
||||
continue
|
||||
|
||||
if kind == "thought":
|
||||
if kind == "thought_start":
|
||||
yield ThoughtStartChunkEvent(
|
||||
selector=[node_id, "generation", "thought"],
|
||||
chunk="",
|
||||
is_final=False,
|
||||
)
|
||||
elif kind == "thought":
|
||||
reasoning_chunks.append(segment)
|
||||
yield ThoughtChunkEvent(
|
||||
selector=[node_id, "generation", "thought"],
|
||||
chunk=segment,
|
||||
is_final=False,
|
||||
)
|
||||
elif kind == "thought_end":
|
||||
yield ThoughtEndChunkEvent(
|
||||
selector=[node_id, "generation", "thought"],
|
||||
chunk="",
|
||||
is_final=False,
|
||||
)
|
||||
else:
|
||||
yield StreamChunkEvent(
|
||||
selector=[node_id, "generation", "content"],
|
||||
@ -596,15 +610,27 @@ class LLMNode(Node[LLMNodeData]):
|
||||
raise LLMNodeError(f"Failed to parse structured output: {e}")
|
||||
|
||||
for kind, segment in think_parser.flush():
|
||||
if not segment:
|
||||
if not segment and kind not in {"thought_start", "thought_end"}:
|
||||
continue
|
||||
if kind == "thought":
|
||||
if kind == "thought_start":
|
||||
yield ThoughtStartChunkEvent(
|
||||
selector=[node_id, "generation", "thought"],
|
||||
chunk="",
|
||||
is_final=False,
|
||||
)
|
||||
elif kind == "thought":
|
||||
reasoning_chunks.append(segment)
|
||||
yield ThoughtChunkEvent(
|
||||
selector=[node_id, "generation", "thought"],
|
||||
chunk=segment,
|
||||
is_final=False,
|
||||
)
|
||||
elif kind == "thought_end":
|
||||
yield ThoughtEndChunkEvent(
|
||||
selector=[node_id, "generation", "thought"],
|
||||
chunk="",
|
||||
is_final=False,
|
||||
)
|
||||
else:
|
||||
yield StreamChunkEvent(
|
||||
selector=[node_id, "generation", "content"],
|
||||
@ -1811,10 +1837,17 @@ class LLMNode(Node[LLMNodeData]):
|
||||
chunk_text = str(chunk_text)
|
||||
|
||||
for kind, segment in buffers.think_parser.process(chunk_text):
|
||||
if not segment:
|
||||
if not segment and kind not in {"thought_start", "thought_end"}:
|
||||
continue
|
||||
|
||||
if kind == "thought":
|
||||
if kind == "thought_start":
|
||||
self._flush_content_segment(buffers, trace_state)
|
||||
yield ThoughtStartChunkEvent(
|
||||
selector=[self._node_id, "generation", "thought"],
|
||||
chunk="",
|
||||
is_final=False,
|
||||
)
|
||||
elif kind == "thought":
|
||||
self._flush_content_segment(buffers, trace_state)
|
||||
buffers.current_turn_reasoning.append(segment)
|
||||
buffers.pending_thought.append(segment)
|
||||
@ -1823,6 +1856,13 @@ class LLMNode(Node[LLMNodeData]):
|
||||
chunk=segment,
|
||||
is_final=False,
|
||||
)
|
||||
elif kind == "thought_end":
|
||||
self._flush_thought_segment(buffers, trace_state)
|
||||
yield ThoughtEndChunkEvent(
|
||||
selector=[self._node_id, "generation", "thought"],
|
||||
chunk="",
|
||||
is_final=False,
|
||||
)
|
||||
else:
|
||||
self._flush_thought_segment(buffers, trace_state)
|
||||
aggregate.text += segment
|
||||
@ -1848,9 +1888,16 @@ class LLMNode(Node[LLMNodeData]):
|
||||
self, buffers: StreamBuffers, trace_state: TraceState, aggregate: AggregatedResult
|
||||
) -> Generator[NodeEventBase, None, None]:
|
||||
for kind, segment in buffers.think_parser.flush():
|
||||
if not segment:
|
||||
if not segment and kind not in {"thought_start", "thought_end"}:
|
||||
continue
|
||||
if kind == "thought":
|
||||
if kind == "thought_start":
|
||||
self._flush_content_segment(buffers, trace_state)
|
||||
yield ThoughtStartChunkEvent(
|
||||
selector=[self._node_id, "generation", "thought"],
|
||||
chunk="",
|
||||
is_final=False,
|
||||
)
|
||||
elif kind == "thought":
|
||||
self._flush_content_segment(buffers, trace_state)
|
||||
buffers.current_turn_reasoning.append(segment)
|
||||
buffers.pending_thought.append(segment)
|
||||
@ -1859,6 +1906,13 @@ class LLMNode(Node[LLMNodeData]):
|
||||
chunk=segment,
|
||||
is_final=False,
|
||||
)
|
||||
elif kind == "thought_end":
|
||||
self._flush_thought_segment(buffers, trace_state)
|
||||
yield ThoughtEndChunkEvent(
|
||||
selector=[self._node_id, "generation", "thought"],
|
||||
chunk="",
|
||||
is_final=False,
|
||||
)
|
||||
else:
|
||||
self._flush_thought_segment(buffers, trace_state)
|
||||
aggregate.text += segment
|
||||
|
||||
Loading…
Reference in New Issue
Block a user