WIP: feat(api): add is_resumption to node_started and workflow_started events

This commit is contained in:
QuantumGhost 2026-01-04 01:10:50 +08:00
parent f4642f85b7
commit 6337a9a125
11 changed files with 215 additions and 1 deletions

View File

@ -391,6 +391,7 @@ class WorkflowResponseConverter:
iteration_id=event.in_iteration_id,
loop_id=event.in_loop_id,
agent_strategy=event.agent_strategy,
is_resumption=event.is_resumption,
),
)

View File

@ -418,6 +418,7 @@ class WorkflowBasedAppRunner:
agent_strategy=event.agent_strategy,
provider_type=event.provider_type,
provider_id=event.provider_id,
is_resumption=event.is_resumption,
)
)
elif isinstance(event, NodeRunSucceededEvent):

View File

@ -318,6 +318,7 @@ class QueueNodeStartedEvent(AppQueueEvent):
# FIXME(-LAN-): only for ToolNode, need to refactor
provider_type: str # should be a core.tools.entities.tool_entities.ToolProviderType
provider_id: str
is_resumption: bool = False
class QueueNodeSucceededEvent(AppQueueEvent):

View File

@ -307,6 +307,7 @@ class NodeStartStreamResponse(StreamResponse):
iteration_id: str | None = None
loop_id: str | None = None
agent_strategy: AgentNodeStrategyInit | None = None
is_resumption: bool = False
event: StreamEvent = StreamEvent.NODE_STARTED
workflow_run_id: str

View File

@ -131,6 +131,9 @@ class EventHandler:
node_execution.mark_started(event.id)
self._graph_runtime_state.increment_node_run_steps()
# Mark whether this start is part of a resume flow
event.is_resumption = self._graph_runtime_state.consume_resuming_node(event.node_id)
# Track in response coordinator for stream ordering
self._response_coordinator.track_node_execution(event.node_id, event.id)

View File

@ -15,6 +15,7 @@ class NodeRunStartedEvent(GraphNodeEventBase):
predecessor_node_id: str | None = None
agent_strategy: AgentNodeStrategyInit | None = None
start_at: datetime = Field(..., description="node start time")
is_resumption: bool = False
# FIXME(-LAN-): only for ToolNode
provider_type: str = ""

View File

@ -177,6 +177,9 @@ class GraphRuntimeState:
self._pending_response_coordinator_dump: str | None = None
self._pending_graph_execution_workflow_id: str | None = None
self._paused_nodes: set[str] = set()
# Tracks nodes that are being resumed in the current execution cycle.
# Populated when paused nodes are consumed during resume.
self._resuming_nodes: set[str] = set()
if graph is not None:
self.attach_graph(graph)
@ -363,8 +366,19 @@ class GraphRuntimeState:
nodes = list(self._paused_nodes)
self._paused_nodes.clear()
# Mark these nodes as resuming so downstream handlers can annotate events.
self._resuming_nodes.update(nodes)
return nodes
def consume_resuming_node(self, node_id: str) -> bool:
    """
    Pop `node_id` from the resuming set and report whether it was there.

    Returns True when the node was marked as resuming (the marker is removed
    as a side effect), and False when no marker exists for it.
    """
    try:
        # set.remove raises KeyError for a missing member, which doubles as
        # the "not resuming" signal.
        self._resuming_nodes.remove(node_id)
    except KeyError:
        return False
    return True
# ------------------------------------------------------------------
# Builders
# ------------------------------------------------------------------

View File

@ -0,0 +1,111 @@
from types import SimpleNamespace
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueNodeStartedEvent
from core.app.entities.task_entities import NodeStartStreamResponse
from core.workflow.entities import AgentNodeStrategyInit
from core.workflow.enums import NodeType
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
def _build_converter() -> WorkflowResponseConverter:
    """Construct a minimal WorkflowResponseConverter for testing.

    The application entity and the user are ``SimpleNamespace`` stand-ins that
    carry only the attributes assigned below; the system variables are a real
    ``SystemVariable`` instance with fixed identifiers.

    Returns:
        A converter wired with the stub entity, stub user and system variables.
    """
    system_variables = SystemVariable(
        files=[],
        user_id="user-1",
        app_id="app-1",
        workflow_id="wf-1",
        workflow_execution_id="run-1",
    )
    app_entity = SimpleNamespace(
        task_id="task-1",
        app_config=SimpleNamespace(app_id="app-1", tenant_id="tenant-1"),
        invoke_from=InvokeFrom.EXPLORE,
        files=[],
        inputs={},
        workflow_execution_id="run-1",
        call_depth=0,
    )
    account = SimpleNamespace(id="acc-1", name="tester", email="tester@example.com")
    return WorkflowResponseConverter(
        application_generate_entity=app_entity,
        user=account,
        system_variables=system_variables,
    )
