From 6337a9a125fbb4a6574c923d804bda94e0bcb453 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Sun, 4 Jan 2026 01:10:50 +0800 Subject: [PATCH] WIP: feat(api): add is_resumption to node_started and workflow_started events --- .../common/workflow_response_converter.py | 1 + api/core/app/apps/workflow_app_runner.py | 1 + api/core/app/entities/queue_entities.py | 1 + api/core/app/entities/task_entities.py | 1 + .../event_management/event_handlers.py | 3 + api/core/workflow/graph_events/node.py | 1 + .../workflow/runtime/graph_runtime_state.py | 14 +++ ..._workflow_response_converter_resumption.py | 111 ++++++++++++++++++ .../entities/test_graph_runtime_state.py | 22 +++- .../event_management/test_event_handlers.py | 48 ++++++++ .../graph_engine/test_pause_resume_state.py | 13 ++ 11 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_resumption.py diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index 4bad52648d..6c7089e888 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -391,6 +391,7 @@ class WorkflowResponseConverter: iteration_id=event.in_iteration_id, loop_id=event.in_loop_id, agent_strategy=event.agent_strategy, + is_resumption=event.is_resumption, ), ) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 6472351266..acc2205644 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -418,6 +418,7 @@ class WorkflowBasedAppRunner: agent_strategy=event.agent_strategy, provider_type=event.provider_type, provider_id=event.provider_id, + is_resumption=event.is_resumption, ) ) elif isinstance(event, NodeRunSucceededEvent): diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 
8c96be81eb..4145cded24 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -318,6 +318,7 @@ class QueueNodeStartedEvent(AppQueueEvent): # FIXME(-LAN-): only for ToolNode, need to refactor provider_type: str # should be a core.tools.entities.tool_entities.ToolProviderType provider_id: str + is_resumption: bool = False class QueueNodeSucceededEvent(AppQueueEvent): diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index d668fbdf69..b186125425 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -307,6 +307,7 @@ class NodeStartStreamResponse(StreamResponse): iteration_id: str | None = None loop_id: str | None = None agent_strategy: AgentNodeStrategyInit | None = None + is_resumption: bool = False event: StreamEvent = StreamEvent.NODE_STARTED workflow_run_id: str diff --git a/api/core/workflow/graph_engine/event_management/event_handlers.py b/api/core/workflow/graph_engine/event_management/event_handlers.py index 5b0f56e59d..08a64fd35b 100644 --- a/api/core/workflow/graph_engine/event_management/event_handlers.py +++ b/api/core/workflow/graph_engine/event_management/event_handlers.py @@ -131,6 +131,9 @@ class EventHandler: node_execution.mark_started(event.id) self._graph_runtime_state.increment_node_run_steps() + # Mark whether this start is part of a resume flow + event.is_resumption = self._graph_runtime_state.consume_resuming_node(event.node_id) + # Track in response coordinator for stream ordering self._response_coordinator.track_node_execution(event.node_id, event.id) diff --git a/api/core/workflow/graph_events/node.py b/api/core/workflow/graph_events/node.py index f225798d41..c8ac1a6fa6 100644 --- a/api/core/workflow/graph_events/node.py +++ b/api/core/workflow/graph_events/node.py @@ -15,6 +15,7 @@ class NodeRunStartedEvent(GraphNodeEventBase): predecessor_node_id: str | None = None agent_strategy: AgentNodeStrategyInit | 
None = None start_at: datetime = Field(..., description="node start time") + is_resumption: bool = False # FIXME(-LAN-): only for ToolNode provider_type: str = "" diff --git a/api/core/workflow/runtime/graph_runtime_state.py b/api/core/workflow/runtime/graph_runtime_state.py index 7f99d2a3ca..e2fcc03c10 100644 --- a/api/core/workflow/runtime/graph_runtime_state.py +++ b/api/core/workflow/runtime/graph_runtime_state.py @@ -177,6 +177,9 @@ class GraphRuntimeState: self._pending_response_coordinator_dump: str | None = None self._pending_graph_execution_workflow_id: str | None = None self._paused_nodes: set[str] = set() + # Tracks nodes that are being resumed in the current execution cycle. + # Populated when paused nodes are consumed during resume. + self._resuming_nodes: set[str] = set() if graph is not None: self.attach_graph(graph) @@ -363,8 +366,19 @@ class GraphRuntimeState: nodes = list(self._paused_nodes) self._paused_nodes.clear() + # Mark these nodes as resuming so downstream handlers can annotate events. + self._resuming_nodes.update(nodes) return nodes + def consume_resuming_node(self, node_id: str) -> bool: + """ + Return True iff `node_id` is in the resuming set and remove it. 
+ """ + if node_id in self._resuming_nodes: + self._resuming_nodes.remove(node_id) + return True + return False + # ------------------------------------------------------------------ # Builders # ------------------------------------------------------------------ diff --git a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_resumption.py b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_resumption.py new file mode 100644 index 0000000000..1408d52eb2 --- /dev/null +++ b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_resumption.py @@ -0,0 +1,111 @@ +from types import SimpleNamespace + +from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter +from core.app.entities.app_invoke_entities import InvokeFrom +from core.app.entities.queue_entities import QueueNodeStartedEvent +from core.app.entities.task_entities import NodeStartStreamResponse +from core.workflow.entities import AgentNodeStrategyInit +from core.workflow.enums import NodeType +from core.workflow.runtime import GraphRuntimeState, VariablePool +from core.workflow.system_variable import SystemVariable + + +def _build_converter() -> WorkflowResponseConverter: + """Construct a minimal WorkflowResponseConverter for testing.""" + system_variables = SystemVariable( + files=[], + user_id="user-1", + app_id="app-1", + workflow_id="wf-1", + workflow_execution_id="run-1", + ) + runtime_state = GraphRuntimeState(variable_pool=VariablePool(), start_at=0.0) + app_entity = SimpleNamespace( + task_id="task-1", + app_config=SimpleNamespace(app_id="app-1", tenant_id="tenant-1"), + invoke_from=InvokeFrom.EXPLORE, + files=[], + inputs={}, + workflow_execution_id="run-1", + call_depth=0, + ) + account = SimpleNamespace(id="acc-1", name="tester", email="tester@example.com") + return WorkflowResponseConverter( + application_generate_entity=app_entity, + user=account, + system_variables=system_variables, + ) + + +def 
test_node_start_stream_response_carries_resumption_flag(): + converter = _build_converter() + # Seed workflow run id for converter + converter.workflow_start_to_stream_response( + task_id="task-1", + workflow_run_id="run-1", + workflow_id="wf-1", + is_resumption=False, + ) + + queue_event = QueueNodeStartedEvent( + node_execution_id="exec-1", + node_id="node-1", + node_title="Title", + node_type=NodeType.CODE, + start_at=converter._workflow_started_at, # type: ignore[attr-defined] + agent_strategy=AgentNodeStrategyInit(name="test"), + provider_type="", + provider_id="", + is_resumption=True, + ) + + resp = converter.workflow_node_start_to_stream_response(event=queue_event, task_id="task-1") + assert isinstance(resp, NodeStartStreamResponse) + assert resp.data.is_resumption is True + + +def test_node_start_stream_response_defaults_to_false(): + converter = _build_converter() + converter.workflow_start_to_stream_response( + task_id="task-1", + workflow_run_id="run-1", + workflow_id="wf-1", + is_resumption=False, + ) + + queue_event = QueueNodeStartedEvent( + node_execution_id="exec-2", + node_id="node-2", + node_title="Title", + node_type=NodeType.CODE, + start_at=converter._workflow_started_at, # type: ignore[attr-defined] + agent_strategy=None, + provider_type="", + provider_id="", + ) + + resp = converter.workflow_node_start_to_stream_response(event=queue_event, task_id="task-1") + assert isinstance(resp, NodeStartStreamResponse) + assert resp.data.is_resumption is False + + +def test_workflow_start_stream_response_carries_resumption_flag(): + converter = _build_converter() + resp = converter.workflow_start_to_stream_response( + task_id="task-1", + workflow_run_id="run-1", + workflow_id="wf-1", + is_resumption=True, + ) + assert resp.data.is_resumption is True + + +def test_workflow_start_stream_response_defaults_to_false(): + converter = _build_converter() + resp = converter.workflow_start_to_stream_response( + task_id="task-1", + workflow_run_id="run-1", + 
workflow_id="wf-1", + is_resumption=False, + ) + assert resp.data.is_resumption is False diff --git a/api/tests/unit_tests/core/workflow/entities/test_graph_runtime_state.py b/api/tests/unit_tests/core/workflow/entities/test_graph_runtime_state.py index deff06fc5d..72bf6b2559 100644 --- a/api/tests/unit_tests/core/workflow/entities/test_graph_runtime_state.py +++ b/api/tests/unit_tests/core/workflow/entities/test_graph_runtime_state.py @@ -118,7 +118,27 @@ class TestGraphRuntimeState: from core.workflow.graph_engine.ready_queue import InMemoryReadyQueue assert isinstance(queue, InMemoryReadyQueue) - assert state.ready_queue is queue + + def test_resuming_nodes_tracking(self): + """ + Register paused nodes, consume them for resume, and ensure the + resumption marker is consumed exactly once per node. + """ + state = GraphRuntimeState(variable_pool=VariablePool(), start_at=time()) + state.register_paused_node("node-1") + state.register_paused_node("node-2") + + # Consume paused nodes to populate the resuming set + consumed = state.consume_paused_nodes() + assert set(consumed) == {"node-1", "node-2"} + + # Each resumption marker can be consumed only once + assert state.consume_resuming_node("node-1") is True + assert state.consume_resuming_node("node-1") is False + + # Other nodes in the consumed set still return True once when consumed + assert state.consume_resuming_node(node_id="node-2") is True + assert state.consume_resuming_node("node-2") is False def test_graph_execution_lazy_instantiation(self): state = GraphRuntimeState(variable_pool=VariablePool(), start_at=time()) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/event_management/test_event_handlers.py b/api/tests/unit_tests/core/workflow/graph_engine/event_management/test_event_handlers.py index 5d17b7a243..24c1e40034 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/event_management/test_event_handlers.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/event_management/test_event_handlers.py @@ -117,3 
+117,51 @@ def test_retry_does_not_emit_additional_start_event() -> None: node_execution = graph_execution.get_or_create_node_execution(node_id) assert node_execution.retry_count == 1 + + +def test_node_start_marks_resumption_when_resuming_node() -> None: + """Ensure NodeRunStartedEvent is annotated with is_resumption when resuming.""" + + node_id = "resumed-node" + handler, event_manager, _ = _build_event_handler(node_id) + + # Simulate paused node being consumed for resume + handler._graph_runtime_state.register_paused_node(node_id) + handler._graph_runtime_state.consume_paused_nodes() + + start_event = NodeRunStartedEvent( + id="exec-1", + node_id=node_id, + node_type=NodeType.CODE, + node_title="Resumed Node", + start_at=naive_utc_now(), + ) + handler.dispatch(start_event) + + collected = event_manager._events # type: ignore[attr-defined] + assert len(collected) == 1 + emitted_event = collected[0] + assert isinstance(emitted_event, NodeRunStartedEvent) + assert emitted_event.is_resumption is True + + +def test_node_start_marks_fresh_run_as_not_resumption() -> None: + """Ensure fresh NodeRunStartedEvent carries is_resumption=False.""" + + node_id = "fresh-node" + handler, event_manager, _ = _build_event_handler(node_id) + + start_event = NodeRunStartedEvent( + id="exec-2", + node_id=node_id, + node_type=NodeType.CODE, + node_title="Fresh Node", + start_at=naive_utc_now(), + ) + handler.dispatch(start_event) + + collected = event_manager._events # type: ignore[attr-defined] + assert len(collected) == 1 + emitted_event = collected[0] + assert isinstance(emitted_event, NodeRunStartedEvent) + assert emitted_event.is_resumption is False diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_pause_resume_state.py b/api/tests/unit_tests/core/workflow/graph_engine/test_pause_resume_state.py index 3de58e3f61..ee7477f4fa 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_pause_resume_state.py +++ 
b/api/tests/unit_tests/core/workflow/graph_engine/test_pause_resume_state.py @@ -12,6 +12,7 @@ from core.workflow.graph_events import ( GraphRunSucceededEvent, NodeRunSucceededEvent, ) +from core.workflow.graph_events.graph import GraphRunStartedEvent from core.workflow.nodes.base.entities import OutputVariableEntity from core.workflow.nodes.end.end_node import EndNode from core.workflow.nodes.end.entities import EndNodeData @@ -157,6 +158,10 @@ def test_engine_resume_restores_state_and_completion(): baseline_repo = _mock_form_repository_with_submission(action_id="continue") baseline_graph = _build_human_input_graph(baseline_state, baseline_repo) baseline_events = _run_graph(baseline_graph, baseline_state) + assert baseline_events + first_baseline_event = baseline_events[0] + assert isinstance(first_baseline_event, GraphRunStartedEvent) + assert first_baseline_event.is_resumption is False assert isinstance(baseline_events[-1], GraphRunSucceededEvent) baseline_success_nodes = _node_successes(baseline_events) @@ -165,6 +170,10 @@ def test_engine_resume_restores_state_and_completion(): pause_repo = _mock_form_repository_without_submission() paused_graph = _build_human_input_graph(paused_state, pause_repo) paused_events = _run_graph(paused_graph, paused_state) + assert paused_events + first_paused_event = paused_events[0] + assert isinstance(first_paused_event, GraphRunStartedEvent) + assert first_paused_event.is_resumption is False assert isinstance(paused_events[-1], GraphRunPausedEvent) snapshot = paused_state.dumps() @@ -173,6 +182,10 @@ def test_engine_resume_restores_state_and_completion(): resume_repo = _mock_form_repository_with_submission(action_id="continue") resumed_graph = _build_human_input_graph(resumed_state, resume_repo) resumed_events = _run_graph(resumed_graph, resumed_state) + assert resumed_events + first_resumed_event = resumed_events[0] + assert isinstance(first_resumed_event, GraphRunStartedEvent) + assert first_resumed_event.is_resumption is True 
assert isinstance(resumed_events[-1], GraphRunSucceededEvent) combined_success_nodes = _node_successes(paused_events) + _node_successes(resumed_events)