def test_node_start_stream_response_carries_resumption_flag():
    """A node-started queue event flagged as a resumption keeps the flag in the stream response."""
    response_converter = _build_converter()
    # The workflow-start conversion must run first so the converter has a run id.
    response_converter.workflow_start_to_stream_response(
        task_id="task-1",
        workflow_run_id="run-1",
        workflow_id="wf-1",
        is_resumption=False,
    )
    started_at = response_converter._workflow_started_at  # type: ignore[attr-defined]
    strategy = AgentNodeStrategyInit(name="test")
    node_started = QueueNodeStartedEvent(
        node_execution_id="exec-1",
        node_id="node-1",
        node_title="Title",
        node_type=NodeType.CODE,
        start_at=started_at,
        agent_strategy=strategy,
        provider_type="",
        provider_id="",
        is_resumption=True,
    )

    stream_response = response_converter.workflow_node_start_to_stream_response(
        event=node_started,
        task_id="task-1",
    )

    assert isinstance(stream_response, NodeStartStreamResponse)
    assert stream_response.data.is_resumption is True
def test_node_start_stream_response_defaults_to_false():
    """A node-started queue event built without is_resumption yields a response with the flag False."""
    response_converter = _build_converter()
    # The workflow-start conversion must run first so the converter has a run id.
    response_converter.workflow_start_to_stream_response(
        task_id="task-1",
        workflow_run_id="run-1",
        workflow_id="wf-1",
        is_resumption=False,
    )
    started_at = response_converter._workflow_started_at  # type: ignore[attr-defined]
    # is_resumption is deliberately omitted so the field's default applies.
    node_started = QueueNodeStartedEvent(
        node_execution_id="exec-2",
        node_id="node-2",
        node_title="Title",
        node_type=NodeType.CODE,
        start_at=started_at,
        agent_strategy=None,
        provider_type="",
        provider_id="",
    )

    stream_response = response_converter.workflow_node_start_to_stream_response(
        event=node_started,
        task_id="task-1",
    )

    assert isinstance(stream_response, NodeStartStreamResponse)
    assert stream_response.data.is_resumption is False
def test_workflow_start_stream_response_carries_resumption_flag():
    """The workflow-start stream response reports is_resumption=True when asked to."""
    start_response = _build_converter().workflow_start_to_stream_response(
        task_id="task-1",
        workflow_run_id="run-1",
        workflow_id="wf-1",
        is_resumption=True,
    )

    assert start_response.data.is_resumption is True
def test_workflow_start_stream_response_defaults_to_false():
    """The workflow-start stream response reports is_resumption=False for a non-resumed run."""
    # NOTE(review): the flag is passed explicitly here, so this covers the
    # False value rather than an implicit default — confirm whether the
    # converter method is meant to have a default for is_resumption.
    start_response = _build_converter().workflow_start_to_stream_response(
        task_id="task-1",
        workflow_run_id="run-1",
        workflow_id="wf-1",
        is_resumption=False,
    )

    assert start_response.data.is_resumption is False

View File

@ -118,7 +118,27 @@ class TestGraphRuntimeState:
from core.workflow.graph_engine.ready_queue import InMemoryReadyQueue
assert isinstance(queue, InMemoryReadyQueue)
assert state.ready_queue is queue
def test_resuming_nodes_tracking(self):
    """
    Paused nodes become resuming nodes once consumed, and each resuming
    marker can be consumed exactly one time.
    """
    first_id, second_id = "node-1", "node-2"
    runtime_state = GraphRuntimeState(variable_pool=VariablePool(), start_at=time())
    for paused_id in (first_id, second_id):
        runtime_state.register_paused_node(paused_id)

    # Draining the paused set is what fills the resuming set.
    drained = runtime_state.consume_paused_nodes()
    assert set(drained) == {"node-1", "node-2"}

    # First consumption succeeds, a repeat does not.
    assert runtime_state.consume_resuming_node(first_id) is True
    assert runtime_state.consume_resuming_node(first_id) is False

    # The second node keeps its own independent one-shot marker.
    assert runtime_state.consume_resuming_node(node_id=second_id) is True
    assert runtime_state.consume_resuming_node(second_id) is False
def test_graph_execution_lazy_instantiation(self):
state = GraphRuntimeState(variable_pool=VariablePool(), start_at=time())

View File

@ -117,3 +117,51 @@ def test_retry_does_not_emit_additional_start_event() -> None:
node_execution = graph_execution.get_or_create_node_execution(node_id)
assert node_execution.retry_count == 1
def test_node_start_marks_resumption_when_resuming_node() -> None:
    """A start event dispatched for a node that was paused and then consumed is flagged as a resumption."""
    resumed_node_id = "resumed-node"
    handler, event_manager, _ = _build_event_handler(resumed_node_id)

    # Register the node as paused, then drain the paused set to mimic a resume.
    runtime_state = handler._graph_runtime_state
    runtime_state.register_paused_node(resumed_node_id)
    runtime_state.consume_paused_nodes()

    handler.dispatch(
        NodeRunStartedEvent(
            id="exec-1",
            node_id=resumed_node_id,
            node_type=NodeType.CODE,
            node_title="Resumed Node",
            start_at=naive_utc_now(),
        )
    )

    emitted = event_manager._events  # type: ignore[attr-defined]
    assert len(emitted) == 1
    only_event = emitted[0]
    assert isinstance(only_event, NodeRunStartedEvent)
    assert only_event.is_resumption is True
def test_node_start_marks_fresh_run_as_not_resumption() -> None:
    """A start event dispatched for a node that was never paused is not flagged as a resumption."""
    fresh_node_id = "fresh-node"
    handler, event_manager, _ = _build_event_handler(fresh_node_id)

    handler.dispatch(
        NodeRunStartedEvent(
            id="exec-2",
            node_id=fresh_node_id,
            node_type=NodeType.CODE,
            node_title="Fresh Node",
            start_at=naive_utc_now(),
        )
    )

    emitted = event_manager._events  # type: ignore[attr-defined]
    assert len(emitted) == 1
    only_event = emitted[0]
    assert isinstance(only_event, NodeRunStartedEvent)
    assert only_event.is_resumption is False

View File

@ -12,6 +12,7 @@ from core.workflow.graph_events import (
GraphRunSucceededEvent,
NodeRunSucceededEvent,
)
from core.workflow.graph_events.graph import GraphRunStartedEvent
from core.workflow.nodes.base.entities import OutputVariableEntity
from core.workflow.nodes.end.end_node import EndNode
from core.workflow.nodes.end.entities import EndNodeData
@ -157,6 +158,10 @@ def test_engine_resume_restores_state_and_completion():
baseline_repo = _mock_form_repository_with_submission(action_id="continue")
baseline_graph = _build_human_input_graph(baseline_state, baseline_repo)
baseline_events = _run_graph(baseline_graph, baseline_state)
assert baseline_events
first_paused_event = baseline_events[0]
assert isinstance(first_paused_event, GraphRunStartedEvent)
assert first_paused_event.is_resumption is False
assert isinstance(baseline_events[-1], GraphRunSucceededEvent)
baseline_success_nodes = _node_successes(baseline_events)
@ -165,6 +170,10 @@ def test_engine_resume_restores_state_and_completion():
pause_repo = _mock_form_repository_without_submission()
paused_graph = _build_human_input_graph(paused_state, pause_repo)
paused_events = _run_graph(paused_graph, paused_state)
assert paused_events
first_paused_event = paused_events[0]
assert isinstance(first_paused_event, GraphRunStartedEvent)
assert first_paused_event.is_resumption is False
assert isinstance(paused_events[-1], GraphRunPausedEvent)
snapshot = paused_state.dumps()
@ -173,6 +182,10 @@ def test_engine_resume_restores_state_and_completion():
resume_repo = _mock_form_repository_with_submission(action_id="continue")
resumed_graph = _build_human_input_graph(resumed_state, resume_repo)
resumed_events = _run_graph(resumed_graph, resumed_state)
assert resumed_events
first_resumed_event = resumed_events[0]
assert isinstance(first_resumed_event, GraphRunStartedEvent)
assert first_resumed_event.is_resumption is True
assert isinstance(resumed_events[-1], GraphRunSucceededEvent)
combined_success_nodes = _node_successes(paused_events) + _node_successes(resumed_events)